♻️ refactor: Rename 'batch' to 'chunk' whenever applicable
dogukan10 committed Aug 14, 2024
1 parent a0b8c41 commit cb17111
Showing 22 changed files with 101 additions and 99 deletions.
20 changes: 10 additions & 10 deletions README.md
@@ -123,8 +123,8 @@ tofhir {
# Number of partitions to repartition the source data before executing the mappings for the mapping jobs
# numOfPartitions = 10
# Maximum number of records for batch mapping execution, if source data exceeds this it is divided into batches
# maxBatchSize = 10000
# Maximum number of records for batch mapping execution, if source data exceeds this it is divided into chunks
# maxChunkSize = 10000
}
terminology-systems = {
@@ -145,7 +145,7 @@ tofhir {
# Settings for FHIR repository writer
fhir-server-writer {
# The # of FHIR resources in the group while executing (create/update) a batch operation.
# The # of FHIR resources in the group while executing (create/update) a FHIR batch operation.
batch-group-size = 50
}
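
For context, `batch-group-size` determines how many generated FHIR resources are grouped into a single FHIR batch `Bundle` when writing to the FHIR repository. A minimal sketch of such a batch Bundle is shown below; the Patient entries and the request method are purely illustrative, not actual toFHIR output:

```json
{
  "resourceType": "Bundle",
  "type": "batch",
  "entry": [
    {
      "resource": { "resourceType": "Patient", "id": "p1", "gender": "male" },
      "request": { "method": "PUT", "url": "Patient/p1" }
    },
    {
      "resource": { "resourceType": "Patient", "id": "p2", "gender": "female" },
      "request": { "method": "PUT", "url": "Patient/p2" }
    }
  ]
}
```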
@@ -648,8 +648,8 @@ Example of a Mapping Job definition file with csv source type in streaming mode:
The json snippet above illustrates the structure of an example mapping job in streaming mode.
Similar to the batch mode, most of the fields are the same. The only differences are:
- `asStream` field in the source settings
- `path` in the source context of the mapping. `path` should be the name of the **folder** this time, and it is where toFHIR will monitor the changes.
- `fileFormat` in the source context of the mapping. `fileFormat` field is mandatory for streams and filters to process only files with the given format.
- `path` in the source binding of the mapping. `path` should be the name of the **folder** this time, and it is where toFHIR will monitor the changes.
- `fileFormat` in the source binding of the mapping. `fileFormat` field is mandatory for streams and filters to process only files with the given format.
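
As a rough illustration of these two fields, a hypothetical source binding for a streaming mapping task could look like the snippet below. The key names (`sourceBinding`, `jsonClass`) and values are assumptions made for illustration; refer to the complete mapping job examples in this README for the authoritative structure:

```json
{
  "mappingRef": "https://example.com/mappings/patient-mapping",
  "sourceBinding": {
    "source": {
      "jsonClass": "FileSystemSource",
      "path": "patients",
      "fileFormat": "csv"
    }
  }
}
```

Here `path` points to the folder that toFHIR monitors for changes, and `fileFormat` restricts processing to files with the given format in that folder.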

##### SQL

@@ -748,7 +748,7 @@ Utilize the same configuration approach as described for Kafka, with a few key c
}
}
```
- **Topic Name**: While defining the topic name for a mapping source context within a mapping job, use the name generated by
- **Topic Name**: While defining the topic name for a mapping source binding within a mapping job, use the name generated by
the [tofhir-redcap integration module](https://github.com/srdc/tofhir-redcap) for the corresponding RedCAP project.
For detailed instructions, refer to the [README](https://github.com/srdc/tofhir-redcap/blob/main/README.md) file of the integration module.
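
As a sketch only, a Kafka source binding for a RedCAP-backed mapping task might look like the snippet below. The topic name is a placeholder for the name generated by tofhir-redcap, and the `startingOffsets` entry merely illustrates how Spark options can be passed via the `options` field (see the Spark options section further below):

```json
{
  "source": {
    "jsonClass": "KafkaSource",
    "topicName": "<topic-generated-by-tofhir-redcap>",
    "options": {
      "startingOffsets": "earliest"
    }
  }
}
```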

@@ -799,7 +799,7 @@ Available options for different source types can be found in the following links
- SQL: https://spark.apache.org/docs/3.4.1/sql-data-sources-jdbc.html#data-source-option
- Apache Kafka: https://spark.apache.org/docs/3.4.1/structured-streaming-kafka-integration.html

To give any spark option, you can use the `options` field in the source context of the mapping in a mapping job.
To give any spark option, you can use the `options` field in the source binding of the mapping in a mapping job.

```json
{
@@ -879,8 +879,8 @@ data read from different sources.
}
```

##### 2. Specify Source Contexts
Next, specify the source contexts for your mappings in the job. Here's an example:
##### 2. Specify Source Bindings
Next, specify the source bindings for your mappings in the job. Here's an example:

```json
{
@@ -904,7 +904,7 @@ Next, specify the source bindings for your mappings in the job. Here's an exampl
}
```
In this example, `patient-simple.csv` is used for the `patient` mapping source, while an SQL query result is used for the `patientGender` mapping source.
Since the mapping job has more than one data source, we should specify the source reference in the mapping source context using `sourceRef` field.
Since the mapping job has more than one data source, we should specify the source reference in the mapping source binding using `sourceRef` field.
Here, `patient` source reads the csv file from `patientSource` whereas `patientGender` source reads the result of an SQL query from
`genderSource`.
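
To make the `sourceRef` mechanism concrete, a hypothetical pair of source bindings for these two mappings could be declared as follows; the key names and the SQL query are illustrative only:

```json
{
  "patient": {
    "jsonClass": "FileSystemSource",
    "path": "patient-simple.csv",
    "sourceRef": "patientSource"
  },
  "patientGender": {
    "jsonClass": "SqlSource",
    "query": "SELECT pid, gender FROM patients",
    "sourceRef": "genderSource"
  }
}
```

Each binding names the data source it reads from via `sourceRef`, so the `patient` mapping source is resolved against `patientSource` and `patientGender` against `genderSource`.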

6 changes: 3 additions & 3 deletions docker/kibana/exports/dashboards.ndjson

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/kibana/exports/required_fields_index_template.json
@@ -50,7 +50,7 @@
"isScheduledJob": {
"type": "boolean"
},
"batchResult": {
"chunkResult": {
"type": "boolean"
},
"errorExpr": {
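With this rename, an execution-log document indexed through this template carries a `chunkResult` flag instead of `batchResult`: the overall mapping-task and streaming results are logged with `chunkResult = false` (see the `ExecutionLogger` changes below), while individual chunk results presumably carry `true`. A heavily abbreviated, hypothetical log document:

```json
{
  "isScheduledJob": false,
  "chunkResult": true
}
```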
6 changes: 3 additions & 3 deletions tofhir-engine/src/main/resources/application.conf
@@ -43,8 +43,8 @@ tofhir {
# Number of partitions to repartition the source data before executing the mappings for the mapping jobs
# numOfPartitions = 10

# Maximum number of records for batch mapping execution, if source data exceeds this it is divided into batches
# maxBatchSize = 10000
# Maximum number of records for batch mapping execution, if source data exceeds this it is divided into chunks
# maxChunkSize = 10000
}

terminology-systems = {
@@ -65,7 +65,7 @@ tofhir {

# Settings for FHIR repository writer
fhir-server-writer {
# The # of FHIR resources in the group while executing (create/update) a batch operation.
# The # of FHIR resources in the group while executing (create/update) a FHIR batch operation.
batch-group-size = 50
}

@@ -42,11 +42,11 @@ class ToFhirEngineConfig(toFhirEngineConfig: Config) {
lazy val partitionsForMappingJobs: Option[Int] = Try(toFhirEngineConfig.getInt("mapping-jobs.numOfPartitions")).toOption

/**
* Max batch size to execute for batch executions, if number of records exceed this the source data will be divided into batches
* Max chunk size to execute for batch executions, if number of records exceed this, the source data will be divided into chunks
*/
lazy val maxBatchSizeForMappingJobs: Option[Long] = Try(toFhirEngineConfig.getLong("mapping-jobs.maxBatchSize")).toOption
lazy val maxChunkSizeForMappingJobs: Option[Long] = Try(toFhirEngineConfig.getLong("mapping-jobs.maxChunkSize")).toOption

/** The # of FHIR resources in the group while executing (create/update) a batch operation. */
/** The # of FHIR resources in the group while executing (create/update) a FHIR batch operation. */
lazy val fhirWriterBatchGroupSize: Int = Try(toFhirEngineConfig.getInt("fhir-server-writer.batch-group-size")).getOrElse(10)

/** Path to the folder which acts as the folder database of toFHIR*/
@@ -12,11 +12,11 @@ object DataSourceReaderFactory {
* Return appropriate data source reader
*
* @param spark Spark session
* @param mappingSourceContext Mapping source context
* @param mappingSourceBinding Mapping source binding
* @return
*/
def apply[T <: MappingSourceBinding, S<:MappingJobSourceSettings](spark: SparkSession, mappingSourceContext: T, sourceSettings:S): BaseDataSourceReader[T,S] = {
(mappingSourceContext -> sourceSettings) match {
def apply[T <: MappingSourceBinding, S<:MappingJobSourceSettings](spark: SparkSession, mappingSourceBinding: T, sourceSettings:S): BaseDataSourceReader[T,S] = {
(mappingSourceBinding -> sourceSettings) match {
case (_: FileSystemSource, _:FileSystemSourceSettings) => new FileDataSourceReader(spark).asInstanceOf[BaseDataSourceReader[T,S]]
case (_: SqlSource, _:SqlSourceSettings) => new SqlSourceReader(spark).asInstanceOf[BaseDataSourceReader[T,S]]
case (_: KafkaSource, _:KafkaSourceSettings) => new KafkaSourceReader(spark).asInstanceOf[BaseDataSourceReader[T,S]]
@@ -91,9 +91,9 @@ class FhirRepositoryWriter(sinkSettings: FhirRepositorySinkSettings) extends Bas
}

/**
* Prepare the batch request from mapping results
* Prepare the FHIR batch request from mapping results
*
* @param mappingResults Mapping results for this batch
* @param mappingResults Mapping results for this FHIR batch
* @param onFhirClient OnFhirClient
* @return
*/
@@ -14,14 +14,15 @@ object SinkHandler {
val logger: Logger = Logger(this.getClass)

/**
* Writes the FHIR mapping results to the specified resource writer.
*
* @param spark
* @param mappingJobExecution
* @param mappingUrl
* @param df
* @param resourceWriter
* @param spark The SparkSession instance.
* @param mappingJobExecution The execution context of the FHIR mapping job.
* @param mappingUrl An optional URL for the mapping.
* @param df The DataFrame containing FHIR mapping results.
* @param resourceWriter The writer instance to write the FHIR resources.
*/
def writeBatch(spark: SparkSession, mappingJobExecution: FhirMappingJobExecution, mappingUrl: Option[String], df: Dataset[FhirMappingResult], resourceWriter: BaseFhirWriter): Unit = {
def writeMappingResult(spark: SparkSession, mappingJobExecution: FhirMappingJobExecution, mappingUrl: Option[String], df: Dataset[FhirMappingResult], resourceWriter: BaseFhirWriter): Unit = {
//Cache the dataframe
df.cache()
//Filter out the errors
@@ -41,20 +41,21 @@
}

/**
* Writes streaming FHIR mapping results to the specified resource writer.
*
* @param spark
* @param mappingJobExecution
* @param df
* @param resourceWriter
* @param mappingUrl
* @return
* @param spark The SparkSession instance.
* @param mappingJobExecution The execution context of the FHIR mapping job.
* @param df The DataFrame containing FHIR mapping results.
* @param resourceWriter The writer instance to write the FHIR resources.
* @param mappingUrl The URL for the mapping.
* @return The StreamingQuery instance representing the streaming query.
*/
def writeStream(spark: SparkSession, mappingJobExecution: FhirMappingJobExecution, df: Dataset[FhirMappingResult], resourceWriter: BaseFhirWriter, mappingUrl: String): StreamingQuery = {
val datasetWrite = (dataset: Dataset[FhirMappingResult], batchN: Long) => try {
writeBatch(spark, mappingJobExecution, Some(mappingUrl), dataset, resourceWriter)
val datasetWrite = (dataset: Dataset[FhirMappingResult], _: Long) => try {
writeMappingResult(spark, mappingJobExecution, Some(mappingUrl), dataset, resourceWriter)
} catch {
case e: Throwable =>
logger.error(s"Streaming batch resulted in error for project: ${mappingJobExecution.projectId}, job: ${mappingJobExecution.jobId}, execution: ${mappingJobExecution.id}, mapping: $mappingUrl", e.getMessage)
logger.error(s"Streaming chunk resulted in error for project: ${mappingJobExecution.projectId}, job: ${mappingJobExecution.jobId}, execution: ${mappingJobExecution.id}, mapping: $mappingUrl", e.getMessage)
}

df
@@ -95,7 +97,7 @@ object SinkHandler {
ExecutionLogger.logExecutionResultForStreamingMappingTask(mappingJobExecution, mappingUrl, numOfInvalids, numOfNotMapped, numOfWritten, numOfNotWritten)
} else {
// Log the result for batch execution
ExecutionLogger.logExecutionResultForBatch(mappingJobExecution, mappingUrl, numOfInvalids, numOfNotMapped, numOfWritten, numOfNotWritten)
ExecutionLogger.logExecutionResultForChunk(mappingJobExecution, mappingUrl, numOfInvalids, numOfNotMapped, numOfWritten, numOfNotWritten)
}

// Log the mapping and invalid input errors
@@ -109,7 +109,7 @@ class RunningJobRegistry(spark: SparkSession) {
// However, due to Spark's task distribution across multiple threads, using cancelJobGroup may not cancel all tasks,
// especially in the case of scheduled mapping jobs because setJobGroup only works for the Spark tasks started in the same thread.
// To resolve this issue, we need to assign a unique Spark job group id at the start of mapping job execution, before registering it with the RunningJobRegistry.
// We should call setJobGroup function before Spark tasks begin execution. The ideal location for this call seems to be the readSourceExecuteAndWriteInBatches function,
// We should call setJobGroup function before Spark tasks begin execution. The ideal location for this call seems to be the readSourceExecuteAndWriteInChunks function,
// although thorough testing is required to ensure its effectiveness.
// UPDATE: Although I set the job group id of each thread to the same value, cancelJobGroup does not work as expected
// because it only cancels the active jobs in Spark 3.5 not the future submitted jobs.
@@ -8,15 +8,15 @@ import io.tofhir.engine.model.{FhirMappingJobExecution, FhirMappingJobResult}
*
* This object provides methods to:
* - Log the explicit status of a mapping job execution (e.g., started, skipped, stopped, failed)
* - Log the result of individual batch executions for batch mapping jobs
* - Log the result of individual chunk executions for batch mapping jobs
* - Log the result of mapping task executions for streaming jobs
* - Log the overall result of a mapping task execution for batch mapping jobs after all batches are completed
* - Log the overall result of a mapping task execution for batch mapping jobs after all chunks are completed
*/
object ExecutionLogger {
private val logger: Logger = Logger(this.getClass)

// Stores the overall result of the mapping task execution for batch mapping jobs
// A mapping task execution is divided into multiple batches based on the max batch size configuration
// A mapping task execution is divided into multiple chunks based on the max chunk size configuration
// Keeps active executions in the form of: executionId -> FhirMappingJobResult
private val batchJobMappingTaskExecutionResults: collection.mutable.Map[String, FhirMappingJobResult] =
collection.mutable.Map[String, FhirMappingJobResult]()
@@ -54,8 +54,8 @@ object ExecutionLogger {
}

/**
* Logs the result of an individual batch execution for a batch mapping job.
* A batch mapping job is divided into several batches based on the given max batch size configuration.
* Logs the result of an individual chunk execution for a batch mapping job.
* A batch mapping job is divided into several chunks based on the given max chunk size configuration.
*
* @param mappingJobExecution The mapping job execution instance
* @param mappingUrl The optional URL of the mapping
@@ -64,7 +64,7 @@
* @param numOfFhirResources The number of FHIR resources created
* @param numOfFailedWrites The number of failed writes
*/
def logExecutionResultForBatch(mappingJobExecution: FhirMappingJobExecution, mappingUrl: Option[String],
def logExecutionResultForChunk(mappingJobExecution: FhirMappingJobExecution, mappingUrl: Option[String],
numOfInvalids: Long = 0,
numOfNotMapped: Long = 0,
numOfFhirResources: Long = 0,
@@ -101,18 +101,18 @@
numOfFhirResources: Long = 0,
numOfFailedWrites: Long = 0): Unit = {
//Log the job result
val jobResult = FhirMappingJobResult(mappingJobExecution, mappingUrl, numOfInvalids, numOfNotMapped, numOfFhirResources, numOfFailedWrites, batchResult = false)
val jobResult = FhirMappingJobResult(mappingJobExecution, mappingUrl, numOfInvalids, numOfNotMapped, numOfFhirResources, numOfFailedWrites, chunkResult = false)
logger.info(jobResult.toMapMarker, jobResult.toString)
}

/**
* Logs the overall result of the execution of a mapping task for a batch mapping job once all batches are completed.
* Logs the overall result of the execution of a mapping task for a batch mapping job once all chunks are completed.
*
* @param executionId The ID of the mapping job execution
*/
def logExecutionResultForBatchMappingTask(executionId: String): Unit = {
//Log the job result
val jobResult = batchJobMappingTaskExecutionResults(executionId).copy(batchResult = false)
val jobResult = batchJobMappingTaskExecutionResults(executionId).copy(chunkResult = false)
logger.info(jobResult.toMapMarker, jobResult.toString)
// remove execution from the map
batchJobMappingTaskExecutionResults.remove(executionId)
@@ -88,7 +88,7 @@ object FileStreamInputArchiver {

def applyArchivingOnBatchJob(execution: FhirMappingJobExecution): Unit = {
var paths: Seq[String] = Seq.empty
// Putting the archiving logic for the batch file inside within a try block so that it would not affect the caller in case of any exception.
// Putting the archiving logic for the batch job inside within a try block so that it would not affect the caller in case of any exception.
// That means archiving works in best-effort mode.
try {
val archiveMode: ArchiveModes = execution.archiveMode