Skip to content

Commit

Permalink
♻️ Refactor(FhirMappingJobExecution): Update javadoc and comments, re…
Browse files Browse the repository at this point in the history
…factor related code
  • Loading branch information
camemre49 authored and dogukan10 committed Jul 3, 2024
1 parent e24b186 commit 97f0005
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object FileStreamInputArchiver {
val archiveMode: ArchiveModes = execution.archiveMode
if (archiveMode != ArchiveModes.OFF) {
// get data folder path from data source settings
val dataFolderPath = FileUtils.getPath(execution.fileSystemSourceDataFolderPath).toString
val dataFolderPath = FileUtils.getPath(execution.fileSystemSourceDataFolderPath.get).toString

// Get paths of the input files referred by the mapping tasks
paths = execution.mappingTasks.flatMap(mapping => {
Expand Down Expand Up @@ -180,16 +180,11 @@ class StreamingArchiverTask(archiver: FileStreamInputArchiver, runningJobRegistr
/**
 * Applies archiving to every mapping of each running streaming execution that reads from a file system source.
 *
 * Only executions that are streaming jobs and have a defined file system data folder path are considered;
 * for each of them, the archiver is invoked exactly once per mapping url found in its streaming query map.
 */
override def run(): Unit = {
  // Select the streaming executions for which a file system data folder path is defined
  val executions = runningJobRegistry.getRunningExecutionsWithCompleteMetadata()
    .filter(execution => execution.isStreamingJob && execution.fileSystemSourceDataFolderPath.nonEmpty)
  // Archive the input files of each mapping once per run
  executions.foreach(execution => {
    execution.getStreamingQueryMap().keys.foreach(mappingUrl => {
      archiver.applyArchivingOnStreamingJob(execution, mappingUrl)
    })
  })
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.util.regex.Pattern
* @param mappingTasks List of mapping tasks to be executed (as a subset of the mapping tasks defined in the job)
* @param jobGroupIdOrStreamingQuery Keeps Spark job group id for batch jobs and StreamingQuery for streaming jobs
* @param isStreamingJob Whether the execution is streaming or not
* @param hasFileSystemSource Whether the execution source has a file system source or not
* @param fileSystemSourceDataFolderPath Data folder path of the file system source, if the execution has one
* @param archiveMode Archive mode of execution
* @param saveErroneousRecords Whether to save erroneous records or not
Expand All @@ -28,8 +27,7 @@ case class FhirMappingJobExecution(id: String,
mappingTasks: Seq[FhirMappingTask],
jobGroupIdOrStreamingQuery: Option[Either[String, collection.mutable.Map[String, StreamingQuery]]],
isStreamingJob: Boolean,
hasFileSystemSource: Boolean,
fileSystemSourceDataFolderPath: String,
fileSystemSourceDataFolderPath: Option[String],
archiveMode: ArchiveModes,
saveErroneousRecords: Boolean
) {
Expand Down Expand Up @@ -142,38 +140,44 @@ case class FhirMappingJobExecution(id: String,
* An object to create FhirMappingJobExecution instances
*/
object FhirMappingJobExecution {

  /**
   * Creates a [[FhirMappingJobExecution]] for the given job, deriving the execution properties
   * (streaming flag, file system data folder path, archive mode and erroneous record saving)
   * from the source settings and data processing settings of the job.
   *
   * @param id                         Unique identifier for the execution
   * @param projectId                  Unique identifier of project to which mapping job belongs
   * @param job                        FHIR mapping job that includes this execution.
   * @param mappingTasks               List of mapping tasks to be executed (as a subset of the mapping tasks defined in the job)
   * @param jobGroupIdOrStreamingQuery Keeps Spark job group id for batch jobs and StreamingQuery for streaming jobs
   * @return The execution populated with the given values and the properties derived from the job
   */
  def apply(id: String = UUID.randomUUID().toString,
            projectId: String = "",
            job: FhirMappingJob,
            mappingTasks: Seq[FhirMappingTask] = Seq.empty,
            jobGroupIdOrStreamingQuery: Option[Either[String, collection.mutable.Map[String, StreamingQuery]]] = None
           ): FhirMappingJobExecution = {
    // Fall back to an empty identifier when the job has no id
    val jobId = Option(job.id).getOrElse("")

    // Configure properties related to the source settings of the job (tolerating missing or empty settings)
    val sourceSettings = Option(job.sourceSettings)
    val isStreamingJob = sourceSettings.exists(_.exists(source => source._2.asStream))
    // Only the first source is inspected; the path stays empty unless it is a file system source
    val fileSystemSourceDataFolderPath: Option[String] =
      sourceSettings
        .flatMap(_.headOption)
        .map(_._2)
        .collect { case settings: FileSystemSourceSettings => settings.dataFolderPath }

    // Configure properties related to the data processing settings of the job
    val dataProcessingSettings = Option(job.dataProcessingSettings)
    val archiveMode: ArchiveModes = dataProcessingSettings.map(_.archiveMode).getOrElse(ArchiveModes.OFF)
    val saveErroneousRecords = dataProcessingSettings.exists(_.saveErroneousRecords)

    // Create a FhirMappingJobExecution with only necessary properties
    FhirMappingJobExecution(id, projectId, jobId, mappingTasks, jobGroupIdOrStreamingQuery, isStreamingJob, fileSystemSourceDataFolderPath, archiveMode, saveErroneousRecords)
  }
}

0 comments on commit 97f0005

Please sign in to comment.