Skip to content

Commit

Permalink
EY-4849 send melding endring enhet (#6812)
Browse files Browse the repository at this point in the history
* Oppsett for å sende melding til statistikk ved endring av enhet

* Jobb for å sende meldinger på det som er under behandling

Siden vi har en del oppgaver som er under behandling som potensielt har
endret enheter må vi sende ut en melding for disse også

* Setter opp kjøring av send meldinger jobb

* Bump migreringsscript

* Passer på at teknisk tid for oppdatert enhet er strengt stigende

* Og en bump til

* Korrigerer migreringsscript

* Bump migreringsscript
  • Loading branch information
oyvindsh authored Jan 24, 2025
1 parent 69965dc commit 8c52a4b
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 0 deletions.
1 change: 1 addition & 0 deletions apps/etterlatte-behandling/src/main/kotlin/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private fun timerJobs(context: ApplicationContext): List<TimerJob> =
context.doedsmeldingerJob,
context.doedsmeldingerReminderJob,
context.saksbehandlerJob,
context.sendMeldingOmOppgaverUnderBehandlingJob,
)

@Deprecated("Denne blir brukt i veldig mange testar. Bør rydde opp, men tar det etter denne endringa er inne")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package no.nav.etterlatte.behandling

import no.nav.etterlatte.kafka.JsonMessage
import no.nav.etterlatte.kafka.KafkaProdusent
import no.nav.etterlatte.libs.common.Enhetsnummer
import no.nav.etterlatte.libs.common.aktivitetsplikt.AKTIVITETSPLIKT_DTO_RIVER_KEY
import no.nav.etterlatte.libs.common.aktivitetsplikt.AktivitetspliktDto
import no.nav.etterlatte.libs.common.aktivitetsplikt.AktivitetspliktHendelse
import no.nav.etterlatte.libs.common.behandling.BEHANDLING_ID_PAA_VENT_RIVER_KEY
import no.nav.etterlatte.libs.common.behandling.BehandlingHendelseType
import no.nav.etterlatte.libs.common.behandling.NY_ENHET_KEY
import no.nav.etterlatte.libs.common.behandling.PAA_VENT_AARSAK_KEY
import no.nav.etterlatte.libs.common.behandling.PaaVentAarsak
import no.nav.etterlatte.libs.common.behandling.REFERANSE_ENDRET_ENHET_KEY
import no.nav.etterlatte.libs.common.behandling.STATISTIKKBEHANDLING_RIVER_KEY
import no.nav.etterlatte.libs.common.behandling.StatistikkBehandling
import no.nav.etterlatte.libs.common.logging.getCorrelationId
Expand All @@ -26,6 +29,12 @@ interface BehandlingHendelserKafkaProducer {
overstyrtTekniskTid: Tidspunkt? = null,
)

fun sendMeldingForEndretEnhet(
oppgaveReferanse: String,
enhet: Enhetsnummer,
overstyrtTekniskTid: Tidspunkt? = null,
)

fun sendMeldingForHendelsePaaVent(
behandlingId: UUID,
hendelseType: BehandlingHendelseType,
Expand Down Expand Up @@ -72,6 +81,34 @@ class BehandlingsHendelserKafkaProducerImpl(
}
}

override fun sendMeldingForEndretEnhet(
oppgaveReferanse: String,
enhet: Enhetsnummer,
overstyrtTekniskTid: Tidspunkt?,
) {
val correlationId = getCorrelationId()
val hendelse = BehandlingHendelseType.ENDRET_ENHET
rapid
.publiser(
oppgaveReferanse,
JsonMessage
.newMessage(
hendelse.lagEventnameForType(),
mapOf(
CORRELATION_ID_KEY to correlationId,
TEKNISK_TID_KEY to (overstyrtTekniskTid ?: Tidspunkt.now()),
REFERANSE_ENDRET_ENHET_KEY to oppgaveReferanse,
NY_ENHET_KEY to enhet.enhetNr,
),
).toJson(),
).also { (partition, offset) ->
logger.info(
"Postet hendelse ${hendelse.lagEventnameForType()} for oppgave med referanse " +
"$oppgaveReferanse til partisjon $partition, offset $offset correlationid: $correlationId",
)
}
}

