From 48e5255160ab26f70cf5c502185b8bcbf1d7ba95 Mon Sep 17 00:00:00 2001 From: okanmercan Date: Tue, 7 May 2024 13:03:14 +0300 Subject: [PATCH 1/5] :construction_worker: ci(tofhir-server): Deploy an onFhir test container during tests. --- pom.xml | 11 ++++---- tofhir-server/pom.xml | 11 ++++++++ .../io/tofhir/server/BaseEndpointTest.scala | 28 +++++++++++++------ .../project/FhirDefinitionsEndpointTest.scala | 7 ++--- 4 files changed, 40 insertions(+), 17 deletions(-) diff --git a/pom.xml b/pom.xml index bac1d126..2c026e86 100644 --- a/pom.xml +++ b/pom.xml @@ -113,7 +113,7 @@ 42.7.0 2.2.224 2.2.5 - 1.19.3 + 1.19.3 2.8.5 10.5.3 0.10.2 @@ -488,12 +488,13 @@ test - + org.testcontainers - kafka - ${testcontainers.kafka} - test + testcontainers-bom + ${testcontainers} + pom + import diff --git a/tofhir-server/pom.xml b/tofhir-server/pom.xml index d194b2d6..ea87dcf6 100644 --- a/tofhir-server/pom.xml +++ b/tofhir-server/pom.xml @@ -215,6 +215,17 @@ com.opencsv opencsv + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + diff --git a/tofhir-server/src/test/scala/io/tofhir/server/BaseEndpointTest.scala b/tofhir-server/src/test/scala/io/tofhir/server/BaseEndpointTest.scala index 6926cc12..37124aa7 100644 --- a/tofhir-server/src/test/scala/io/tofhir/server/BaseEndpointTest.scala +++ b/tofhir-server/src/test/scala/io/tofhir/server/BaseEndpointTest.scala @@ -19,6 +19,10 @@ import org.json4s.jackson.Serialization.writePretty import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.utility.DockerImageName import java.io.File import java.util.UUID @@ -36,15 +40,23 @@ trait BaseEndpointTest extends AnyWordSpec with Matchers with ScalatestRouteTest // route endpoint var route: Route = _ - // URL for the FHIR repository, defaulting to a local onFHIR if the environment variable is not set - val fhirRepoUrl: String = sys.env.getOrElse(EnvironmentVariable.FHIR_REPO_URL.toString, "http://localhost:8081/fhir") - // Instance of OnFhirNetworkClient initialized with the FHIR repository URL - private val onFhirClient: OnFhirNetworkClient = OnFhirNetworkClient.apply(fhirRepoUrl) - // Boolean indicating whether the FHIR server is available - val fhirServerIsAvailable: Boolean = - Try(Await.result(onFhirClient.search("Patient").execute(), FiniteDuration(5, TimeUnit.SECONDS)).httpStatus == StatusCodes.OK) - .getOrElse(false) + // Instance of OnFhirNetworkClient initialized with onFhir container + var onFhirClient: OnFhirNetworkClient = initializeOnFhirClient(); + /** + * Deploy an onFhir container for testing purpose + * */ + def initializeOnFhirClient(): OnFhirNetworkClient = { + @Container + val container: GenericContainer[Nothing] = new GenericContainer(DockerImageName.parse("srdc/onfhir:r4")).withExposedPorts(8081); + container.addEnv("DB_EMBEDDED", "true"); + container.addEnv("SERVER_PORT", "8081"); + container.addEnv("SERVER_BASE_URI", "fhir"); + container.addEnv("FHIR_ROOT_URL", s"http://${container.getHost}:8081/fhir"); + container.waitingFor(Wait.forHttp("/fhir").forStatusCode(200)); + container.start(); + OnFhirNetworkClient.apply(s"http://${container.getHost}:${container.getFirstMappedPort}/fhir"); + } /** * Identifier of test project which can be used in endpoint tests. * Endpoint tests, which require a test project, should call {@link createProject} method to create it diff --git a/tofhir-server/src/test/scala/io/tofhir/server/project/FhirDefinitionsEndpointTest.scala b/tofhir-server/src/test/scala/io/tofhir/server/project/FhirDefinitionsEndpointTest.scala index 667bf04a..81a5c641 100644 --- a/tofhir-server/src/test/scala/io/tofhir/server/project/FhirDefinitionsEndpointTest.scala +++ b/tofhir-server/src/test/scala/io/tofhir/server/project/FhirDefinitionsEndpointTest.scala @@ -26,7 +26,6 @@ class FhirDefinitionsEndpointTest extends BaseEndpointTest { * Test case for validating FHIR resources. */ "validate FHIR resources" in { - assume(fhirServerIsAvailable) // Validate the resource without providing a FHIR validation URL Post(s"/${webServerConfig.baseUri}/validate", HttpEntity(ContentTypes.`application/json`, conditionResourceJson)) ~> route ~> check { @@ -39,12 +38,12 @@ class FhirDefinitionsEndpointTest extends BaseEndpointTest { } // Validate the resource with a valid FHIR validation URL - Post(s"/${webServerConfig.baseUri}/validate?fhirValidationUrl=$fhirRepoUrl/Condition/$$validate", HttpEntity(ContentTypes.`application/json`, conditionResourceJson)) ~> route ~> check { + Post(s"/${webServerConfig.baseUri}/validate?fhirValidationUrl=${this.onFhirClient.getBaseUrl()}/Condition/$$validate", HttpEntity(ContentTypes.`application/json`, conditionResourceJson)) ~> route ~> check { status shouldEqual StatusCodes.OK } // Validate the resource with a valid FHIR validation URL - Post(s"/${webServerConfig.baseUri}/validate?fhirValidationUrl=$fhirRepoUrl/Condition/$$validate", HttpEntity(ContentTypes.`application/json`, invalidConditionResourceJson)) ~> route ~> check { + Post(s"/${webServerConfig.baseUri}/validate?fhirValidationUrl=${this.onFhirClient.getBaseUrl();}/Condition/$$validate", HttpEntity(ContentTypes.`application/json`, invalidConditionResourceJson)) ~> route ~> check { status shouldEqual StatusCodes.OK // Convert the JSON response to a JValue @@ -59,7 +58,7 @@ class FhirDefinitionsEndpointTest extends BaseEndpointTest { } // Validate a bundle of resources - Post(s"/${webServerConfig.baseUri}/validate?fhirValidationUrl=$fhirRepoUrl", HttpEntity(ContentTypes.`application/json`, patientBundleJson)) ~> route ~> check { + Post(s"/${webServerConfig.baseUri}/validate?fhirValidationUrl=${this.onFhirClient.getBaseUrl()}", HttpEntity(ContentTypes.`application/json`, patientBundleJson)) ~> route ~> check { status shouldEqual StatusCodes.OK // Convert the JSON response to a JValue From d97cbaf5936fea4eb69b8a9d7e12be8bc000fc80 Mon Sep 17 00:00:00 2001 From: okanmercan Date: Tue, 7 May 2024 16:26:44 +0300 Subject: [PATCH 2/5] :construction_worker: ci(tofhir-engine): Deploy an onFhir test container during tests. --- tofhir-engine/pom.xml | 11 +++++ .../test/scala/io/tofhir/ToFhirTestSpec.scala | 23 ++++++++++ .../KafkaSourceIntegrationTest.scala | 19 ++------ .../test/FhirMappingJobManagerTest.scala | 25 ++--------- .../io/tofhir/test/FhirServerSourceTest.scala | 45 ++++++------------- .../scala/io/tofhir/test/SchedulingTest.scala | 26 +++-------- .../scala/io/tofhir/test/SqlSourceTest.scala | 23 ++-------- .../io/tofhir/server/BaseEndpointTest.scala | 5 --- 8 files changed, 64 insertions(+), 113 deletions(-) diff --git a/tofhir-engine/pom.xml b/tofhir-engine/pom.xml index 4d29132d..20527703 100644 --- a/tofhir-engine/pom.xml +++ b/tofhir-engine/pom.xml @@ -263,6 +263,17 @@ kafka test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + org.reflections diff --git a/tofhir-engine/src/test/scala/io/tofhir/ToFhirTestSpec.scala b/tofhir-engine/src/test/scala/io/tofhir/ToFhirTestSpec.scala index 6989c78e..d656e99c 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/ToFhirTestSpec.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/ToFhirTestSpec.scala @@ -1,6 +1,7 @@ package io.tofhir import akka.actor.ActorSystem +import io.onfhir.client.OnFhirNetworkClient import io.tofhir.engine.config.ToFhirConfig import io.tofhir.engine.execution.RunningJobRegistry import io.tofhir.engine.mapping._ @@ -8,6 +9,10 @@ import io.tofhir.engine.util.FileUtils import org.apache.spark.sql.SparkSession import org.scalatest.matchers.should.Matchers import org.scalatest.{Inside, Inspectors, OptionValues} +import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.utility.DockerImageName import java.io.FileWriter import java.net.URI @@ -29,6 +34,24 @@ trait ToFhirTestSpec extends Matchers with OptionValues with Inside with Inspect implicit val actorSystem: ActorSystem = ActorSystem("toFhirEngineTest") + // Instance of OnFhirNetworkClient initialized with onFhir container + var onFhirClient: OnFhirNetworkClient = initializeOnFhirClient(); + + /** + * Deploy an onFhir container for testing purpose + * */ + def initializeOnFhirClient(): OnFhirNetworkClient = { + @Container + val container: GenericContainer[Nothing] = new GenericContainer(DockerImageName.parse("srdc/onfhir:r4")).withExposedPorts(8081); + container.addEnv("DB_EMBEDDED", "true"); + container.addEnv("SERVER_PORT", "8081"); + container.addEnv("SERVER_BASE_URI", "fhir"); + container.addEnv("FHIR_ROOT_URL", s"http://${container.getHost}:8081/fhir"); + container.waitingFor(Wait.forHttp("/fhir").forStatusCode(200)); + container.start(); + OnFhirNetworkClient.apply(s"http://${container.getHost}:${container.getFirstMappedPort}/fhir"); + } + /** * Copies the content of a resource file to given location in the context path. * @param path The path to the resource file diff --git a/tofhir-engine/src/test/scala/io/tofhir/integrationtest/KafkaSourceIntegrationTest.scala b/tofhir-engine/src/test/scala/io/tofhir/integrationtest/KafkaSourceIntegrationTest.scala index e1bef46d..c5e8e565 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/integrationtest/KafkaSourceIntegrationTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/integrationtest/KafkaSourceIntegrationTest.scala @@ -3,14 +3,12 @@ package io.tofhir.integrationtest import akka.http.scaladsl.model.StatusCodes import io.onfhir.api.client.FhirBatchTransactionRequestBuilder import io.onfhir.api.util.FHIRUtil -import io.onfhir.client.OnFhirNetworkClient import io.onfhir.path.FhirPathUtilFunctionsFactory -import io.tofhir.common.model.Json4sSupport.formats import io.tofhir.ToFhirTestSpec +import io.tofhir.common.model.Json4sSupport.formats import io.tofhir.engine.Execution.actorSystem.dispatcher import io.tofhir.engine.mapping.FhirMappingJobManager import io.tofhir.engine.model._ -import io.tofhir.engine.util.FhirMappingJobFormatter.EnvironmentVariable import io.tofhir.engine.util.FhirMappingUtility import org.apache.commons.io import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic} @@ -30,14 +28,11 @@ import java.util.concurrent.TimeUnit import java.util.{Collections, Properties, UUID} import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Await, Future} -import scala.util.Try class KafkaSourceIntegrationTest extends AnyFlatSpec with ToFhirTestSpec with BeforeAndAfterAll { override protected def afterAll(): Unit = { - if(fhirServerIsAvailable) { - deleteResources() - } + deleteResources() if (adminClient != null) adminClient.close() if (producer != null) producer.close() if (consumer != null) consumer.close() @@ -84,7 +79,7 @@ class KafkaSourceIntegrationTest extends AnyFlatSpec with ToFhirTestSpec with Be val streamingSourceSettings: Map[String, KafkaSourceSettings] = Map("source" -> KafkaSourceSettings("kafka-source", "https://aiccelerate.eu/data-integration-suite/kafka-data", s"PLAINTEXT://localhost:$kafkaPort")) - val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = sys.env.getOrElse(EnvironmentVariable.FHIR_REPO_URL.toString, "http://localhost:8081/fhir")) + val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = onFhirClient.getBaseUrl()) val patientMappingTask: FhirMappingTask = FhirMappingTask( mappingRef = "https://aiccelerate.eu/fhir/mappings/patient-mapping", @@ -107,11 +102,6 @@ class KafkaSourceIntegrationTest extends AnyFlatSpec with ToFhirTestSpec with Be dataProcessingSettings = DataProcessingSettings() ) - val onFhirClient: OnFhirNetworkClient = OnFhirNetworkClient.apply(fhirSinkSettings.fhirRepoUrl) - val fhirServerIsAvailable: Boolean = - Try(Await.result(onFhirClient.search("Patient").execute(), FiniteDuration(5, TimeUnit.SECONDS)).httpStatus == StatusCodes.OK) - .getOrElse(false) - val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map(FhirPathUtilFunctionsFactory.defaultPrefix -> FhirPathUtilFunctionsFactory), sparkSession) it should "check the test container working" in { @@ -196,7 +186,6 @@ class KafkaSourceIntegrationTest extends AnyFlatSpec with ToFhirTestSpec with Be } it should "consume patients, observations and family member history data and map and write to the fhir repository" in { - assume(fhirServerIsAvailable) val execution: FhirMappingJobExecution = FhirMappingJobExecution(job = fhirMappingJob, mappingTasks = Seq(patientMappingTask, otherObservationMappingTask, familyMemberHistoryMappingTask)) val streamingQueryFutures: Map[String, Future[StreamingQuery]] = fhirMappingJobManager.startMappingJobStream(mappingJobExecution = execution, @@ -248,8 +237,6 @@ class KafkaSourceIntegrationTest extends AnyFlatSpec with ToFhirTestSpec with Be foo() }) consumer.unsubscribe() - - assume(fhirServerIsAvailable) // modify familyMemberHistoryMappingTask to listen to familyMembersCorrupted topic val mappingTask = familyMemberHistoryMappingTask.copy(sourceContext = Map("source" -> KafkaSource(topicName = "familyMembersCorrupted", groupId = "tofhir", startingOffsets = "earliest"))) val execution: FhirMappingJobExecution = FhirMappingJobExecution(job = fhirMappingJob, mappingTasks = Seq(mappingTask)) diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/FhirMappingJobManagerTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/FhirMappingJobManagerTest.scala index 2d29a0e1..929b48b6 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/FhirMappingJobManagerTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/FhirMappingJobManagerTest.scala @@ -4,13 +4,11 @@ import akka.http.scaladsl.model.StatusCodes import io.onfhir.api.Resource import io.onfhir.api.client.FhirBatchTransactionRequestBuilder import io.onfhir.api.util.FHIRUtil -import io.onfhir.client.OnFhirNetworkClient import io.onfhir.path.{FhirPathIdentityServiceFunctionsFactory, FhirPathUtilFunctionsFactory} import io.onfhir.util.JsonFormatter._ import io.tofhir.ToFhirTestSpec import io.tofhir.engine.mapping.{FhirMappingJobManager, MappingContextLoader} import io.tofhir.engine.model._ -import io.tofhir.engine.util.FhirMappingJobFormatter.EnvironmentVariable import io.tofhir.engine.util.{FhirMappingJobFormatter, FhirMappingUtility, FileUtils} import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.{Assertion, BeforeAndAfterAll} @@ -20,14 +18,13 @@ import java.nio.file.Paths import java.util.concurrent.TimeUnit import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent.{Await, ExecutionContext} -import scala.util.Try class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTestSpec { val dataSourceSettings: Map[String, DataSourceSettings] = Map("source" -> FileSystemSourceSettings("test-source", "https://aiccelerate.eu/data-integration-suite/test-data", Paths.get(getClass.getResource("/test-data").toURI).normalize().toAbsolutePath.toString)) - val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = sys.env.getOrElse(EnvironmentVariable.FHIR_REPO_URL.toString, "http://localhost:8081/fhir")) + val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = onFhirClient.getBaseUrl()) val patientMappingTask: FhirMappingTask = FhirMappingTask( mappingRef = "https://aiccelerate.eu/fhir/mappings/patient-mapping", @@ -69,11 +66,6 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit sourceContext = Map("source" -> FileSystemSource(path = "patients.tsv")) ) - val onFhirClient: OnFhirNetworkClient = OnFhirNetworkClient.apply(fhirSinkSettings.fhirRepoUrl) - val fhirServerIsAvailable: Boolean = - Try(Await.result(onFhirClient.search("Patient").execute(), FiniteDuration(5, TimeUnit.SECONDS)).httpStatus == StatusCodes.OK) - .getOrElse(false) - val testMappingJobFilePath: String = getClass.getResource("/test-mappingjob.json").toURI.getPath val testMappingJobWithIdentityServiceFilePath: String = getClass.getResource("/test-mappingjob-using-services.json").toURI.getPath @@ -89,9 +81,7 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit dataProcessingSettings = DataProcessingSettings()) override protected def afterAll(): Unit = { - if (fhirServerIsAvailable) { - deleteResources() - } + deleteResources() // delete context path org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.getPath("").toFile) super.afterAll() @@ -171,11 +161,10 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit } it should "execute a mapping job with two data sources" in { - assume(fhirServerIsAvailable) val mappingJob = FhirMappingJobFormatter.readMappingJobFromFile(getClass.getResource("/patient-mapping-job-with-two-sources.json").toURI.getPath) val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map.empty, sparkSession) - fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = mappingJob.mappings, job = mappingJob), sourceSettings = mappingJob.sourceSettings, sinkSettings = mappingJob.sinkSettings) flatMap { _ => + fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = mappingJob.mappings, job = mappingJob), sourceSettings = mappingJob.sourceSettings, sinkSettings = mappingJob.sinkSettings.asInstanceOf[FhirRepositorySinkSettings].copy(fhirRepoUrl = onFhirClient.getBaseUrl())) flatMap { _ => onFhirClient.read("Patient", "test-patient").executeAndReturnResource() flatMap { p1Resource => (p1Resource \ "id").extract[String] shouldBe "test-patient" (p1Resource \ "gender").extract[String] shouldBe "male" @@ -184,8 +173,6 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit } it should "execute the mappings with FHIR Path patch" in { - assume(fhirServerIsAvailable) - val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map.empty, sparkSession) fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(patientMappingTask), job = fhirMappingJob), sourceSettings = dataSourceSettings, sinkSettings = fhirSinkSettings).flatMap(_ => fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(patientExtraMappingWithPatch), job = fhirMappingJob) , sourceSettings = dataSourceSettings, sinkSettings = fhirSinkSettings) flatMap { response => @@ -203,7 +190,6 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit } it should "execute the mappings with conditional FHIR Path patch" in { - assume(fhirServerIsAvailable) val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map.empty, sparkSession) fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(patientMappingTask), job = fhirMappingJob), sourceSettings = dataSourceSettings, sinkSettings = fhirSinkSettings).flatMap(_ => fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(patientExtraMappingWithConditionalPatch), job = fhirMappingJob), sourceSettings = dataSourceSettings, sinkSettings = fhirSinkSettings) flatMap { response => @@ -290,8 +276,6 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit it should "execute the mapping job with multiple mapping tasks and write the results into a FHIR repository" in { - assume(fhirServerIsAvailable) - val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map(FhirPathUtilFunctionsFactory.defaultPrefix -> FhirPathUtilFunctionsFactory), sparkSession) fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(patientMappingTask, otherObservationMappingTask), job = fhirMappingJob), sourceSettings = dataSourceSettings, sinkSettings = fhirSinkSettings) flatMap { response => onFhirClient.read("Patient", FhirMappingUtility.getHashedId("Patient", "p8")).executeAndReturnResource() flatMap { p1Resource => @@ -313,8 +297,6 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit } it should "continue execute the mapping job when encounter without an error" in { - assume(fhirServerIsAvailable) - val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map(FhirPathUtilFunctionsFactory.defaultPrefix -> FhirPathUtilFunctionsFactory), sparkSession) val future = fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution( @@ -380,7 +362,6 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit } it should "execute the FhirMappingJob using an identity service" in { - assume(fhirServerIsAvailable) val lMappingJob = FhirMappingJobFormatter.readMappingJobFromFile(testMappingJobWithIdentityServiceFilePath) val terminologyServiceFolderPath = Paths.get(getClass.getResource("/terminology-service").toURI).normalize().toAbsolutePath.toString diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/FhirServerSourceTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/FhirServerSourceTest.scala index 40d568c9..47d9921e 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/FhirServerSourceTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/FhirServerSourceTest.scala @@ -10,17 +10,12 @@ import io.onfhir.util.JsonFormatter._ import io.tofhir.ToFhirTestSpec import io.tofhir.engine.mapping.FhirMappingJobManager import io.tofhir.engine.model._ -import io.tofhir.engine.util.FhirMappingJobFormatter.EnvironmentVariable import org.json4s.JsonAST.JArray import org.json4s.jackson.JsonMethods import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AsyncFlatSpec -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.Await import scala.io.Source -import scala.util.Try /** * Test suite for verifying the behavior of FhirServerSource. @@ -28,10 +23,10 @@ import scala.util.Try class FhirServerSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTestSpec { // Sink Settings of mapping job - val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = sys.env.getOrElse(EnvironmentVariable.FHIR_REPO_URL.toString, "http://localhost:8081/fhir")) + val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = onFhirClient.getBaseUrl()) // Define OnFhir clients for source and target servers - val targetOnFhirClient: OnFhirNetworkClient = OnFhirNetworkClient.apply(fhirSinkSettings.fhirRepoUrl) - val sourceOnFhirClient: OnFhirNetworkClient = OnFhirNetworkClient.apply("http://localhost:6080/fhir") + val targetOnFhirClient: OnFhirNetworkClient = onFhirClient + val sourceOnFhirClient: OnFhirNetworkClient = initializeOnFhirClient() // Settings of Fhir Server data source val fhirServerSourceSettings: Map[String, FhirServerSourceSettings] = @@ -43,15 +38,6 @@ class FhirServerSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToF // FhirMappingJobManager to execute mapping job val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map(FhirPathUtilFunctionsFactory.defaultPrefix -> FhirPathUtilFunctionsFactory), sparkSession) - // Whether the source onFHIR server is available - val sourceServerIsAvailable: Boolean = - Try(Await.result(sourceOnFhirClient.search("Patient").execute(), FiniteDuration(5, TimeUnit.SECONDS)).httpStatus == StatusCodes.OK) - .getOrElse(false) - // Whether the target onFHIR server is available - val targetServerIsAvailable: Boolean = - Try(Await.result(targetOnFhirClient.search("Patient").execute(), FiniteDuration(5, TimeUnit.SECONDS)).httpStatus == StatusCodes.OK) - .getOrElse(false) - // Observation mapping task val observationMappingTask: FhirMappingTask = FhirMappingTask( mappingRef = "https://aiccelerate.eu/fhir/mappings/test-observation-mapping", @@ -73,12 +59,11 @@ class FhirServerSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToF * */ override protected def beforeAll(): Unit = { super.beforeAll() - if(sourceServerIsAvailable) - sourceOnFhirClient.batch() - .entry(_.update(testObservationResource)) - .returnMinimal().asInstanceOf[FhirBatchTransactionRequestBuilder].execute() map { res => - res.httpStatus shouldBe StatusCodes.OK - } + sourceOnFhirClient.batch() + .entry(_.update(testObservationResource)) + .returnMinimal().asInstanceOf[FhirBatchTransactionRequestBuilder].execute() map { res => + res.httpStatus shouldBe StatusCodes.OK + } } /** @@ -86,12 +71,11 @@ class FhirServerSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToF * */ override protected def afterAll(): Unit = { super.afterAll() - if(sourceServerIsAvailable) - sourceOnFhirClient.batch() - .entry(_.delete("Observation", "example-observation")) - .returnMinimal().asInstanceOf[FhirBatchTransactionRequestBuilder].execute() map { res => - res.httpStatus shouldBe StatusCodes.OK - } + sourceOnFhirClient.batch() + .entry(_.delete("Observation", "example-observation")) + .returnMinimal().asInstanceOf[FhirBatchTransactionRequestBuilder].execute() map { res => + res.httpStatus shouldBe StatusCodes.OK + } } /** @@ -99,7 +83,6 @@ class FhirServerSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToF * It should produce two observation resources. * */ "Observation mapping" should "should read data from Fhir Server source and map it" in { - assume(sourceServerIsAvailable) fhirMappingJobManager.executeMappingTaskAndReturn(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(observationMappingTask), job = fhirMappingJob), sourceSettings = fhirServerSourceSettings) map { mappingResults => val results = mappingResults.map(r => { r.mappedResource should not be None @@ -126,8 +109,6 @@ class FhirServerSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToF * There should be two Observation resources on the target onFHIR after the mapping job is completed. * */ it should "map test data and write it to FHIR repo successfully" in { - assume(sourceServerIsAvailable) - assume(targetServerIsAvailable) fhirMappingJobManager .executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(observationMappingTask), job = fhirMappingJob), sourceSettings = fhirServerSourceSettings, sinkSettings = fhirSinkSettings) .flatMap(_ => { diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/SchedulingTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/SchedulingTest.scala index 57a5c3a6..75f2dcc2 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/SchedulingTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/SchedulingTest.scala @@ -3,9 +3,9 @@ package io.tofhir.test import akka.http.scaladsl.model.StatusCodes import io.onfhir.api.client.FhirBatchTransactionRequestBuilder import io.onfhir.api.util.FHIRUtil -import io.onfhir.client.OnFhirNetworkClient -import io.tofhir.common.model.Json4sSupport.formats +import io.onfhir.path.FhirPathUtilFunctionsFactory import io.tofhir.ToFhirTestSpec +import io.tofhir.common.model.Json4sSupport.formats import io.tofhir.engine.config.ToFhirConfig import io.tofhir.engine.mapping.{FhirMappingJobManager, MappingContextLoader, MappingJobScheduler} import io.tofhir.engine.model.{FhirMappingJob, FhirMappingJobExecution, FhirRepositorySinkSettings} @@ -19,14 +19,10 @@ import java.io.File import java.net.URI import java.nio.file.{Path, Paths} import java.sql.{Connection, DriverManager, Statement} -import java.util.concurrent.TimeUnit -import io.onfhir.path.FhirPathUtilFunctionsFactory -import io.tofhir.engine.util.FhirMappingJobFormatter.EnvironmentVariable - +import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} -import scala.concurrent.duration.{Duration, FiniteDuration} import scala.io.{BufferedSource, Source} -import scala.util.{Failure, Success, Try, Using} +import scala.util.{Failure, Success, Using} class SchedulingTest extends AnyFlatSpec with BeforeAndAfterAll with ToFhirTestSpec { @@ -41,9 +37,7 @@ class SchedulingTest extends AnyFlatSpec with BeforeAndAfterAll with ToFhirTestS } override protected def afterAll(): Unit = { - if(fhirServerIsAvailable) { - deleteResources() - } + deleteResources() val sql = readFileContent("/sql/scheduling-drop.sql") runSQL(sql) super.afterAll() @@ -89,21 +83,15 @@ class SchedulingTest extends AnyFlatSpec with BeforeAndAfterAll with ToFhirTestS val mappingJobScheduler: MappingJobScheduler = MappingJobScheduler(scheduler, toFhirDb.toUri) - val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = sys.env.getOrElse(EnvironmentVariable.FHIR_REPO_URL.toString, "http://localhost:8081/fhir")) - - val onFhirClient: OnFhirNetworkClient = OnFhirNetworkClient.apply(fhirSinkSettings.fhirRepoUrl) - val fhirServerIsAvailable: Boolean = - Try(Await.result(onFhirClient.search("Patient").execute(), FiniteDuration(5, TimeUnit.SECONDS)).httpStatus == StatusCodes.OK) - .getOrElse(false) + val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = onFhirClient.getBaseUrl()) val testScheduleMappingJobFilePath: String = getClass.getResource("/test-schedule-mappingjob.json").toURI.getPath it should "schedule a FhirMappingJob with cron and sink settings restored from a file" in { - assume(fhirServerIsAvailable) val lMappingJob: FhirMappingJob = FhirMappingJobFormatter.readMappingJobFromFile(testScheduleMappingJobFilePath) val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, new MappingContextLoader(mappingRepository), schemaRepository, Map(FhirPathUtilFunctionsFactory.defaultPrefix -> FhirPathUtilFunctionsFactory), sparkSession, Some(mappingJobScheduler)) - fhirMappingJobManager.scheduleMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = lMappingJob.mappings, job = lMappingJob), sourceSettings = lMappingJob.sourceSettings, sinkSettings = lMappingJob.sinkSettings, schedulingSettings = lMappingJob.schedulingSettings.get) + fhirMappingJobManager.scheduleMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = lMappingJob.mappings, job = lMappingJob), sourceSettings = lMappingJob.sourceSettings, sinkSettings = lMappingJob.sinkSettings.asInstanceOf[FhirRepositorySinkSettings].copy(fhirRepoUrl = onFhirClient.getBaseUrl()), schedulingSettings = lMappingJob.schedulingSettings.get) scheduler.start() //job set to run every minute Thread.sleep(61000) //wait for the job to be executed once scheduler.stop() diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/SqlSourceTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/SqlSourceTest.scala index 2aa4aae3..4a6fe331 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/SqlSourceTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/SqlSourceTest.scala @@ -5,24 +5,20 @@ import com.typesafe.scalalogging.Logger import io.onfhir.api.Resource import io.onfhir.api.client.FhirBatchTransactionRequestBuilder import io.onfhir.api.util.FHIRUtil -import io.onfhir.client.OnFhirNetworkClient import io.onfhir.path.FhirPathUtilFunctionsFactory import io.onfhir.util.JsonFormatter._ import io.tofhir.ToFhirTestSpec import io.tofhir.engine.mapping.{FhirMappingJobManager, MappingContextLoader} import io.tofhir.engine.model._ -import io.tofhir.engine.util.FhirMappingJobFormatter.EnvironmentVariable import io.tofhir.engine.util.{FhirMappingJobFormatter, FhirMappingUtility} import org.json4s.JsonAST.JObject import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AsyncFlatSpec import java.sql.{Connection, DriverManager, Statement} -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{Await, Future} +import scala.concurrent.Future import scala.io.{BufferedSource, Source} -import scala.util.{Failure, Success, Try, Using} +import scala.util.{Failure, Success, Using} class SqlSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTestSpec { @@ -68,12 +64,7 @@ class SqlSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTest val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, contextLoader, schemaRepository, Map(FhirPathUtilFunctionsFactory.defaultPrefix -> FhirPathUtilFunctionsFactory), sparkSession) - val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = sys.env.getOrElse(EnvironmentVariable.FHIR_REPO_URL.toString, "http://localhost:8081/fhir")) - val onFhirClient: OnFhirNetworkClient = OnFhirNetworkClient.apply(fhirSinkSettings.fhirRepoUrl) - - val fhirServerIsAvailable: Boolean = - Try(Await.result(onFhirClient.search("Patient").execute(), FiniteDuration(5, TimeUnit.SECONDS)).httpStatus == StatusCodes.OK) - .getOrElse(false) + val fhirSinkSettings: FhirRepositorySinkSettings = FhirRepositorySinkSettings(fhirRepoUrl = onFhirClient.getBaseUrl()) // sql tablename mappings tasks val patientMappingTask: FhirMappingTask = FhirMappingTask( @@ -132,7 +123,6 @@ class SqlSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTest it should "map test data and write it to FHIR repo successfully" in { //Send it to our fhir repo if they are also validated - assume(fhirServerIsAvailable) fhirMappingJobManager .executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(patientMappingTask), job = fhirMappingJob), sourceSettings = sqlSourceSettings, sinkSettings = fhirSinkSettings) .flatMap(_ => { @@ -165,7 +155,6 @@ class SqlSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTest } it should "map test data and write it to FHIR repo successfully" in { - assume(fhirServerIsAvailable) fhirMappingJobManager .executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(otherObsMappingTask), job = fhirMappingJob), sourceSettings = sqlSourceSettings, sinkSettings = fhirSinkSettings) .flatMap(_ => { @@ -208,7 +197,6 @@ class SqlSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTest it should "map test data and write it to FHIR repo successfully" in { //Send it to our fhir repo if they are also validated - assume(fhirServerIsAvailable) fhirMappingJobManager .executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(careSiteMappingTask), job = fhirMappingJob) , sourceSettings = sqlSourceSettings, sinkSettings = fhirSinkSettings) .flatMap(_ => { @@ -241,7 +229,6 @@ class SqlSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTest } it should "map test data and write it to FHIR repo successfully" in { - assume(fhirServerIsAvailable) fhirMappingJobManager .executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(locationMappingTask), job = fhirMappingJob), sourceSettings = sqlSourceSettings, sinkSettings = fhirSinkSettings) .flatMap(_ => { @@ -276,7 +263,6 @@ class SqlSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTest } it should "map test data and write it to FHIR repo successfully" in { - assume(fhirServerIsAvailable) fhirMappingJobManager .executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = Seq(procedureOccurrenceMappingTask), job = fhirMappingJob), sourceSettings = sqlSourceSettings, sinkSettings = fhirSinkSettings) .flatMap(_ => { @@ -292,12 +278,11 @@ class SqlSourceTest extends AsyncFlatSpec with BeforeAndAfterAll with ToFhirTest } it should "execute the FhirMappingJob with SQL source and sink settings restored from a file" in { - assume(fhirServerIsAvailable) val lMappingJob = FhirMappingJobFormatter.readMappingJobFromFile(testSqlMappingJobFilePath) val fhirMappingJobManager = new FhirMappingJobManager(mappingRepository, new MappingContextLoader(mappingRepository), schemaRepository, Map.empty, sparkSession) - fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = lMappingJob.mappings, job = lMappingJob), sourceSettings = lMappingJob.sourceSettings, sinkSettings = lMappingJob.sinkSettings) flatMap { unit => + fhirMappingJobManager.executeMappingJob(mappingJobExecution = FhirMappingJobExecution(mappingTasks = lMappingJob.mappings, job = lMappingJob), sourceSettings = lMappingJob.sourceSettings, sinkSettings = lMappingJob.sinkSettings.asInstanceOf[FhirRepositorySinkSettings].copy(fhirRepoUrl = onFhirClient.getBaseUrl())) flatMap { unit => //Delete written resources var batchRequest: FhirBatchTransactionRequestBuilder = onFhirClient.batch() (1 to 10).foreach { i => diff --git a/tofhir-server/src/test/scala/io/tofhir/server/BaseEndpointTest.scala b/tofhir-server/src/test/scala/io/tofhir/server/BaseEndpointTest.scala index 37124aa7..68b4c20f 100644 --- a/tofhir-server/src/test/scala/io/tofhir/server/BaseEndpointTest.scala +++ b/tofhir-server/src/test/scala/io/tofhir/server/BaseEndpointTest.scala @@ -6,7 +6,6 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest import io.onfhir.client.OnFhirNetworkClient import io.tofhir.common.model.Json4sSupport.formats import io.tofhir.engine.config.ToFhirEngineConfig -import io.tofhir.engine.util.FhirMappingJobFormatter.EnvironmentVariable import io.tofhir.engine.util.FileUtils import io.tofhir.server.config.RedCapServiceConfig import io.tofhir.server.common.config.WebServerConfig @@ -26,10 +25,6 @@ import org.testcontainers.utility.DockerImageName import java.io.File import java.util.UUID -import java.util.concurrent.TimeUnit -import scala.concurrent.Await -import scala.concurrent.duration.FiniteDuration -import scala.util.Try trait BaseEndpointTest extends AnyWordSpec with Matchers with ScalatestRouteTest with BeforeAndAfterAll { // toFHIR engine config From c9e51ae94ad1f0b916473cc1128b8d8b1e8cf923 Mon Sep 17 00:00:00 2001 From: okanmercan Date: Tue, 7 May 2024 17:56:43 +0300 Subject: [PATCH 3/5] :white_check_mark: test(FhirMappingJobManagerTest): Fix identity service test failure. --- .../test/scala/io/tofhir/test/FhirMappingJobManagerTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tofhir-engine/src/test/scala/io/tofhir/test/FhirMappingJobManagerTest.scala b/tofhir-engine/src/test/scala/io/tofhir/test/FhirMappingJobManagerTest.scala index 929b48b6..bd2bf7cc 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/test/FhirMappingJobManagerTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/test/FhirMappingJobManagerTest.scala @@ -375,7 +375,7 @@ class FhirMappingJobManagerTest extends AsyncFlatSpec with BeforeAndAfterAll wit sourceSettings = dataSourceSettings, sinkSettings = fhirSinkSettings, terminologyServiceSettings = Some(terminologyServiceSettings), - identityServiceSettings = lMappingJob.getIdentityServiceSettings()) map { res => + identityServiceSettings = lMappingJob.copy(sinkSettings = lMappingJob.sinkSettings.asInstanceOf[FhirRepositorySinkSettings].copy(fhirRepoUrl = onFhirClient.getBaseUrl())).getIdentityServiceSettings()) map { res => res shouldBe a[Unit] } } From 6edcfa33dc89c7b818f19ce7efa86f873d6e9663 Mon Sep 17 00:00:00 2001 From: okanmercan Date: Wed, 8 May 2024 10:20:43 +0300 Subject: [PATCH 4/5] :white_check_mark: test(FhirMappingJobManagerTest): Fix dataFolderPath error. --- .../test/resources/patient-mapping-job-with-two-sources.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tofhir-engine/src/test/resources/patient-mapping-job-with-two-sources.json b/tofhir-engine/src/test/resources/patient-mapping-job-with-two-sources.json index 1f2caa06..37364111 100644 --- a/tofhir-engine/src/test/resources/patient-mapping-job-with-two-sources.json +++ b/tofhir-engine/src/test/resources/patient-mapping-job-with-two-sources.json @@ -5,14 +5,14 @@ "jsonClass" : "FileSystemSourceSettings", "name" : "patient-test-data", "sourceUri" : "http://test-data", - "dataFolderPath" : "/test-data", + "dataFolderPath" : "./test-data", "asStream" : false }, "patientGender" : { "jsonClass" : "FileSystemSourceSettings", "name" : "patient-gender-test-data", "sourceUri" : "http://test-data", - "dataFolderPath" : "/test-data-gender", + "dataFolderPath" : "./test-data-gender", "asStream" : false } }, From 3e7c3db7ee9ab7398ce8413957a88987cd7a385e Mon Sep 17 00:00:00 2001 From: dogukan10 Date: Thu, 9 May 2024 17:19:50 +0300 Subject: [PATCH 5/5] :white_check_mark: test: Increase the timeout for Kafka source integration test --- .../io/tofhir/integrationtest/KafkaSourceIntegrationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tofhir-engine/src/test/scala/io/tofhir/integrationtest/KafkaSourceIntegrationTest.scala b/tofhir-engine/src/test/scala/io/tofhir/integrationtest/KafkaSourceIntegrationTest.scala index c5e8e565..fa6ff1c9 100644 --- a/tofhir-engine/src/test/scala/io/tofhir/integrationtest/KafkaSourceIntegrationTest.scala +++ b/tofhir-engine/src/test/scala/io/tofhir/integrationtest/KafkaSourceIntegrationTest.scala @@ -193,7 +193,7 @@ class KafkaSourceIntegrationTest extends AnyFlatSpec with ToFhirTestSpec with Be sinkSettings = fhirSinkSettings ) streamingQueryFutures.foreach(sq => { - val streamingQuery: StreamingQuery = Await.result(sq._2, FiniteDuration.apply(5, TimeUnit.SECONDS)) // First wait for the StreamingQuery to become available + val streamingQuery: StreamingQuery = Await.result(sq._2, FiniteDuration.apply(60, TimeUnit.SECONDS)) // First wait for the StreamingQuery to become available streamingQuery.awaitTermination(20000L) // Wait for 20 seconds to consume and write to the fhir repo and terminate streamingQuery.stop() io.FileUtils.deleteDirectory(new File(execution.getCheckpointDirectory(sq._1))) // Clear checkpoint directory to prevent conflicts with other tests