diff --git a/src/main/kotlin/no/nav/syfo/Bootstrap.kt b/src/main/kotlin/no/nav/syfo/Bootstrap.kt index c59d3f6c..242edc45 100644 --- a/src/main/kotlin/no/nav/syfo/Bootstrap.kt +++ b/src/main/kotlin/no/nav/syfo/Bootstrap.kt @@ -18,19 +18,11 @@ import io.ktor.server.engine.embeddedServer import io.ktor.server.netty.Netty import io.ktor.util.KtorExperimentalAPI import io.prometheus.client.hotspot.DefaultExports -import java.nio.file.Paths -import java.time.Duration -import java.util.Properties -import java.util.UUID -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import net.logstash.logback.argument.StructuredArguments import no.nav.helse.eiFellesformat.XMLEIFellesformat import no.nav.joarkjournalfoeringhendelser.JournalfoeringHendelseRecord @@ -53,6 +45,10 @@ import no.nav.tjeneste.virksomhet.person.v3.binding.PersonV3 import org.apache.kafka.clients.consumer.KafkaConsumer import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.nio.file.Paths +import java.time.Duration +import java.util.* +import java.util.concurrent.TimeUnit const val STANDARD_NAV_ENHET = "0393" @@ -62,8 +58,6 @@ fun doReadynessCheck(): Boolean { data class ApplicationState(var running: Boolean = true, var initialized: Boolean = false) -val coroutineContext = Executors.newFixedThreadPool(1).asCoroutineDispatcher() - val log: Logger = LoggerFactory.getLogger("nav.syfo.papirmottak") val objectMapper: ObjectMapper = ObjectMapper().apply { @@ -73,7 +67,7 @@ val objectMapper: ObjectMapper = ObjectMapper().apply { } @KtorExperimentalAPI -fun main() = runBlocking(coroutineContext) { +fun main() { val env = Environment() val credentials = objectMapper.readValue(Paths.get("/var/run/secrets/nais.io/vault/credentials.json").toFile()) @@ -141,8 +135,8 @@ fun main() = runBlocking(coroutineContext) { }) } -fun CoroutineScope.createListener(applicationState: ApplicationState, action: suspend CoroutineScope.() -> Unit): Job = - launch { +fun createListener(applicationState: ApplicationState, action: suspend CoroutineScope.() -> Unit): Job = + GlobalScope.launch { try { action() } catch (e: TrackableException) { @@ -153,22 +147,18 @@ fun CoroutineScope.createListener(applicationState: ApplicationState, action: su } @KtorExperimentalAPI -suspend fun CoroutineScope.launchListeners( +fun launchListeners( env: Environment, applicationState: ApplicationState, consumerProperties: Properties, behandlingService: BehandlingService ) { - val journalfoeringHendelseListeners = 0.until(env.applicationThreads).map { - val kafkaconsumerJournalfoeringHendelse = KafkaConsumer(consumerProperties) - kafkaconsumerJournalfoeringHendelse.subscribe(listOf(env.dokJournalfoeringV1Topic)) - - createListener(applicationState) { - blockingApplicationLogic(applicationState, kafkaconsumerJournalfoeringHendelse, behandlingService) - } - }.toList() + val kafkaconsumerJournalfoeringHendelse = KafkaConsumer(consumerProperties) + kafkaconsumerJournalfoeringHendelse.subscribe(listOf(env.dokJournalfoeringV1Topic)) - journalfoeringHendelseListeners.forEach { it.join() } + createListener(applicationState) { + blockingApplicationLogic(applicationState, kafkaconsumerJournalfoeringHendelse, behandlingService) + } } @KtorExperimentalAPI @@ -176,7 +166,7 @@ suspend fun blockingApplicationLogic( applicationState: ApplicationState, consumer: KafkaConsumer, behandlingService: BehandlingService -) = coroutineScope { +) { while (applicationState.running) { consumer.poll(Duration.ofMillis(0)).forEach { consumerRecord -> val journalfoeringHendelseRecord = consumerRecord.value() diff --git a/src/main/kotlin/no/nav/syfo/Environment.kt b/src/main/kotlin/no/nav/syfo/Environment.kt index e811f678..6e0f7ce0 100644 --- a/src/main/kotlin/no/nav/syfo/Environment.kt +++ b/src/main/kotlin/no/nav/syfo/Environment.kt @@ -5,7 +5,6 @@ import no.nav.syfo.kafka.KafkaCredentials data class Environment( val applicationPort: Int = getEnvVar("APPLICATION_PORT", "8080").toInt(), - val applicationThreads: Int = getEnvVar("APPLICATION_THREADS", "1").toInt(), val applicationName: String = getEnvVar("NAIS_APP_NAME", "syfosmpapirmottak"), override val kafkaBootstrapServers: String = getEnvVar("KAFKA_BOOTSTRAP_SERVERS_URL"), val dokJournalfoeringV1Topic: String = getEnvVar("DOK_JOURNALFOERING_V1_TOPIC"), diff --git a/src/main/kotlin/no/nav/syfo/service/BehandlingService.kt b/src/main/kotlin/no/nav/syfo/service/BehandlingService.kt index 3194d173..965cf847 100644 --- a/src/main/kotlin/no/nav/syfo/service/BehandlingService.kt +++ b/src/main/kotlin/no/nav/syfo/service/BehandlingService.kt @@ -1,7 +1,6 @@ package no.nav.syfo.service import io.ktor.util.KtorExperimentalAPI -import kotlinx.coroutines.coroutineScope import net.logstash.logback.argument.StructuredArguments import net.logstash.logback.argument.StructuredArguments.fields import no.nav.joarkjournalfoeringhendelser.JournalfoeringHendelseRecord @@ -29,7 +28,7 @@ class BehandlingService constructor( journalfoeringEvent: JournalfoeringHendelseRecord, loggingMeta: LoggingMeta, sykmeldingId: String - ) = coroutineScope { + ) { wrapExceptions(loggingMeta) { val journalpostId = journalfoeringEvent.journalpostId.toString() diff --git a/src/test/kotlin/no/nav/syfo/KafkaITSpek.kt b/src/test/kotlin/no/nav/syfo/KafkaITSpek.kt index 11d9076a..3c277785 100644 --- a/src/test/kotlin/no/nav/syfo/KafkaITSpek.kt +++ b/src/test/kotlin/no/nav/syfo/KafkaITSpek.kt @@ -35,7 +35,6 @@ object KafkaITSpek : Spek({ val config = Environment( applicationPort = getRandomPort(), kafkaBootstrapServers = embeddedEnvironment.brokersURL, - applicationThreads = 1, safV1Url = "saf/api", applicationName = "syfosmpapirmottak", aktoerregisterV1Url = "aktorurl",