override fun sendMeldingForHendelsePaaVent(
behandlingId: UUID,
hendelseType: BehandlingHendelseType,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package no.nav.etterlatte.behandling.jobs

import no.nav.etterlatte.Context
import no.nav.etterlatte.Kontekst
import no.nav.etterlatte.Self
import no.nav.etterlatte.behandling.BehandlingHendelserKafkaProducer
import no.nav.etterlatte.behandling.hendelse.getUUID
import no.nav.etterlatte.common.ConnectionAutoclosing
import no.nav.etterlatte.common.DatabaseContext
import no.nav.etterlatte.inTransaction
import no.nav.etterlatte.jobs.LoggerInfo
import no.nav.etterlatte.jobs.fixedRateCancellableTimer
import no.nav.etterlatte.libs.common.Enhetsnummer
import no.nav.etterlatte.libs.common.TimerJob
import no.nav.etterlatte.libs.database.toList
import no.nav.etterlatte.libs.ktor.token.HardkodaSystembruker
import no.nav.etterlatte.oppgave.OppgaveDaoMedEndringssporing
import no.nav.etterlatte.sak.SakTilgangDao
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.Timer
import java.util.UUID
import javax.sql.DataSource

class SendMeldingOmOppgaverUnderBehandlingJob(
private val erLeader: () -> Boolean,
private val sendEndretEnhet: SendEndretEnhet,
sakTilgangDao: SakTilgangDao,
dataSource: DataSource,
) : TimerJob {
private val jobbNavn = this::class.java.simpleName
private val logger: Logger = LoggerFactory.getLogger(SendMeldingOmOppgaverUnderBehandlingJob::class.java)
private val initialDelay = Duration.ofMinutes(3)
private val period = Duration.ofMinutes(1)

private var jobContext: Context =
Context(
Self(SendMeldingOmOppgaverUnderBehandlingJob::class.java.simpleName),
DatabaseContext(dataSource),
sakTilgangDao,
HardkodaSystembruker.doedshendelse,
)

override fun schedule(): Timer {
logger.info("$jobbNavn er satt til å kjøre med oppstart=$initialDelay og periode $period")

return fixedRateCancellableTimer(
name = jobbNavn,
initialDelay = initialDelay.toMillis(),
loggerInfo = LoggerInfo(logger = logger, loggTilSikkerLogg = false),
period = period.toMillis(),
) {
if (erLeader()) {
sendEndretEnhet
.setupKontekstAndRun(jobContext)
}
}
}
}

class SendEndretEnhet(
private val behandlingHendelserKafkaProducer: BehandlingHendelserKafkaProducer,
private val oppgaveEnhetEndretDao: OppgaveEnhetEndretDao,
private val oppgaveEndringerDao: OppgaveDaoMedEndringssporing,
) {
private val logger: Logger = LoggerFactory.getLogger(SendMeldingOmOppgaverUnderBehandlingJob::class.java)

fun setupKontekstAndRun(jobContext: Context) {
Kontekst.set(jobContext)
sendMeldingForEnheter()
}

private fun sendMeldingForEnheter() {
val oppgaverAaSendeMeldingFor =
try {
inTransaction {
oppgaveEnhetEndretDao.hentOppgaverUnderBehandlingManglerMelding()
}
} catch (e: Exception) {
logger.warn("Kunne ikke hente ut oppgaver under behandling med enhet", e)
return
}
oppgaverAaSendeMeldingFor.forEach { oppgaveOgEnhet ->
try {
inTransaction {
val endringerForOppgaveSistFoerst =
oppgaveEndringerDao.hentEndringerForOppgave(oppgaveOgEnhet.oppgaveId).sortedByDescending {
it.tidspunkt
}
val tidspunktForEndring =
endringerForOppgaveSistFoerst
.firstOrNull { it.oppgaveFoer.enhet != oppgaveOgEnhet.enhet }
?.tidspunkt
behandlingHendelserKafkaProducer.sendMeldingForEndretEnhet(
oppgaveReferanse = oppgaveOgEnhet.referanse,
enhet = oppgaveOgEnhet.enhet,
overstyrtTekniskTid = tidspunktForEndring,
)
oppgaveEnhetEndretDao.oppdaterSendtMelding(oppgaveOgEnhet.oppgaveId)
}
} catch (e: Exception) {
logger.warn(
"Kunne ikke sende endret enhet for oppgave med id=${oppgaveOgEnhet.oppgaveId}, " +
"referanse=${oppgaveOgEnhet.referanse}",
e,
)
}
}
}
}

data class OppgaveEndretEnhet(
val oppgaveId: UUID,
val referanse: String,
val enhet: Enhetsnummer,
)

class OppgaveEnhetEndretDao(
private val connectionAutoclosing: ConnectionAutoclosing,
) {
private val logger: Logger = LoggerFactory.getLogger(OppgaveEnhetEndretDao::class.java)

fun hentOppgaverUnderBehandlingManglerMelding(): List<OppgaveEndretEnhet> =
connectionAutoclosing.hentConnection {
val statement =
it.prepareStatement(
"""
SELECT oppgave_id, referanse, enhet from send_melding_om_enhet where not sendt limit 10
""".trimIndent(),
)
statement.executeQuery().toList {
val oppgaveId = getUUID("oppgave_id")
val referanse = getString("referanse")
val enhet = Enhetsnummer(getString("enhet"))
OppgaveEndretEnhet(
oppgaveId = oppgaveId,
referanse = referanse,
enhet = enhet,
)
}
}

fun oppdaterSendtMelding(oppgaveId: UUID) {
connectionAutoclosing.hentConnection {
val statement =
it.prepareStatement(
"""
UPDATE send_melding_om_enhet SET sendt = true where oppgave_id = ?
""".trimIndent(),
)
statement.setObject(1, oppgaveId)
val updated = statement.executeUpdate()
if (updated != 1) {
logger.warn("Kunne ikke oppdatere sendt enhet for oppgave med id=$oppgaveId")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ import no.nav.etterlatte.behandling.hendelse.HendelseDao
import no.nav.etterlatte.behandling.job.SaksbehandlerJobService
import no.nav.etterlatte.behandling.jobs.DoedsmeldingJob
import no.nav.etterlatte.behandling.jobs.DoedsmeldingReminderJob
import no.nav.etterlatte.behandling.jobs.OppgaveEnhetEndretDao
import no.nav.etterlatte.behandling.jobs.SaksbehandlerJob
import no.nav.etterlatte.behandling.jobs.SendEndretEnhet
import no.nav.etterlatte.behandling.jobs.SendMeldingOmOppgaverUnderBehandlingJob
import no.nav.etterlatte.behandling.klage.KlageBrevService
import no.nav.etterlatte.behandling.klage.KlageDaoImpl
import no.nav.etterlatte.behandling.klage.KlageHendelserServiceImpl
Expand Down Expand Up @@ -612,6 +615,21 @@ internal class ApplicationContext(
behandlingService = behandlingService,
)

// Utsending av meldinger for enheter oppgaver under behandling
private val sendEndretEnhet =
SendEndretEnhet(
behandlingHendelserKafkaProducer = behandlingsHendelser,
oppgaveEnhetEndretDao = OppgaveEnhetEndretDao(connectionAutoclosing = autoClosingDatabase),
oppgaveEndringerDao = oppgaveDaoEndringer,
)
val sendMeldingOmOppgaverUnderBehandlingJob: SendMeldingOmOppgaverUnderBehandlingJob =
SendMeldingOmOppgaverUnderBehandlingJob(
erLeader = { leaderElectionKlient.isLeader() },
sendEndretEnhet = sendEndretEnhet,
sakTilgangDao = sakTilgangDao,
dataSource = dataSource,
)

// Jobs
val metrikkerJob: MetrikkerJob by lazy {
MetrikkerJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,19 @@ class OppgaveService(
oppgaverForSak.forEach {
if (it.erUnderBehandling()) {
oppgaveDao.endreStatusPaaOppgave(it.id, Status.NY)

// For oppgaver som ikke er ferdige er det relevant for saksbehandlingsstatistikken
// å få en oppdatert rad med ny enhet
when (it.type) {
OppgaveType.FOERSTEGANGSBEHANDLING,
OppgaveType.REVURDERING,
OppgaveType.TILBAKEKREVING,
OppgaveType.KLAGE,
-> {
hendelser.sendMeldingForEndretEnhet(it.referanse, enhetsID)
}
else -> Unit
}
}
oppgaveDao.endreEnhetPaaOppgave(it.id, enhetsID)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
create table send_melding_om_enhet
(
oppgave_id uuid,
referanse text,
enhet text,
sendt boolean default false
);

-- Legger inn alle oppgaver knyttet til saksbehandlingsstatistikk-oppgaver som er under behandling
-- i listen over oppgaver som skal bli sendt på nytt
insert into send_melding_om_enhet (oppgave_id, referanse, enhet)
select o.id, o.referanse, s.enhet
from oppgave o
inner join public.sak s on o.sak_id = s.id
where o.type in ('KLAGE', 'TILBAKEKREVING', 'FOERSTEGANGSBEHANDLING', 'REVURDERING')
and o.status not in ('AVBRUTT', 'FERDIGSTILT');
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import no.nav.etterlatte.statistikk.database.StoenadRepository
import no.nav.etterlatte.statistikk.jobs.MaanedligStatistikkJob
import no.nav.etterlatte.statistikk.river.AktivitetspliktHendelseRiver
import no.nav.etterlatte.statistikk.river.AvbruttOpprettetBehandlinghendelseRiver
import no.nav.etterlatte.statistikk.river.BehandlingEndretEnhetRiver
import no.nav.etterlatte.statistikk.river.BehandlingPaaVentHendelseRiver
import no.nav.etterlatte.statistikk.river.KlagehendelseRiver
import no.nav.etterlatte.statistikk.river.SoeknadStatistikkRiver
Expand Down Expand Up @@ -52,6 +53,7 @@ class ApplicationContext {
SoeknadStatistikkRiver(rapidsConnection, soeknadStatistikkService)
KlagehendelseRiver(rapidsConnection, statistikkService)
AktivitetspliktHendelseRiver(rapidsConnection, aktivitetspliktService)
BehandlingEndretEnhetRiver(rapidsConnection, statistikkService)
}

private val behandlingKlient: BehandlingKlient by lazy {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package no.nav.etterlatte.statistikk.river

import no.nav.etterlatte.libs.common.Enhetsnummer
import no.nav.etterlatte.libs.common.behandling.BehandlingHendelseType
import no.nav.etterlatte.libs.common.behandling.NY_ENHET_KEY
import no.nav.etterlatte.libs.common.behandling.REFERANSE_ENDRET_ENHET_KEY
import no.nav.etterlatte.libs.common.objectMapper
import no.nav.etterlatte.libs.common.rapidsandrivers.TEKNISK_TID_KEY
import no.nav.etterlatte.libs.common.rapidsandrivers.correlationId
import no.nav.etterlatte.libs.common.rapidsandrivers.lagParMedEventNameKey
import no.nav.etterlatte.libs.common.toJson
import no.nav.etterlatte.rapidsandrivers.ListenerMedLogging
import no.nav.etterlatte.statistikk.service.StatistikkService
import no.nav.helse.rapids_rivers.JsonMessage
import no.nav.helse.rapids_rivers.MessageContext
import no.nav.helse.rapids_rivers.RapidsConnection
import org.slf4j.LoggerFactory
import java.util.UUID

class BehandlingEndretEnhetRiver(
rapidsConnection: RapidsConnection,
private val service: StatistikkService,
) : ListenerMedLogging() {
private val logger = LoggerFactory.getLogger(this::class.java)

init {
initialiserRiver(rapidsConnection, BehandlingHendelseType.ENDRET_ENHET) {
validate { it.requireKey(TEKNISK_TID_KEY) }
validate { it.requireKey(REFERANSE_ENDRET_ENHET_KEY) }
validate { it.requireKey(NY_ENHET_KEY) }
}
}

override fun haandterPakke(
packet: JsonMessage,
context: MessageContext,
) = try {
val tekniskTid = parseTekniskTid(packet, logger)
val referanse = UUID.fromString(packet[REFERANSE_ENDRET_ENHET_KEY].textValue())
val nyEnhet = Enhetsnummer(packet[NY_ENHET_KEY].textValue())

service
.registrerEndretEnhetForReferanse(referanse, nyEnhet, tekniskTid)
?.also {
context.publish(
mapOf(
StatistikkhendelseType.REGISTRERT.lagParMedEventNameKey(),
"sak_rad" to objectMapper.writeValueAsString(it),
).toJson(),
)
} ?: logger.info("Ikke registrert statistikk på pakken ${packet.correlationId}")
} catch (e: Exception) {
logger.error(
"""
Kunne ikke mappe ut statistikk for endret enhet-hendelse i pakken med korrelasjonsId ${packet.correlationId}
""".trimIndent(),
)
}
}
Loading

0 comments on commit 8c52a4b

Please sign in to comment.