Update spark version. Add cluster configs to ToFhirConfig. Fix sink p… #172

Merged · 5 commits · Mar 29, 2024
5 changes: 2 additions & 3 deletions pom.xml
@@ -92,8 +92,7 @@
<!-- This is a special placeholder to manage the version from a single property throughout the parent and child modules. No other property name can be used. -->
<!-- https://maven.apache.org/maven-ci-friendly.html -->
<revision>1.1-SNAPSHOT</revision>

<scala.version>2.13.12</scala.version>
<scala.version>2.13.8</scala.version>
<scala.binary.version>2.13</scala.binary.version>

<!--Dependency versions-->
@@ -103,7 +102,7 @@
<json4s.version>3.7.0-M11</json4s.version>
<com.fasterxml.version>2.15.1</com.fasterxml.version>
<scalatest.version>3.2.17</scalatest.version>
<spark.version>3.5.0</spark.version>
<spark.version>3.5.1</spark.version>
<jackson.version>2.15.1</jackson.version>
<logback.version>1.2.11</logback.version>
<logstash-logback-encoder.version>7.2</logstash-logback-encoder.version>
@@ -56,7 +56,7 @@ object ToFhirConfig
.entrySet()
.asScala
.filter(e => e.getKey != "app.name" && e.getKey != "master")
.map(e => e.getKey -> e.getValue.render())
.map(e => e.getKey -> e.getValue.unwrapped().toString)
.toMap

sparkConfEntries
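
A minimal sketch (not part of this PR) of the behavioral difference behind the ToFhirConfig change above: ConfigValue.render() emits HOCON/JSON syntax, so string values keep their surrounding quotes and Spark would receive them verbatim, while unwrapped().toString yields the plain value that SparkConf expects.

```scala
import com.typesafe.config.ConfigFactory

object RenderVsUnwrapped extends App {
  // Hypothetical entry, mirroring the cluster settings added to application.conf
  val conf  = ConfigFactory.parseString("spark.driver.memory = 12g")
  val value = conf.getValue("spark.driver.memory")

  println(value.render())             // "12g"  (rendered with quotes)
  println(value.unwrapped().toString) // 12g    (plain value for SparkConf.set)
}
```
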
@@ -8,6 +8,8 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.io.File
import java.net.URI
import java.nio.file.Paths
import java.time.LocalDateTime
import scala.collection.mutable

@@ -31,7 +33,15 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
* @throws NotImplementedError If the specified source format is not implemented.
*/
override def read(mappingSource: FileSystemSource, sourceSettings:FileSystemSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty,jobId: Option[String] = Option.empty): DataFrame = {
val finalPath = FileUtils.getPath(sourceSettings.dataFolderPath, mappingSource.path).toAbsolutePath.toString

// Do not resolve against the local context path when the data folder is an HDFS path
val finalPath = if (sourceSettings.dataFolderPath.startsWith("hdfs://")) {
new URI(s"${sourceSettings.dataFolderPath.stripSuffix("/")}/${mappingSource.path.stripPrefix("/")}").toString
} else {
FileUtils.getPath(sourceSettings.dataFolderPath, mappingSource.path).toAbsolutePath.toString
}


// validate whether the provided path is a directory when streaming is enabled in the source settings
if(sourceSettings.asStream && !new File(finalPath).isDirectory){
throw new IllegalArgumentException(s"$finalPath is not a directory. For streaming job, you should provide a directory.")
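
A small usage sketch of the path handling added above; the base paths are hypothetical, and Paths.get stands in here for toFHIR's FileUtils.getPath:

```scala
import java.net.URI
import java.nio.file.Paths

object PathResolutionSketch extends App {
  // Mirrors the new branch: HDFS base paths are joined as URIs,
  // anything else is resolved on the local file system.
  def resolve(dataFolderPath: String, relativePath: String): String =
    if (dataFolderPath.startsWith("hdfs://"))
      new URI(s"${dataFolderPath.stripSuffix("/")}/${relativePath.stripPrefix("/")}").toString
    else
      Paths.get(dataFolderPath, relativePath).toAbsolutePath.toString

  println(resolve("hdfs://namenode:9000/tofhir-data", "patients.csv")) // hdfs://namenode:9000/tofhir-data/patients.csv
  println(resolve("test-data", "patients.csv"))                        // <working dir>/test-data/patients.csv
}
```
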
6 changes: 6 additions & 0 deletions tofhir-rxnorm/pom.xml
@@ -65,6 +65,12 @@
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.5.1</version>
</dependency>
<!-- toFHIR common-->
<dependency>
<groupId>io.onfhir</groupId>
@@ -0,0 +1,163 @@
package io.tofhir.rxnorm

import java.io.{FileReader, FileWriter}
import com.opencsv.{CSVReaderBuilder, CSVWriter}
import io.onfhir.api.util.FHIRUtil
import org.json4s.JsonAST.{JArray, JDouble, JObject, JString}
import org.json4s._
import io.onfhir.util.JsonFormatter._

/**
* Handles the extraction of NDC (National Drug Code) values from a given CSV file (prescriptions.csv) and retrieves
* corresponding drug information from the RxNorm API. The extracted data is then compiled into a new CSV file
* named ndcToMedDetails.csv. This approach prevents HTTP 429 errors (Too Many Requests) by caching necessary
* drug information locally, facilitating efficient data mapping without direct RxNorm API calls for each request.
* The ndcToMedDetails.csv serves as a resource for the prescriptions-mapping-without-rxn mapping within
* the Mimic dataset.
*
* Inputs:
* - rxNormApiClient: Configured client for RxNorm API requests, including endpoint and timeout settings.
* - csvFilePath: File path to the prescription data CSV, containing NDC codes for querying the RxNorm API.
*
* Output:
* - Creates ndcToMedDetails.csv: A CSV file containing drug details mapped from NDC codes, with columns for:
* NDC, dose form RxNorm code (doseFormRxcui), dose form name (doseFormName), active ingredient RxNorm code
* (activeIngredientRxcui), active ingredient name (activeIngredientName), numerator unit, numerator value,
* denominator unit, and denominator value. These columns are used by the prescriptions-mapping-without-rxn
* mapping in the Mimic dataset.
*/

object PullRxNormNdcMedDetails extends App {
// RxNorm API client for the lookup calls
val rxNormApiClient = RxNormApiClient("https://rxnav.nlm.nih.gov", 10)

// Read NDC codes from the prescriptions table
val csvFilePath = "mimic-iv-data/hosp/prescriptions.csv"

// Open the CSV file reader
val fileReader = new FileReader(csvFilePath)
val csvReader = new CSVReaderBuilder(fileReader).withSkipLines(1).build() // Skip header line

// Mutable set for collecting unique NDC codes
var ndcSet = scala.collection.mutable.Set[String]()

try {
// Iterate over each row in the CSV file
var record = csvReader.readNext()

while (record != null) {
// Extract the "ndc" field
val ndcField = record(11)

// Add it into the set
ndcSet += ndcField

// Read the next record
record = csvReader.readNext()
}
// Convert it to an immutable set
val ndcSetImmutable = ndcSet.toSet
println(s"number of ndc codes: ${ndcSetImmutable.size}")

// Create FileWriter object
val fileWriter = new FileWriter("ndcToMedDetails.csv")

// Create CSVWriter object
val csvWriter = new CSVWriter(fileWriter)

try{
// Set headers for the csv file
val header = Array("ndc", "doseFormRxcui", "doseFormName", "activeIngredientRxcui", "activeIngredientName",
"numeratorUnit", "numeratorValue", "denominatorUnit", "denominatorValue")
csvWriter.writeNext(header)

ndcSetImmutable.foreach(ndc => {
if(ndc.nonEmpty){
// Get conceptIds from RxNorm Api
try{
val conceptIds = rxNormApiClient.findRxConceptIdByNdc(ndc);

conceptIds
.iterator
.filter(_.nonEmpty)
.flatMap(rxcui =>
rxNormApiClient.getRxcuiHistoryStatus(rxcui)
)
.map(response => {
// Get the ingredient details
val ingredients =
FHIRUtil
.extractValueOptionByPath[Seq[JObject]](response, "rxcuiStatusHistory.definitionalFeatures.ingredientAndStrength")
.getOrElse(Nil)
val ingredientObjs =
ingredients
.map(i => {
val requiredFields =
Seq("activeIngredientRxcui", "activeIngredientName", "numeratorValue", "numeratorUnit", "denominatorValue", "denominatorUnit")
.map(f =>
FHIRUtil.extractValueOption[String](i, f)
.filter(_ != "") // Should filter empty string values
.map(v =>
if(f == "numeratorValue" || f == "denominatorValue")
f -> JDouble(v.toDouble)
else
f -> JString(v)
)
)
if (requiredFields.forall(_.isDefined))
Some(JObject(requiredFields.map(_.get).toList))
else
None
})

val doseFormObj =
FHIRUtil
.extractValueOptionByPath[Seq[JObject]](response, "rxcuiStatusHistory.definitionalFeatures.doseFormConcept")
.getOrElse(Nil)
.headOption
if(ingredientObjs.nonEmpty && ingredientObjs.forall(_.isDefined)){
Some(
JObject(
List(
"ingredientAndStrength" -> JArray(ingredientObjs.map(_.get).toList)
) ++
doseFormObj
.map(d => "doseFormConcept" -> d)
.toSeq
)
)
} else
None
})
.find(_.isDefined).foreach(r => {

val doseFormRxcui = (r.get \ "doseFormConcept" \ "doseFormRxcui").extract[String]
val doseFormName = (r.get \ "doseFormConcept" \ "doseFormName").extract[String]
val activeIngredientRxcui = ((r.get \ "ingredientAndStrength")(0) \ "activeIngredientRxcui").extract[String]
val activeIngredientName = ((r.get \ "ingredientAndStrength")(0) \ "activeIngredientName").extract[String]
val numeratorUnit = ((r.get \ "ingredientAndStrength")(0) \ "numeratorUnit").extract[String]
val numeratorValue = ((r.get \ "ingredientAndStrength")(0) \ "numeratorValue").extract[String]
val denominatorUnit = ((r.get \ "ingredientAndStrength")(0) \ "denominatorUnit").extract[String]
val denominatorValue = ((r.get \ "ingredientAndStrength")(0) \ "denominatorValue").extract[String]
csvWriter.writeNext(Array(ndc, doseFormRxcui, doseFormName, activeIngredientRxcui, activeIngredientName,
numeratorUnit, numeratorValue, denominatorUnit, denominatorValue))

}
)
}catch {
case e: Throwable => println(e)
}
}
})
} finally {
// Close the CSV writer before the underlying file writer so buffered rows are flushed
csvWriter.close()
fileWriter.close()
}

} finally {
// Close the CSV reader
csvReader.close()
fileReader.close()
}
}
10 changes: 10 additions & 0 deletions tofhir-server/src/main/resources/application.conf
@@ -148,6 +148,16 @@ spark = {
master = "local[4]"
# Directory to store Spark checkpoints
checkpoint-dir = "checkpoint"

# # Cluster configurations for spark
# spark.submit.deployMode = "cluster"
# spark.driver.host = "192.168.1.103"
# spark.driver.cores = "8"
# spark.driver.memory = "12g"
# spark.executor.cores = "8"
# spark.executor.memory = "14g"
# spark.network.timeout = "600s"
# spark.jars = "tofhir-server-standalone.jar"
}

akka = {
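
For reference, a rough sketch (assumed application name and hypothetical wiring, not the actual ToFhirConfig code) of how uncommented entries like those above would end up on the SparkSession:

```scala
import org.apache.spark.sql.SparkSession

object ClusterConfigSketch extends App {
  // Entries as they would be read from the spark block of application.conf
  val sparkConfEntries: Map[String, String] = Map(
    "spark.executor.cores"  -> "8",
    "spark.executor.memory" -> "14g",
    "spark.network.timeout" -> "600s"
  )

  val spark = sparkConfEntries
    .foldLeft(SparkSession.builder().appName("tofhir-server").master("local[4]")) {
      case (builder, (key, value)) => builder.config(key, value)
    }
    .getOrCreate()

  println(spark.conf.get("spark.executor.memory")) // 14g
}
```
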