Skip to content

Commit

Permalink
15223: generate empty in convert (#16137)
Browse files Browse the repository at this point in the history
* 15223: generate empty report for garbled convert data

* fixup! 15223: generate empty report for garbled convert data

* fixup! 15223: generate empty report for garbled convert data

* fixup! 15223: generate empty report for garbled convert data

* fixup! 15223: generate empty report for garbled convert data

* fixup! 15223: generate empty report for garbled convert data
  • Loading branch information
mkalish authored Oct 8, 2024
1 parent 1333c99 commit 5025bf5
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 49 deletions.
16 changes: 16 additions & 0 deletions prime-router/src/main/kotlin/azure/ActionHistory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,22 @@ class ActionHistory(
}
}

/**
* Allows tracking of an empty report regardless of where it is generated in the pipeline
* @param report the details of the report
*/
fun trackEmptyReport(report: Report) {
generatingEmptyReport = true
val reportFile = ReportFile()
reportFile.reportId = report.id
reportFile.schemaTopic = report.schema.topic
reportFile.schemaName = "None"
reportFile.itemCount = 0
reportFile.bodyFormat = report.bodyFormat.toString()
reportFile.nextAction = TaskAction.none
reportsOut[reportFile.reportId] = reportFile
}

/**
* Use this to record history info about a newly generated empty [report] for sending to [receiver] that
* has requested an empty batch. The [event] will be batch or send.
Expand Down
33 changes: 3 additions & 30 deletions prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import gov.cdc.prime.router.azure.LookupTableConditionMapper
import gov.cdc.prime.router.azure.ProcessEvent
import gov.cdc.prime.router.azure.db.Tables
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ItemLineage
import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor
import gov.cdc.prime.router.azure.observability.bundleDigest.FhirPathBundleDigestLabResultExtractorStrategy
import gov.cdc.prime.router.azure.observability.context.MDCUtils
Expand Down Expand Up @@ -277,41 +276,15 @@ class FHIRConverter(
}.collect(Collectors.toList()).filterNotNull()
}
} else {
val nextEvent = ProcessEvent(
Event.EventAction.NONE,
queueMessage.reportId,
Options.None,
emptyMap(),
emptyList()
)

// TODO: https://github.com/CDCgov/prime-reportstream/issues/15223
val report = Report(
MimeFormat.FHIR,
format,
emptyList(),
1,
0,
metadata = this.metadata,
topic = queueMessage.topic,
nextAction = TaskAction.none
)

// create item lineage
report.itemLineages = listOf(
ItemLineage(
null,
queueMessage.reportId,
1,
report.id,
1,
null,
null,
null,
report.getItemHashForRow(1)
)
)

// ensure tracking is set
actionHistory.trackCreatedReport(nextEvent, report)
actionHistory.trackEmptyReport(report)
reportEventService.sendReportProcessingError(
ReportStreamEventName.REPORT_NOT_PROCESSABLE,
report,
Expand Down
47 changes: 33 additions & 14 deletions prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,24 @@ OBX|4|CWE|95421-4^Resides in a congregate care setting^LN^^^^2.69||N^No^HL70136|
OBX|5|CWE|95419-8^Has symptoms related to condition of interest^LN^^^^2.69||N^No^HL70136||||||F|||202102090000-0600|||||||||||||||QST
SPM|1|0cba76f5-35e0-4a28-803a-2f31308aae9b||258500001^Nasopharyngeal swab^SCT||||71836000^Nasopharyngeal structure (body structure)^SCT^^^^2020-09-01|||||||||202102090000-0600|202102090000-0600"""

// This report is trying to contain two items, but the HL7 is garbled, the first is missing an MSH segment and the second
// has a typo in its MSH segment
@Suppress("ktlint:standard:max-line-length")
const val garbledHL7Record =
"""FT|Centers for Disease Control and Prevention|0.1-SNAPSHOT|PRIME ReportStream|0.1-SNAPSHOT||20210210
PID|1||2a14112c-ece1-4f82-915c-7b3a8d152eda^^^Avante at Ormond Beach^PI||Buckridge^Kareem^Millie^^^^L||19580810|F||2106-3^White^HL70005^^^^2.5.1|688 Leighann Inlet^^South Rodneychester^TX^67071^^^^48077||7275555555:1:^PRN^^[email protected]^1^211^2240784|||||||||U^Unknown^HL70189||||||||N
ORC|RE|73a6e9bd-aaec-418e-813a-0ad33366ca85^6^7^8&F^9|73a6e9bd-aaec-418e-813a-0ad33366ca85|||||||||1629082607^Eddin^Husam^^^^^^CMS&2.16.840.1.113883.3.249&ISO^^^^NPI||^WPN^^^1^386^6825220|20210209||||||Avante at Ormond Beach|170 North King Road^^Ormond Beach^FL^32174^^^^12127|^WPN^^[email protected]^1^407^7397506|^^^^32174
OBR|1|73a6e9bd-aaec-418e-813a-0ad33366ca85|0cba76f5-35e0-4a28-803a-2f31308aae9b|94558-4^SARS-CoV-2 (COVID-19) Ag [Presence] in Respiratory specimen by Rapid immunoassay^LN|||202102090000-0600|202102090000-0600||||||||1629082607^Eddin^Husam^^^^^^CMS&2.16.840.1.113883.3.249&ISO^^^^NPI|^WPN^^^1^386^6825220|||||202102090000-0600|||F
OBX|1|CWE|94558-4^SARS-CoV-2 (COVID-19) Ag [Presence] in Respiratory specimen by Rapid immunoassay^LN||260415000^Not detected^SCT|||N^Normal (applies to non-numeric results)^HL70078|||F|||202102090000-0600|||CareStart COVID-19 Antigen test_Access Bio, Inc._EUA^^99ELR||202102090000-0600||||Avante at Ormond Beach^^^^^CLIA&2.16.840.1.113883.4.7&ISO^^^^10D0876999^CLIA|170 North King Road^^Ormond Beach^FL^32174^^^^12127
OBX|2|CWE|95418-0^Whether patient is employed in a healthcare setting^LN^^^^2.69||Y^Yes^HL70136||||||F|||202102090000-0600|||||||||||||||QST
OBX|3|CWE|95417-2^First test for condition of interest^LN^^^^2.69||Y^Yes^HL70136||||||F|||202102090000-0600|||||||||||||||QST
OBX|4|CWE|95421-4^Resides in a congregate care setting^LN^^^^2.69||N^No^HL70136||||||F|||202102090000-0600|||||||||||||||QST
OBX|5|CWE|95419-8^Has symptoms related to condition of interest^LN^^^^2.69||N^No^HL70136||||||F|||202102090000-0600|||||||||||||||QST
SPM|1|0cba76f5-35e0-4a28-803a-2f31308aae9b||258500001^Nasopharyngeal swab^SCT||||71836000^Nasopharyngeal structure (body structure)^SCT^^^^2020-09-01|||||||||202102090000-0600|202102090000-0600
SH|^~\&#!|CDC PRIME - Atlanta, Georgia (Dekalb)^2.16.840.1.114222.4.1.237821^ISO|Avante at Ormond Beach^10D0876999^CLIA|PRIME_DOH|Prime ReportStream|20210210170737||ORU^R01^ORU_R01|371784|P|2.5.1|||NE|NE|USA||||PHLabReportNoAck^ELR_Receiver^2.16.840.1.113883.9.99^ISO
SFT|Centers for Disease Control and Prevention|0.1-SNAPSHOT|PRIME ReportStream|0.1-SNAPSHOT||20210210
PID|1||2a14112c-ece1-4f82-915c-7b3a8d152eda^^^Avante at Ormond Beach^PI||Buckridge^Kareem^Millie^^^^L||19580810|F||2106-3^White^HL70005^^^^2.5.1|688 Leighann Inlet^^South Rodneychester^TX^67071^^^^48077||7275555555:1:^PRN^^[email protected]^1^211^2240784|||||||||U^Unknown^HL70189||||||||N"""

@Suppress("ktlint:standard:max-line-length")
const val validRadxMarsHL7Message =
"""MSH|^~\&|MMTC.PROD^2.16.840.1.113883.3.8589.4.2.106.1^ISO|CAREEVOLUTION^00Z0000024^CLIA|AIMS.INTEGRATION.PRD^2.16.840.1.114222.4.3.15.1^ISO|AIMS.PLATFORM^2.16.840.1.114222.4.1.217446^ISO|20240403205305+0000||ORU^R01^ORU_R01|20240403205305_dba7572cc6334f1ea0744c5f235c823e|P|2.5.1|||NE|NE|||||PHLabReport-NoAck^ELR251R1_Rcvr_Prof^2.16.840.1.113883.9.11^ISO
Expand Down Expand Up @@ -257,12 +275,13 @@ object UniversalPipelineTestUtils {
)

/**
* fetch child reports associated with a [parent] report and ensure we find an [expected] number of children
* fetch child reports associated with a [parent] report and ensure we find an [expectedItems] number of children
*/
fun fetchChildReports(
parent: Report,
txn: DataAccessTransaction,
expected: Int? = null,
expectedItems: Int? = null,
expectedReports: Int = 1,
): List<ReportFile> {
val itemLineages = DSL
.using(txn)
Expand All @@ -271,15 +290,15 @@ object UniversalPipelineTestUtils {
.where(ItemLineage.ITEM_LINEAGE.PARENT_REPORT_ID.eq(parent.id))
.fetchInto(gov.cdc.prime.router.azure.db.tables.pojos.ItemLineage::class.java)

if (expected != null) {
assertThat(itemLineages).hasSize(expected)
assertThat(itemLineages.map { it.childIndex }).isEqualTo(MutableList(expected) { 1 })
if (expectedItems != null) {
assertThat(itemLineages).hasSize(expectedItems)
assertThat(itemLineages.map { it.childIndex }).isEqualTo(MutableList(expectedItems) { 1 })

// itemCount is on the report created by the test. It will not be null.
if (parent.itemCount > 1) {
assertThat(itemLineages.map { it.parentIndex }).isEqualTo((1..expected).toList())
assertThat(itemLineages.map { it.parentIndex }).isEqualTo((1..expectedItems).toList())
} else {
assertThat(itemLineages.map { it.parentIndex }).isEqualTo(MutableList(expected) { 1 })
assertThat(itemLineages.map { it.parentIndex }).isEqualTo(MutableList(expectedItems) { 1 })
}
}

Expand All @@ -290,9 +309,7 @@ object UniversalPipelineTestUtils {
.where(ReportLineage.REPORT_LINEAGE.PARENT_REPORT_ID.eq(parent.id))
.fetchInto(gov.cdc.prime.router.azure.db.tables.pojos.ReportLineage::class.java)

if (expected != null) {
assertThat(reportLineages).hasSize(expected)
}
assertThat(reportLineages).hasSize(expectedReports)

val childReportIds = reportLineages.map {
it.childReportId
Expand All @@ -307,11 +324,13 @@ object UniversalPipelineTestUtils {
)
)
.fetchInto(ReportFile::class.java)
if (expected != null) {
assertThat(reportFiles).hasSize(expected)

assertThat(reportFiles).hasSize(expectedReports)

if (expectedItems != 0) {
assertThat(itemLineages).transform { lineages -> lineages.map { it.childReportId }.sorted() }
.isEqualTo(reportFiles.map { it.reportId }.sorted())
}
assertThat(itemLineages).transform { lineages -> lineages.map { it.childReportId }.sorted() }
.isEqualTo(reportFiles.map { it.reportId }.sorted())

return reportFiles
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import gov.cdc.prime.router.common.cleanHL7Record
import gov.cdc.prime.router.common.cleanHL7RecordConverted
import gov.cdc.prime.router.common.cleanHL7RecordConvertedAndTransformed
import gov.cdc.prime.router.common.conditionCodedValidFHIRRecord1
import gov.cdc.prime.router.common.garbledHL7Record
import gov.cdc.prime.router.common.invalidEmptyFHIRRecord
import gov.cdc.prime.router.common.invalidHL7Record
import gov.cdc.prime.router.common.invalidHL7RecordConverted
Expand Down Expand Up @@ -257,7 +258,7 @@ class FHIRConverterIntegrationTests {

ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val (routedReports, unroutedReports) = fetchChildReports(
receiveReport, txn, 4
receiveReport, txn, 4, 4
).partition { it.nextAction != TaskAction.none }
assertThat(routedReports).hasSize(2)
routedReports.forEach {
Expand Down Expand Up @@ -441,7 +442,7 @@ class FHIRConverterIntegrationTests {

ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val (routedReports, unroutedReports) = fetchChildReports(
receiveReport, txn, 4
receiveReport, txn, 4, 4
).partition { it.nextAction != TaskAction.none }
assertThat(routedReports).hasSize(2)
routedReports.forEach {
Expand Down Expand Up @@ -584,7 +585,7 @@ class FHIRConverterIntegrationTests {

ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val (routedReports, notRouted) = fetchChildReports(
receiveReport, txn, 2
receiveReport, txn, 2, 2
).partition { it.nextAction != TaskAction.none }

with(routedReports.single()) {
Expand Down Expand Up @@ -711,7 +712,7 @@ class FHIRConverterIntegrationTests {
fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert))

ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val routedReports = fetchChildReports(receiveReport, txn, 2)
val routedReports = fetchChildReports(receiveReport, txn, 2, 2)
routedReports.forEach {
assertThat(it.nextAction).isEqualTo(TaskAction.destination_filter)
assertThat(it.receivingOrg).isEqualTo(null)
Expand Down Expand Up @@ -804,4 +805,62 @@ class FHIRConverterIntegrationTests {
assertThat(report.bodyFormat).isEqualTo("FHIR")
}
}

@Test
fun `test should gracefully handle a case where number of items is unknown`() {
val receivedReportContents = garbledHL7Record
val receiveBlobUrl = BlobAccess.uploadBlob(
"receive/happy-path.hl7",
receivedReportContents.toByteArray(),
getBlobContainerMetadata()
)

val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1)
val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender)
val fhirFunctions = createFHIRFunctionsInstance()

fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert))

verify(exactly = 0) {
QueueAccess.sendMessage(any(), any())
}
ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val report = fetchChildReports(receiveReport, txn, 0).single()
assertThat(report.nextAction).isEqualTo(TaskAction.none)
assertThat(report.receivingOrg).isEqualTo(null)
assertThat(report.receivingOrgSvc).isEqualTo(null)
assertThat(report.schemaName).isEqualTo("None")
assertThat(report.schemaTopic).isEqualTo(Topic.FULL_ELR)
assertThat(report.bodyFormat).isEqualTo("HL7")
}
}

@Test
fun `test should gracefully handle a case with an empty contents`() {
val receivedReportContents = " "
val receiveBlobUrl = BlobAccess.uploadBlob(
"receive/happy-path.hl7",
receivedReportContents.toByteArray(),
getBlobContainerMetadata()
)

val receiveReport = setupConvertStep(MimeFormat.HL7, hl7Sender, receiveBlobUrl, 1)
val queueMessage = generateQueueMessage(receiveReport, receivedReportContents, hl7Sender)
val fhirFunctions = createFHIRFunctionsInstance()

fhirFunctions.process(queueMessage, 1, createFHIRConverter(), ActionHistory(TaskAction.convert))

verify(exactly = 0) {
QueueAccess.sendMessage(any(), any())
}
ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val report = fetchChildReports(receiveReport, txn, 0, 1).single()
assertThat(report.nextAction).isEqualTo(TaskAction.none)
assertThat(report.receivingOrg).isEqualTo(null)
assertThat(report.receivingOrgSvc).isEqualTo(null)
assertThat(report.schemaName).isEqualTo("None")
assertThat(report.schemaTopic).isEqualTo(Topic.FULL_ELR)
assertThat(report.bodyFormat).isEqualTo("HL7")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class FHIRDestinationFilterIntegrationTests : Logging {

// check results
ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val routedReports = fetchChildReports(report, txn, 2)
val routedReports = fetchChildReports(report, txn, 2, 2)
with(routedReports.first()) {
assertThat(this.nextAction).isEqualTo(TaskAction.receiver_filter)
assertThat(this.receivingOrg).isEqualTo("phd")
Expand Down

0 comments on commit 5025bf5

Please sign in to comment.