From 187090e06867dc79cfe0d3bb6cc4a969d61b9115 Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 11 Oct 2024 11:38:51 +0200 Subject: [PATCH] Introduce passivation for projections on inactive projects (#5175) * Introduce passivation for projections on inactive projects --------- Co-authored-by: Simon Dumas --- delta/app/src/main/resources/app.conf | 35 +-- .../nexus/delta/config/AppConfig.scala | 3 +- .../delta/routes/SupervisionRoutes.scala | 10 +- .../nexus/delta/wiring/DeltaModule.scala | 3 +- .../nexus/delta/wiring/EventsModule.scala | 6 +- .../nexus/delta/wiring/ProjectsModule.scala | 3 + .../nexus/delta/wiring/StreamModule.scala | 26 +- .../delta/wiring/SupervisionModule.scala | 8 +- .../delta/routes/SupervisionRoutesSpec.scala | 56 +++-- .../GraphAnalyticsPluginModule.scala | 7 +- .../indexing/GraphAnalyticsStream.scala | 9 +- .../deletion/ProjectDeletionCoordinator.scala | 9 +- .../nexus/delta/sdk/sse/SseElemStream.scala | 29 +-- .../sdk/stream/GraphResourceStream.scala | 51 ++-- .../ProjectDeletionCoordinatorSuite.scala | 26 +- .../scripts/postgres/drop/drop-tables.ddl | 1 + .../V1_11_M03_001__project_last_updates.ddl | 4 +- .../sourcing/config/ElemQueryConfig.scala | 62 +++++ .../config/ProjectLastUpdateConfig.scala | 4 +- .../delta/sourcing/config/QueryConfig.scala | 8 +- .../delta/sourcing/event/EventStreaming.scala | 2 +- .../delta/sourcing/exporter/Exporter.scala | 4 +- .../projections/ProjectLastUpdateStore.scala | 31 +-- .../projections/ProjectLastUpdateStream.scala | 37 +++ .../projections/ProjectionRestartStore.scala | 15 +- .../projections/model/ProjectLastUpdate.scala | 2 - .../delta/sourcing/query/ElemStreaming.scala | 236 ++++++++++++++++++ .../delta/sourcing/query/RefreshOrStop.scala | 76 ++++++ .../delta/sourcing/query/StreamingQuery.scala | 202 +-------------- .../sourcing/state/GlobalStateStore.scala | 2 +- .../sourcing/state/ScopedStateStore.scala | 2 +- .../stream/ProjectActivitySignals.scala | 128 ++++++++++ .../stream/ProjectLastUpdateProjection.scala | 70 ------ .../stream/ProjectLastUpdateWrites.scala | 71 ++++++ .../stream/ProjectLastUpdatesSink.scala | 33 +-- .../sourcing/stream/ProjectSignals.scala | 62 +++++ .../ProjectLastUpdateStoreSuite.scala | 26 +- ...rySuite.scala => ElemStreamingSuite.scala} | 24 +- .../sourcing/query/RefreshOrStopSuite.scala | 85 +++++++ .../stream/ProjectActivitySignalsSuite.scala | 57 +++++ .../stream/ProjectLastUpdatesSinkSuite.scala | 23 +- .../sourcing/stream/ProjectSignalsSuite.scala | 41 +++ tests/docker/config/delta-postgres.conf | 23 +- 43 files changed, 1126 insertions(+), 486 deletions(-) create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ElemQueryConfig.scala create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStream.scala create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/ElemStreaming.scala create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStop.scala create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectActivitySignals.scala delete mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdateProjection.scala create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdateWrites.scala create mode 100644 
delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectSignals.scala rename delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/{StreamingQuerySuite.scala => ElemStreamingSuite.scala} (93%) create mode 100644 delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStopSuite.scala create mode 100644 delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectActivitySignalsSuite.scala create mode 100644 delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectSignalsSuite.scala diff --git a/delta/app/src/main/resources/app.conf b/delta/app/src/main/resources/app.conf index 4e3f37256b..4f788d00cc 100644 --- a/delta/app/src/main/resources/app.conf +++ b/delta/app/src/main/resources/app.conf @@ -50,6 +50,27 @@ app { slow-query-threshold = 2 seconds } + elem-query { + type = "delay" + batch-size = 30 + delay = 2s + } + + project-last-update { + batch { + # the maximum batching size, corresponding to the maximum number of elements being aggregated + # at the same time before pushing the update. + max-elements = 100 + # the maximum batching duration. + max-interval = 1 second + } + query { + batch-size = 30 + refresh-strategy = 1 second + } + inactive-interval = 10 minutes + } + # Database export configuration export { batch-size = 30 @@ -295,20 +316,6 @@ app { } } - project-last-update { - batch { - # the maximum batching size, corresponding to the maximum number of elements being aggregated - # at the same time before pushing the update. - max-elements = 100 - # the maximum batching duration. - max-interval = 1 seconds - } - query { - batch-size = 30 - refresh-strategy = 1s - } - } - # Type hierarchy configuration type-hierarchy { # the type hierarchy event-log configuration diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfig.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfig.scala index c5e61e8f51..e930828e8d 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfig.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfig.scala @@ -20,7 +20,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.ResourcesConfig import ch.epfl.bluebrain.nexus.delta.sdk.schemas.SchemasConfig import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseConfig import ch.epfl.bluebrain.nexus.delta.sdk.typehierarchy.TypeHierarchyConfig -import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectLastUpdateConfig, ProjectionConfig} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ElemQueryConfig, ProjectLastUpdateConfig, ProjectionConfig} import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportConfig import com.typesafe.config.Config import pureconfig.ConfigReader @@ -50,6 +50,7 @@ final case class AppConfig( schemas: SchemasConfig, typeHierarchy: TypeHierarchyConfig, serviceAccount: ServiceAccountConfig, + elemQuery: ElemQueryConfig, sse: SseConfig, projections: ProjectionConfig, projectLastUpdate: ProjectLastUpdateConfig, diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SupervisionRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SupervisionRoutes.scala index ff57174eb1..4712859fbb 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SupervisionRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SupervisionRoutes.scala 
@@ -21,9 +21,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.{projects, supervision} import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ProjectHealer, ProjectRejection, ProjectsHealth} import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisedDescription +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectActivitySignals, SupervisedDescription} import io.circe.generic.semiauto.deriveEncoder -import io.circe.syntax.KeyOps +import io.circe.syntax.{EncoderOps, KeyOps} import io.circe.{Encoder, Json} class SupervisionRoutes( @@ -31,7 +31,8 @@ class SupervisionRoutes( aclCheck: AclCheck, supervised: IO[List[SupervisedDescription]], projectsHealth: ProjectsHealth, - projectHealer: ProjectHealer + projectHealer: ProjectHealer, + activitySignals: ProjectActivitySignals )(implicit baseUri: BaseUri, cr: RemoteContextResolution, @@ -54,6 +55,9 @@ class SupervisionRoutes( if (projects.isEmpty) emit(StatusCodes.OK, IO.pure(allProjectsAreHealthy)) else emit(StatusCodes.InternalServerError, IO.pure(unhealthyProjectsEncoder(projects))) } + }, + (pathPrefix("activity") & pathPrefix("projects") & get & pathEndOrSingleSlash) { + emit(activitySignals.activityMap.map(_.asJson)) } ) }, diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala index 426bb711bb..582bcb401a 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala @@ -30,7 +30,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model._ import ch.epfl.bluebrain.nexus.delta.sdk.plugin.PluginDef import ch.epfl.bluebrain.nexus.delta.sdk.projects.{OwnerPermissionsScopeInitialization, ProjectsConfig, ScopeInitializationErrorStore} import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors -import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectLastUpdateConfig, ProjectionConfig, QueryConfig} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ElemQueryConfig, ProjectLastUpdateConfig, ProjectionConfig, QueryConfig} import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings import com.typesafe.config.Config import izumi.distage.model.definition.{Id, ModuleDef} @@ -57,6 +57,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class make[FusionConfig].from { appCfg.fusion } make[ProjectsConfig].from { appCfg.projects } make[ProjectionConfig].from { appCfg.projections } + make[ElemQueryConfig].from { appCfg.elemQuery } make[ProjectLastUpdateConfig].from { appCfg.projectLastUpdate } make[QueryConfig].from { appCfg.projections.query } make[BaseUri].from { appCfg.http.baseUri } diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala index 28d844e9bc..fb22c5077c 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.organizations.Organizations import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects import ch.epfl.bluebrain.nexus.delta.sdk.sse.{SseElemStream, SseEncoder, SseEventLog} import 
ch.epfl.bluebrain.nexus.delta.sourcing.Transactors -import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.query.ElemStreaming import izumi.distage.model.definition.{Id, ModuleDef} /** @@ -40,9 +40,7 @@ object EventsModule extends ModuleDef { )(jo) } - make[SseElemStream].from { (qc: QueryConfig, xas: Transactors) => - SseElemStream(qc, xas) - } + make[SseElemStream].from { (elemStreaming: ElemStreaming) => SseElemStream(elemStreaming) } make[EventsRoutes].from { ( diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ProjectsModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ProjectsModule.scala index 0125c6c804..0711e0cf9b 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ProjectsModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ProjectsModule.scala @@ -24,6 +24,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.provisioning.ProjectProvisioning import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectLastUpdateStore import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor import izumi.distage.model.definition.{Id, ModuleDef} @@ -100,6 +101,7 @@ object ProjectsModule extends ModuleDef { config: AppConfig, serviceAccount: ServiceAccount, supervisor: Supervisor, + projectLastUpdateStore: ProjectLastUpdateStore, xas: Transactors, clock: Clock[IO] ) => @@ -109,6 +111,7 @@ object ProjectsModule extends ModuleDef { config.projects.deletion, serviceAccount, supervisor, + projectLastUpdateStore, xas, clock ) diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala index ed113c5a6b..91663cd946 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala @@ -3,8 +3,9 @@ package ch.epfl.bluebrain.nexus.delta.wiring import cats.effect.{Clock, IO, Sync} import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream -import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectLastUpdateConfig, ProjectionConfig, QueryConfig} -import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectLastUpdateStore, ProjectionErrors, Projections} +import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ElemQueryConfig, ProjectLastUpdateConfig, ProjectionConfig} +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectLastUpdateStore, ProjectLastUpdateStream, ProjectionErrors, Projections} +import ch.epfl.bluebrain.nexus.delta.sourcing.query.ElemStreaming import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._ @@ -18,8 +19,13 @@ import izumi.distage.model.definition.ModuleDef object StreamModule extends ModuleDef { addImplicit[Sync[IO]] - make[GraphResourceStream].from { (qc: QueryConfig, xas: Transactors, shifts: ResourceShifts) => - GraphResourceStream(qc, xas, shifts) + make[ElemStreaming].from { + (xas: Transactors, queryConfig: ElemQueryConfig, activitySignals: ProjectActivitySignals) => + new ElemStreaming(xas, 
queryConfig, activitySignals) + } + + make[GraphResourceStream].from { (elemStreaming: ElemStreaming, shifts: ResourceShifts) => + GraphResourceStream(elemStreaming, shifts) } many[PipeDef].add(DiscardMetadata) @@ -54,10 +60,18 @@ object StreamModule extends ModuleDef { } make[ProjectLastUpdateStore].from { (xas: Transactors) => ProjectLastUpdateStore(xas) } + make[ProjectLastUpdateStream].from { (xas: Transactors, config: ProjectLastUpdateConfig) => + ProjectLastUpdateStream(xas, config.query) + } - make[ProjectLastUpdateProjection].fromEffect { + make[ProjectLastUpdateWrites].fromEffect { (supervisor: Supervisor, store: ProjectLastUpdateStore, xas: Transactors, config: ProjectLastUpdateConfig) => - ProjectLastUpdateProjection(supervisor, store, xas, config.batch, config.query) + ProjectLastUpdateWrites(supervisor, store, xas, config.batch) + } + + make[ProjectActivitySignals].fromEffect { + (supervisor: Supervisor, stream: ProjectLastUpdateStream, clock: Clock[IO], config: ProjectLastUpdateConfig) => + ProjectActivitySignals(supervisor, stream, clock, config.inactiveInterval) } make[PurgeProjectionCoordinator.type].fromEffect { diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SupervisionModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SupervisionModule.scala index 5b37d3e29c..404fc40b30 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SupervisionModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SupervisionModule.scala @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ProjectHealer, ProjectsHealth} -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectActivitySignals, Supervisor} import izumi.distage.model.definition.{Id, ModuleDef} /** @@ -31,14 +31,16 @@ object SupervisionModule extends ModuleDef { rc: RemoteContextResolution @Id("aggregate"), jo: JsonKeyOrdering, projectsHealth: ProjectsHealth, - projectHealer: ProjectHealer + projectHealer: ProjectHealer, + projectActivitySignals: ProjectActivitySignals ) => new SupervisionRoutes( identities, aclCheck, supervisor.getRunningProjections(), projectsHealth, - projectHealer + projectHealer, + projectActivitySignals )(baseUri, rc, jo) } diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SupervisionRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SupervisionRoutesSpec.scala index 2ca4d7a502..c7200b94af 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SupervisionRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SupervisionRoutesSpec.scala @@ -17,28 +17,29 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authent import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ +import fs2.concurrent.SignallingRef import org.scalatest.Assertion import java.time.Instant class SupervisionRoutesSpec extends BaseRouteSpec { - private val superviser = User("superviser", realm) + private val supervisor = User("supervisor", realm) - implicit private val callerSuperviser: Caller = - Caller(superviser, Set(superviser, 
Anonymous, Authenticated(realm), Group("group", realm))) + implicit private val callerSupervisor: Caller = + Caller(supervisor, Set(supervisor, Anonymous, Authenticated(realm), Group("group", realm))) - private val asSuperviser = addCredentials(OAuth2BearerToken("superviser")) + private val asSupervisor = addCredentials(OAuth2BearerToken("supervisor")) - private val identities = IdentitiesDummy(callerSuperviser) + private val identities = IdentitiesDummy(callerSupervisor) private val aclCheck = AclSimpleCheck().accepted - private val projectRef = ProjectRef(Label.unsafe("myorg"), Label.unsafe("myproject")) - private val projectRef2 = ProjectRef(Label.unsafe("myorg"), Label.unsafe("myproject2")) + private val project = ProjectRef.unsafe("myorg", "myproject") + private val project2 = ProjectRef.unsafe("myorg", "myproject2") - private val unhealthyProjects = Set(projectRef, projectRef2) + private val unhealthyProjects = Set(project, project2) - private val metadata = ProjectionMetadata("module", "name", Some(projectRef), None) + private val metadata = ProjectionMetadata("module", "name", Some(project), None) private val progress = ProjectionProgress(Offset.start, Instant.EPOCH, 1L, 1L, 1L) private val description1 = SupervisedDescription(metadata, ExecutionStrategy.PersistentSingleNode, 1, ExecutionStatus.Running, progress) @@ -69,13 +70,19 @@ class SupervisionRoutesSpec extends BaseRouteSpec { override def heal(project: ProjectRef): IO[Unit] = IO.unit } + private val activitySignals = new ProjectActivitySignals { + override def apply(project: ProjectRef): IO[Option[SignallingRef[IO, Boolean]]] = IO.none + override def activityMap: IO[Map[ProjectRef, Boolean]] = IO.pure(Map(project -> true, project2 -> false)) + } + private def routesTemplate(unhealthyProjects: Set[ProjectRef], healer: ProjectHealer) = Route.seal( new SupervisionRoutes( identities, aclCheck, IO.pure { List(description1, description2) }, projectsHealth(unhealthyProjects), - healer + healer, + activitySignals ).routes ) @@ -83,7 +90,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec { override def beforeAll(): Unit = { super.beforeAll() - aclCheck.append(AclAddress.Root, superviser -> Set(supervision.read, projects.write)).accepted + aclCheck.append(AclAddress.Root, supervisor -> Set(supervision.read, projects.write)).accepted } "The supervision projection endpoint" should { @@ -95,7 +102,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec { } "be accessible with supervision/read permission and return expected payload" in { - Get("/v1/supervision/projections") ~> asSuperviser ~> routes ~> check { + Get("/v1/supervision/projections") ~> asSupervisor ~> routes ~> check { response.status shouldEqual StatusCodes.OK response.asJson shouldEqual jsonContentOf("supervision/supervision-running-proj-response.json") } @@ -113,14 +120,14 @@ class SupervisionRoutesSpec extends BaseRouteSpec { "return a successful http code when there are no unhealthy projects" in { val routesWithHealthyProjects = routesTemplate(Set.empty, noopHealer) - Get("/v1/supervision/projects") ~> asSuperviser ~> routesWithHealthyProjects ~> check { + Get("/v1/supervision/projects") ~> asSupervisor ~> routesWithHealthyProjects ~> check { response.status shouldEqual StatusCodes.OK } } "return an error code when there are unhealthy projects" in { val routesWithUnhealthyProjects = routesTemplate(unhealthyProjects, noopHealer) - Get("/v1/supervision/projects") ~> asSuperviser ~> routesWithUnhealthyProjects ~> check { + Get("/v1/supervision/projects") ~> 
asSupervisor ~> routesWithUnhealthyProjects ~> check { response.status shouldEqual StatusCodes.InternalServerError response.asJson shouldEqual json""" @@ -154,7 +161,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec { val project = ProjectRef(Label.unsafe("myorg"), Label.unsafe("myproject")) val routesWithHealer = routesTemplate(Set.empty, projectHealer) - Post(s"/v1/supervision/projects/$project/heal") ~> asSuperviser ~> routesWithHealer ~> check { + Post(s"/v1/supervision/projects/$project/heal") ~> asSupervisor ~> routesWithHealer ~> check { response.status shouldEqual StatusCodes.OK response.asJson shouldEqual json""" @@ -168,7 +175,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec { "return an error if the healing failed" in { val routesWithFailingHealer = routesTemplate(Set.empty, failingHealer) - Post("/v1/supervision/projects/myorg/myproject/heal") ~> asSuperviser ~> routesWithFailingHealer ~> check { + Post("/v1/supervision/projects/myorg/myproject/heal") ~> asSupervisor ~> routesWithFailingHealer ~> check { response.status shouldEqual StatusCodes.InternalServerError response.asJson shouldEqual json""" @@ -184,4 +191,21 @@ class SupervisionRoutesSpec extends BaseRouteSpec { } + "The supervision project activity endpoint" should { + + "be forbidden without supervision/read permission" in { + Get("/v1/supervision/activity/projects") ~> routes ~> check { + response.shouldBeForbidden + } + } + + "be accessible with supervision/read permission and return expected payload" in { + Get("/v1/supervision/activity/projects") ~> asSupervisor ~> routes ~> check { + response.status shouldEqual StatusCodes.OK + response.asJson shouldEqual json"""{ "$project": true, "$project2": false }""" + } + } + + } + } diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginModule.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginModule.scala index cf78499c06..fd2790ba1e 100644 --- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginModule.scala +++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginModule.scala @@ -13,9 +13,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities import ch.epfl.bluebrain.nexus.delta.sdk.model._ import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, Projects} import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors -import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.projections.Projections -import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter +import ch.epfl.bluebrain.nexus.delta.sourcing.query.{ElemStreaming, SelectFilter} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor import izumi.distage.model.definition.{Id, ModuleDef} @@ -33,8 +32,8 @@ class GraphAnalyticsPluginModule(priority: Int) extends ModuleDef { GraphAnalytics(client, fetchContext, config.prefix, config.termAggregations) } - make[GraphAnalyticsStream].from { (qc: QueryConfig, xas: Transactors) => - GraphAnalyticsStream(qc, xas) + make[GraphAnalyticsStream].from { (elemStreaming: ElemStreaming, xas: Transactors) => + GraphAnalyticsStream(elemStreaming, xas) } make[GraphAnalyticsCoordinator].fromEffect { diff --git 
a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala index ac35f1770a..da3e1a65e2 100644 --- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala +++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala @@ -10,12 +10,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileState import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceState -import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors} -import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, ProjectRef, Tag} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.query.{SelectFilter, StreamingQuery} +import ch.epfl.bluebrain.nexus.delta.sourcing.query.{ElemStreaming, SelectFilter} +import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors} import doobie._ import doobie.syntax.all._ import io.circe.Json @@ -77,7 +76,7 @@ object GraphAnalyticsStream { } // $COVERAGE-OFF$ - def apply(qc: QueryConfig, xas: Transactors): GraphAnalyticsStream = + def apply(elemStreaming: ElemStreaming, xas: Transactors): GraphAnalyticsStream = (project: ProjectRef, start: Offset) => { // This seems a reasonable value to batch relationship resolution for resources with a lot @@ -102,7 +101,7 @@ object GraphAnalyticsStream { case _ => IO.pure(Noop) } - StreamingQuery.elems(Scope(project), start, SelectFilter.latest, qc, xas, decode) + elemStreaming(Scope(project), start, SelectFilter.latest, decode) } // $COVERAGE-ON$ diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/deletion/ProjectDeletionCoordinator.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/deletion/ProjectDeletionCoordinator.scala index d11d201690..086c84f1fa 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/deletion/ProjectDeletionCoordinator.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/deletion/ProjectDeletionCoordinator.scala @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectLastUpdateStore import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ import fs2.Stream @@ -38,6 +39,7 @@ object ProjectDeletionCoordinator { deletionTasks: List[ProjectDeletionTask], deletionConfig: DeletionConfig, serviceAccount: ServiceAccount, + projectLastUpdateStore: ProjectLastUpdateStore, deletionStore: ProjectDeletionStore, clock: Clock[IO] ) extends ProjectDeletionCoordinator { @@ -72,6 +74,8 @@ object ProjectDeletionCoordinator { } // Waiting for events issued by deletion tasks to be taken into account _ <- IO.sleep(deletionConfig.propagationDelay) + // Delete the last updates for this project + _ 
<- projectLastUpdateStore.delete(project.project) // Delete the events and states and save the deletion report _ <- deletionStore.deleteAndSaveReport(report) _ <- logger.info(s"Project ${project.project} has been successfully deleted.") @@ -89,6 +93,7 @@ object ProjectDeletionCoordinator { deletionTasks: Set[ProjectDeletionTask], deletionConfig: ProjectsConfig.DeletionConfig, serviceAccount: ServiceAccount, + projectLastUpdateStore: ProjectLastUpdateStore, xas: Transactors, clock: Clock[IO] ): ProjectDeletionCoordinator = @@ -98,6 +103,7 @@ object ProjectDeletionCoordinator { deletionTasks.toList, deletionConfig, serviceAccount, + projectLastUpdateStore, new ProjectDeletionStore(xas), clock ) @@ -114,10 +120,11 @@ object ProjectDeletionCoordinator { deletionConfig: ProjectsConfig.DeletionConfig, serviceAccount: ServiceAccount, supervisor: Supervisor, + projectLastUpdateStore: ProjectLastUpdateStore, xas: Transactors, clock: Clock[IO] ): IO[ProjectDeletionCoordinator] = { - val stream = apply(projects, deletionTasks, deletionConfig, serviceAccount, xas, clock) + val stream = apply(projects, deletionTasks, deletionConfig, serviceAccount, projectLastUpdateStore, xas, clock) stream match { case Noop => logger.info("Projection deletion is disabled.").as(Noop) case active: Active => diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala index 6ac39a925c..f784b66575 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala @@ -3,11 +3,10 @@ package ch.epfl.bluebrain.nexus.delta.sdk.sse import akka.http.scaladsl.model.sse.ServerSentEvent import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling.defaultPrinter -import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors} -import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.Scope import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, SelectFilter, StreamingQuery} +import ch.epfl.bluebrain.nexus.delta.sourcing.query.{ElemStreaming, SelectFilter} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems} import io.circe.syntax.EncoderOps @@ -56,28 +55,18 @@ object SseElemStream { /** * Create a [[SseElemStream]] */ - def apply(qc: QueryConfig, xas: Transactors): SseElemStream = new SseElemStream { + def apply(elemStreaming: ElemStreaming): SseElemStream = new SseElemStream { + + val stopping = elemStreaming.stopping override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream = - StreamingQuery.elems(Scope(project), start, selectFilter, qc, xas).map(toServerSentEvent) + elemStreaming(Scope(project), start, selectFilter).map(toServerSentEvent) override def currents(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream = - StreamingQuery - .elems( - Scope(project), - start, - selectFilter, - qc.copy(refreshStrategy = RefreshStrategy.Stop), - xas - ) - .map(toServerSentEvent) + stopping(Scope(project), start, selectFilter).map(toServerSentEvent) - override def remaining( - 
project: ProjectRef, - selectFilter: SelectFilter, - start: Offset - ): IO[Option[RemainingElems]] = - StreamingQuery.remaining(Scope(project), selectFilter, start, xas) + override def remaining(project: ProjectRef, selectFilter: SelectFilter, start: Offset): IO[Option[RemainingElems]] = + elemStreaming.remaining(Scope(project), selectFilter, start) } private[sse] def toServerSentEvent(elem: Elem[Unit]): ServerSentEvent = { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/GraphResourceStream.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/GraphResourceStream.scala index 1fe16d6c01..78466aa57c 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/GraphResourceStream.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/GraphResourceStream.scala @@ -2,11 +2,10 @@ package ch.epfl.bluebrain.nexus.delta.sdk.stream import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts -import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors} -import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.Scope import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, SelectFilter, StreamingQuery} +import ch.epfl.bluebrain.nexus.delta.sourcing.query.{ElemStreaming, SelectFilter} import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems import fs2.Stream @@ -70,32 +69,32 @@ object GraphResourceStream { /** * Create a graph resource stream */ - def apply( - qc: QueryConfig, - xas: Transactors, - shifts: ResourceShifts - ): GraphResourceStream = new GraphResourceStream { + def apply(elemStreaming: ElemStreaming, shifts: ResourceShifts): GraphResourceStream = + new GraphResourceStream { - override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ElemStream[GraphResource] = - StreamingQuery.elems(Scope(project), start, selectFilter, qc, xas, shifts.decodeGraphResource(_, _)) + val stopping = elemStreaming.stopping - override def currents(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ElemStream[GraphResource] = - StreamingQuery.elems( - Scope(project), - start, - selectFilter, - qc.copy(refreshStrategy = RefreshStrategy.Stop), - xas, - shifts.decodeGraphResource(_, _) - ) + override def continuous( + project: ProjectRef, + selectFilter: SelectFilter, + start: Offset + ): ElemStream[GraphResource] = + elemStreaming(Scope(project), start, selectFilter, shifts.decodeGraphResource(_, _)) - override def remaining( - project: ProjectRef, - selectFilter: SelectFilter, - start: Offset - ): IO[Option[RemainingElems]] = - StreamingQuery.remaining(Scope(project), selectFilter, start, xas) - } + override def currents( + project: ProjectRef, + selectFilter: SelectFilter, + start: Offset + ): ElemStream[GraphResource] = + stopping(Scope(project), start, selectFilter, shifts.decodeGraphResource(_, _)) + + override def remaining( + project: ProjectRef, + selectFilter: SelectFilter, + start: Offset + ): IO[Option[RemainingElems]] = + elemStreaming.remaining(Scope(project), selectFilter, start) + } /** * Constructs a GraphResourceStream from an existing stream without assuring the termination of the operations. 
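Taken together, the changes above replace the StreamingQuery-based wiring with ElemStreaming. A minimal usage sketch (hypothetical, not part of this patch; it assumes an existing Transactors, ElemQueryConfig and ProjectActivitySignals in scope, and myProject is an invented example):

    import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
    import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
    import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
    import ch.epfl.bluebrain.nexus.delta.sourcing.query.{ElemStreaming, SelectFilter}

    val elemStreaming = new ElemStreaming(xas, elemQueryConfig, activitySignals)
    val myProject     = ProjectRef.unsafe("myorg", "myproject") // hypothetical project

    // When caught up, follows the configured strategy: stop, delay or passivate.
    val continuous = elemStreaming(Scope(myProject), Offset.start, SelectFilter.latest)

    // The stopping variant always completes once all current elems are consumed.
    val currents = elemStreaming.stopping(Scope(myProject), Offset.start, SelectFilter.latest)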
diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/deletion/ProjectDeletionCoordinatorSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/deletion/ProjectDeletionCoordinatorSuite.scala index cf384d06ff..1779328d7e 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/deletion/ProjectDeletionCoordinatorSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/deletion/ProjectDeletionCoordinatorSuite.scala @@ -22,6 +22,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityDependency.DependsOn import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectLastUpdateStore, ProjectLastUpdateStream} +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import doobie.syntax.all._ import munit.AnyFixture @@ -51,7 +53,9 @@ class ProjectDeletionCoordinatorSuite extends NexusSuite with ConfigFixtures wit override def munitFixtures: Seq[AnyFixture[_]] = List(projectFixture) - private lazy val (xas, projects) = projectFixture() + private lazy val (xas, projects) = projectFixture() + private lazy val projectLastUpdateStore = ProjectLastUpdateStore(xas) + private lazy val projectLastUpdateStream = ProjectLastUpdateStream(xas, queryConfig) private val active = ProjectRef.unsafe("org", "active") private val deprecated = ProjectRef.unsafe("org", "deprecated") @@ -78,7 +82,18 @@ class ProjectDeletionCoordinatorSuite extends NexusSuite with ConfigFixtures wit ): IO[ProjectDeletionReport.Stage] = deleted.update(_ + project).as(taskStage) } - (deleted, ProjectDeletionCoordinator(projects, Set(deletionTask), config, serviceAccount, xas, clock)) + ( + deleted, + ProjectDeletionCoordinator( + projects, + Set(deletionTask), + config, + serviceAccount, + projectLastUpdateStore, + xas, + clock + ) + ) } // Asserting partition number for both events and states @@ -102,6 +117,9 @@ class ProjectDeletionCoordinatorSuite extends NexusSuite with ConfigFixtures wit _ <- projects.deprecate(deprecated, 1) _ <- projects.create(markedAsDeleted, fields) _ <- projects.delete(markedAsDeleted, 1) + _ <- projectLastUpdateStore.save( + List(ProjectLastUpdate(markedAsDeleted, Instant.EPOCH, Offset.start)) + ) } yield () } @@ -147,6 +165,10 @@ class ProjectDeletionCoordinatorSuite extends NexusSuite with ConfigFixtures wit _ <- EntityDependencyStore .directDependencies(markedAsDeleted, entityToDelete, xas) .assertEquals(Set.empty[DependsOn]) + // Checking that the last updates have been cleared + _ <- projectLastUpdateStream(Offset.start) + .filter(_.project == markedAsDeleted) + .assertEmpty } yield () } diff --git a/delta/sourcing-psql/src/main/resources/scripts/postgres/drop/drop-tables.ddl b/delta/sourcing-psql/src/main/resources/scripts/postgres/drop/drop-tables.ddl index eac9b7e574..795658cb74 100644 --- a/delta/sourcing-psql/src/main/resources/scripts/postgres/drop/drop-tables.ddl +++ b/delta/sourcing-psql/src/main/resources/scripts/postgres/drop/drop-tables.ddl @@ -1,3 +1,4 @@ +DROP TABLE IF EXISTS public.project_last_updates; DROP TABLE IF EXISTS public.ship_original_project_context; DROP TABLE IF EXISTS public.ship_reports; DROP TABLE IF EXISTS public.global_events; diff --git 
a/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M03_001__project_last_updates.ddl b/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M03_001__project_last_updates.ddl
index 9a53a360d7..8d0f7fa45e 100644
--- a/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M03_001__project_last_updates.ddl
+++ b/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M03_001__project_last_updates.ddl
@@ -2,9 +2,9 @@ CREATE TABLE IF NOT EXISTS public.project_last_updates(
     org text NOT NULL,
     project text NOT NULL,
     last_instant timestamptz NOT NULL,
-    last_state_ordering bigint NOT NULL,
+    last_ordering bigint NOT NULL,
     PRIMARY KEY(org, project)
 );
 
-CREATE INDEX IF NOT EXISTS project_last_updates_last_instant_idx ON public.project_last_updates(last_instant);
+CREATE INDEX IF NOT EXISTS project_last_updates_last_ordering_idx ON public.project_last_updates(last_ordering);
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ElemQueryConfig.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ElemQueryConfig.scala
new file mode 100644
index 0000000000..ba4a3f5397
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ElemQueryConfig.scala
@@ -0,0 +1,66 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.config
+
+import pureconfig.ConfigReader
+import pureconfig.error.CannotConvert
+import pureconfig.generic.semiauto.deriveReader
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+  * Defines how to run streaming queries for elements:
+  *   - the batch size to apply for each query
+  *   - the strategy to adopt when all elems are consumed (stopping, waiting for a fixed amount of time, or
+  *     waiting for updates on a project)
+  */
+sealed trait ElemQueryConfig {
+
+  /**
+    * @return
+    *   the maximum number of elements to fetch with one query
+    */
+  def batchSize: Int
+
+}
+
+object ElemQueryConfig {
+
+  /**
+    * Will successfully stop the stream when all elems have been consumed.
+    */
+  final case class StopConfig(batchSize: Int) extends ElemQueryConfig
+
+  /**
+    * Will pause the stream for a fixed delay before executing the query again.
+    * @param delay
+    *   the amount of time to wait before the query is executed again
+    */
+  final case class DelayConfig(batchSize: Int, delay: FiniteDuration) extends ElemQueryConfig
+
+  /**
+    * Will pause the stream until a resource gets created / updated in the project the stream is running against.
+    * @param delay
+    *   the amount of time to wait between two queries while the project remains active
+    */
+  final case class PassivationConfig(batchSize: Int, delay: FiniteDuration) extends ElemQueryConfig
+
+  implicit final val queryConfig: ConfigReader[ElemQueryConfig] = {
+    val stopConfigReader: ConfigReader[StopConfig]               = deriveReader[StopConfig]
+    val delayConfigReader: ConfigReader[DelayConfig]             = deriveReader[DelayConfig]
+    val passivationConfigReader: ConfigReader[PassivationConfig] = deriveReader[PassivationConfig]
+    ConfigReader.fromCursor { cursor =>
+      for {
+        obj    <- cursor.asObjectCursor
+        tpec   <- obj.atKey("type")
+        tpe    <- ConfigReader[String].from(tpec)
+        config <- tpe match {
+                    case "stop"        => stopConfigReader.from(obj)
+                    case "delay"       => delayConfigReader.from(obj)
+                    case "passivation" => passivationConfigReader.from(obj)
+                    // fail gracefully on an unknown type instead of throwing a MatchError
+                    case other         =>
+                      tpec.failed(CannotConvert(other, "ElemQueryConfig", "expected 'stop', 'delay' or 'passivation'"))
+                  }
+      } yield config
+    }
+  }
+}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectLastUpdateConfig.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectLastUpdateConfig.scala
index 
579662d6be..9462240f0f 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectLastUpdateConfig.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectLastUpdateConfig.scala @@ -3,7 +3,9 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.config import pureconfig.ConfigReader import pureconfig.generic.semiauto.deriveReader -final case class ProjectLastUpdateConfig(batch: BatchConfig, query: QueryConfig) +import scala.concurrent.duration.FiniteDuration + +final case class ProjectLastUpdateConfig(batch: BatchConfig, query: QueryConfig, inactiveInterval: FiniteDuration) object ProjectLastUpdateConfig { implicit final val projectLastUpdateConfig: ConfigReader[ProjectLastUpdateConfig] = diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/QueryConfig.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/QueryConfig.scala index b9926e42b7..08058b63f3 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/QueryConfig.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/QueryConfig.scala @@ -13,13 +13,7 @@ import scala.concurrent.duration.FiniteDuration * @param refreshStrategy * complete the stream when all the rows have been consumed or delay and re-execute the query */ -final case class QueryConfig(batchSize: Int, refreshStrategy: RefreshStrategy) { - - /** - * Stop the existing query when all rows have been consumed - */ - def stop: QueryConfig = copy(refreshStrategy = RefreshStrategy.Stop) -} +final case class QueryConfig(batchSize: Int, refreshStrategy: RefreshStrategy) object QueryConfig { implicit final val queryConfig: ConfigReader[QueryConfig] = diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala index c749da75a0..7cbcf75209 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala @@ -83,7 +83,7 @@ object EventStreaming { cfg: QueryConfig, decode: (EntityType, Json) => IO[Option[A]] ): SuccessElemStream[A] = - StreamingQuery[Elem.SuccessElem[Json]](start, query, _.offset, cfg, xas) + StreamingQuery[Elem.SuccessElem[Json]](start, query, _.offset, cfg.refreshStrategy, xas) // evalMapFilter re-chunks to 1, the following 2 statements do the same but preserve the chunks .evalMapChunk(e => decode(e.tpe, e.value).map(_.map(a => e.copy(value = a)))) .collect { case Some(e) => e } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala index 682cecefcf..43c6fcdd6e 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala @@ -4,7 +4,6 @@ import cats.effect.IO import cats.effect.std.Semaphore import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors -import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import 
ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset @@ -36,7 +35,6 @@ object Exporter { private class ExporterImpl(config: ExportConfig, semaphore: Semaphore[IO], xas: Transactors) extends Exporter { - private val queryConfig = QueryConfig(config.batchSize, RefreshStrategy.Stop) override def events(query: ExportEventQuery): IO[ExportResult] = { val projectFilter = Fragments.orOpt( query.projects.map { project => sql"(org = ${project.organization} and project = ${project.project})" } @@ -73,7 +71,7 @@ object Exporter { targetDirectory / s"${paddedOffset(o)}.json" } - StreamingQuery[RowEvent](start, query, _.ordering, queryConfig, xas) + StreamingQuery[RowEvent](start, query, _.ordering, RefreshStrategy.Stop, xas) .evalTap { rowEvent => offsetRef.set(rowEvent.ordering) } .map(_.asJson.noSpaces) .through(StreamingUtils.writeRotate(computePath, config.limitPerFile)) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStore.scala index 574e07bde7..85a2959669 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStore.scala @@ -5,13 +5,9 @@ import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate -import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate.ProjectLastUpdateMap -import doobie.Fragments import doobie.syntax.all._ import doobie.postgres.implicits._ -import java.time.Instant - /** * Keeps track of the last update on a given project */ @@ -27,16 +23,6 @@ trait ProjectLastUpdateStore { */ def save(updates: List[ProjectLastUpdate]): IO[Unit] - /** - * Fetch all updates from the database - */ - def fetchAll: IO[ProjectLastUpdateMap] - - /** - * Fetch updates older than the given instant - */ - def fetchUpdates(after: Instant): IO[ProjectLastUpdateMap] - } object ProjectLastUpdateStore { @@ -55,26 +41,13 @@ object ProjectLastUpdateStore { .void private def saveOne(p: ProjectLastUpdate) = - sql"""INSERT INTO project_last_updates (org, project, last_instant, last_state_ordering) + sql"""INSERT INTO project_last_updates (org, project, last_instant, last_ordering) |VALUES (${p.project.organization}, ${p.project.project} ,${p.lastInstant}, ${p.lastOrdering}) |ON CONFLICT (org, project) |DO UPDATE set | last_instant = EXCLUDED.last_instant, - | last_state_ordering = EXCLUDED.last_state_ordering; + | last_ordering = EXCLUDED.last_ordering; |""".stripMargin.update.run - - override def fetchAll: IO[ProjectLastUpdateMap] = fetch(None) - - override def fetchUpdates(after: Instant): IO[ProjectLastUpdateMap] = fetch(Some(after)) - - private def fetch(after: Option[Instant]) = { - val afterFragment = after.map { a => fr"last_instant > $a" } - sql"""SELECT * from project_last_updates ${Fragments.whereAndOpt(afterFragment)}""" - .query[ProjectLastUpdate] - .map { plu => plu.project -> plu } - .toMap - .transact(xas.read) - } } } diff --git 
a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStream.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStream.scala new file mode 100644 index 0000000000..572742c7db --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStream.scala @@ -0,0 +1,37 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.projections + +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate +import doobie.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.sourcing.query.StreamingQuery +import fs2.Stream + +trait ProjectLastUpdateStream { + + /** + * Stream updates from the database + */ + def apply(offset: Offset): Stream[IO, ProjectLastUpdate] + +} + +object ProjectLastUpdateStream { + + def apply(xas: Transactors, config: QueryConfig): ProjectLastUpdateStream = + (offset: Offset) => + StreamingQuery[ProjectLastUpdate]( + offset, + o => sql"""SELECT * + |FROM project_last_updates + |WHERE last_ordering > $o + |ORDER BY last_ordering ASC + |LIMIT ${config.batchSize}""".stripMargin.query[ProjectLastUpdate], + _.lastOrdering, + config.refreshStrategy, + xas + ) + +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStore.scala index 80bed63e5b..9aa93e0aa4 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStore.scala @@ -45,17 +45,16 @@ final class ProjectionRestartStore(xas: Transactors, config: QueryConfig) { .void def stream(offset: Offset): Stream[IO, (Offset, ProjectionRestart)] = - StreamingQuery - .apply[(Offset, ProjectionRestart)]( - offset, - o => sql"""SELECT ordering, value, instant from public.projection_restarts + StreamingQuery[(Offset, ProjectionRestart)]( + offset, + o => sql"""SELECT ordering, value, instant from public.projection_restarts |WHERE ordering > $o and acknowledged = false |ORDER BY ordering ASC |LIMIT ${config.batchSize}""".stripMargin.query[(Offset, ProjectionRestart)], - _._1, - config, - xas - ) + _._1, + config.refreshStrategy, + xas + ) } object ProjectionRestartStore { diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/model/ProjectLastUpdate.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/model/ProjectLastUpdate.scala index a4614ad0a7..acf19aa5fd 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/model/ProjectLastUpdate.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/model/ProjectLastUpdate.scala @@ -11,8 +11,6 @@ final case class ProjectLastUpdate(project: ProjectRef, lastInstant: Instant, la object ProjectLastUpdate { - type ProjectLastUpdateMap = Map[ProjectRef, ProjectLastUpdate] - implicit val projectLastUpdateRead: Read[ProjectLastUpdate] = Read[(Label, Label, Instant, Offset)].map { case (org, project, 
instant, offset) =>
       ProjectLastUpdate(ProjectRef(org, project), instant, offset)
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/ElemStreaming.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/ElemStreaming.scala
new file mode 100644
index 0000000000..8f142ca7fc
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/ElemStreaming.scala
@@ -0,0 +1,236 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.query
+
+import cats.effect.IO
+import doobie.Fragments
+import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
+import ch.epfl.bluebrain.nexus.delta.sourcing.config.ElemQueryConfig
+import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.delta.sourcing.query.ElemStreaming.{logger, newState}
+import ch.epfl.bluebrain.nexus.delta.sourcing.query.StreamingQuery.{logQuery, stateFilter, typesSqlArray}
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, SuccessElem}
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, ProjectActivitySignals, RemainingElems}
+import doobie.syntax.all._
+import doobie.postgres.implicits._
+import doobie.util.query.Query0
+import fs2.{Chunk, Stream}
+import io.circe.Json
+
+import java.time.Instant
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.duration.FiniteDuration
+
+/**
+  * Allows streaming elements from the database, providing more advanced configuration than the regular
+  * StreamingQuery operations.
+  */
+final class ElemStreaming(xas: Transactors, queryConfig: ElemQueryConfig, activitySignals: ProjectActivitySignals) {
+
+  private val batchSize = queryConfig.batchSize
+
+  /**
+    * The stopping alternative for this elem streaming
+    */
+  def stopping: ElemStreaming = ElemStreaming.stopping(xas, batchSize)
+
+  /**
+    * Get information about the remaining elements to stream
+    * @param scope
+    *   the scope for the query
+    * @param selectFilter
+    *   what to filter for
+    * @param start
+    *   the offset to start from
+    */
+  def remaining(scope: Scope, selectFilter: SelectFilter, start: Offset): IO[Option[RemainingElems]] =
+    StreamingQuery.remaining(scope, selectFilter, start, xas)
+
+  /**
+    * Streams states and tombstones as [[Elem]] s without fetching the state value.
+    *
+    * Tombstones are translated as [[DroppedElem]].
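+    *
+    * For illustration (a hypothetical usage sketch, not code introduced by this change; it assumes an
+    * instance `elemStreaming` of this class and a `project` reference in scope):
+    * {{{
+    * // stream all latest elems of a project from the first offset
+    * elemStreaming(Scope(project), Offset.start, SelectFilter.latest)
+    * }}}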
+ * + * The stream termination depends on the provided [[ElemQueryConfig]] + * + * @param scope + * the scope of the states / tombstones + * @param start + * the offset to start with + * @param selectFilter + * what to filter for + */ + def apply( + scope: Scope, + start: Offset, + selectFilter: SelectFilter + ): Stream[IO, Elem[Unit]] = { + def query(offset: Offset): Query0[Elem[Unit]] = { + sql"""((SELECT 'newState', type, id, org, project, instant, ordering, rev + |FROM public.scoped_states + |${stateFilter(scope, offset, selectFilter)} + |ORDER BY ordering + |LIMIT $batchSize) + |UNION ALL + |(SELECT 'tombstone', type, id, org, project, instant, ordering, -1 + |FROM public.scoped_tombstones + |${tombstoneFilter(scope, offset, selectFilter)} + |ORDER BY ordering + |LIMIT $batchSize) + |ORDER BY ordering) + |LIMIT $batchSize + |""".stripMargin.query[(String, EntityType, Iri, Label, Label, Instant, Long, Int)].map { + case (`newState`, entityType, id, org, project, instant, offset, rev) => + SuccessElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), (), rev) + case (_, entityType, id, org, project, instant, offset, rev) => + DroppedElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), rev) + } + } + val refresh: RefreshOrStop = RefreshOrStop(scope, queryConfig, activitySignals) + execute[Unit](start, query, refresh) + } + + /** + * Streams states and tombstones as [[Elem]] s. + * + * State values are decoded via the provided function. If the function succeeds they will be streamed as + * [[SuccessElem[A]] ]. If the function fails, they will be streamed as FailedElem + * + * Tombstones are translated as [[DroppedElem]]. + * + * The stream termination depends on the provided [[ElemQueryConfig]] + * + * @param scope + * the scope for the query + * @param start + * the offset to start with + * @param selectFilter + * what to filter for + * @param decodeValue + * the function to decode states + */ + def apply[A]( + scope: Scope, + start: Offset, + selectFilter: SelectFilter, + decodeValue: (EntityType, Json) => IO[A] + ): Stream[IO, Elem[A]] = { + def query(offset: Offset): Query0[Elem[Json]] = { + sql"""((SELECT 'newState', type, id, org, project, value, instant, ordering, rev + |FROM public.scoped_states + |${stateFilter(scope, offset, selectFilter)} + |ORDER BY ordering + |LIMIT $batchSize) + |UNION ALL + |(SELECT 'tombstone', type, id, org, project, null, instant, ordering, -1 + |FROM public.scoped_tombstones + |${tombstoneFilter(scope, offset, selectFilter)} + |ORDER BY ordering + |LIMIT $batchSize) + |ORDER BY ordering) + |LIMIT $batchSize + |""".stripMargin.query[(String, EntityType, Iri, Label, Label, Option[Json], Instant, Long, Int)].map { + case (`newState`, entityType, id, org, project, Some(json), instant, offset, rev) => + SuccessElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), json, rev) + case (_, entityType, id, org, project, _, instant, offset, rev) => + DroppedElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), rev) + } + } + + val refresh: RefreshOrStop = RefreshOrStop(scope, queryConfig, activitySignals) + execute[Json](start, query, refresh) + .evalMapChunk { e => + e.evalMap { value => + decodeValue(e.tpe, value).onError { err => + logger.error(err)( + s"An error occurred while decoding value with id '${e.id}' of type '${e.tpe}' in '$scope'." + ) + } + } + } + } + + /** + * Streams the results of a query starting with the provided offset. 
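+    *
+    * Note (summarising the behaviour below, for the reader's convenience): when a query returns no
+    * rows, `refresh.run` decides what happens next; any `Continue` outcome re-runs the query from
+    * the same offset with an empty chunk, while a `Stopped` outcome terminates the stream.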
+   *
+   * The stream termination depends on the provided [[ElemQueryConfig]].
+   *
+   * @param start
+   *   the offset to start with
+   * @param query
+   *   the query to execute depending on the offset
+   * @param refresh
+   *   whether to continue or stop after the stream completion
+   */
+  private def execute[A](
+      start: Offset,
+      query: Offset => Query0[Elem[A]],
+      refresh: RefreshOrStop
+  ): Stream[IO, Elem[A]] = {
+    def onRefresh(offset: Offset): IO[Option[(Chunk[Elem[A]], Offset)]] = refresh.run.map { result =>
+      Option.when(result != RefreshOrStop.Outcome.Stopped)(Chunk.empty[Elem[A]] -> offset)
+    }
+    Stream
+      .unfoldChunkEval[IO, Offset, Elem[A]](start) { offset =>
+        query(offset).to[List].transact(xas.streaming).flatMap { elems =>
+          elems.lastOption.fold(onRefresh(offset)) { last =>
+            IO.pure(Some((dropDuplicates(elems), last.offset)))
+          }
+        }
+      }
+      .onFinalizeCase(logQuery(query(start)))
+  }
+
+  // Looks for duplicates and keeps the last occurrence
+  private def dropDuplicates[A](elems: List[Elem[A]]): Chunk[Elem[A]] = {
+    val (_, buffer) = elems.foldRight((Set.empty[(ProjectRef, Iri)], new ListBuffer[Elem[A]])) {
+      case (elem, (seen, buffer)) =>
+        val key = (elem.project, elem.id)
+        if (seen.contains(key))
+          (seen, buffer)
+        else
+          (seen + key, buffer.prepend(elem))
+    }
+    Chunk.from(buffer)
+  }
+
+  private def tombstoneFilter(scope: Scope, offset: Offset, selectFilter: SelectFilter) = {
+    val typeFragment =
+      selectFilter.types.asRestrictedTo.map(includedTypes => fr"cause -> 'types' ??| ${typesSqlArray(includedTypes)}")
+    val causeFragment = Fragments.orOpt(Some(fr"cause->>'deleted' = 'true'"), typeFragment)
+    Fragments.whereAndOpt(
+      scope.asFragment,
+      offset.asFragment,
+      selectFilter.tag.asFragment,
+      causeFragment
+    )
+  }
+
+}
+
+object ElemStreaming {
+
+  private val logger = Logger[ElemStreaming]
+
+  private val newState = "newState"
+
+  /**
+   * Constructs an elem streaming with a stopping strategy
+   */
+  def stopping(xas: Transactors, batchSize: Int): ElemStreaming = {
+    val eqc     = ElemQueryConfig.StopConfig(batchSize)
+    val signals = ProjectActivitySignals.noop
+    new ElemStreaming(xas, eqc, signals)
+  }
+
+  /**
+   * Constructs an elem streaming with a delay strategy
+   */
+  def delay(xas: Transactors, batchSize: Int, delay: FiniteDuration): ElemStreaming = {
+    val eqc     = ElemQueryConfig.DelayConfig(batchSize, delay)
+    val signals = ProjectActivitySignals.noop
+    new ElemStreaming(xas, eqc, signals)
+  }
+}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStop.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStop.scala
new file mode 100644
index 0000000000..cd5dcf433c
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStop.scala
@@ -0,0 +1,76 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.query
+
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
+import ch.epfl.bluebrain.nexus.delta.sourcing.config.ElemQueryConfig
+import ch.epfl.bluebrain.nexus.delta.sourcing.config.ElemQueryConfig.{DelayConfig, PassivationConfig, StopConfig}
+import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshOrStop.Outcome
+import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshOrStop.Outcome._
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectActivitySignals
+import fs2.Stream
+
+/**
+ * Computes the outcome to apply when all elements are consumed by a projection
+ */
+trait RefreshOrStop {
+  def run: IO[Outcome]
+}
+
+object RefreshOrStop {
+
+  sealed trait Outcome
+
+  object Outcome {
+    case object Stopped            extends Outcome
+    sealed trait Continue          extends Outcome
+    case object Delayed            extends Continue
+    case object NoSignal           extends Continue
+    case object DelayedPassivation extends Continue
+    case object Passivated         extends Continue
+  }
+
+  private val logger = Logger[RefreshOrStop.type]
+
+  /**
+   * Generates an instance from the provided parameters
+   * @param scope
+   *   scope of the projection (ex: root/project)
+   * @param config
+   *   the elem query configuration of the projection
+   * @param activitySignals
+   *   the activity signals (used only in the case of a passivation config)
+   */
+  def apply(scope: Scope, config: ElemQueryConfig, activitySignals: ProjectActivitySignals): RefreshOrStop =
+    new RefreshOrStop {
+      override def run: IO[Outcome] = {
+        (config, scope) match {
+          case (_: StopConfig, _)                             => IO.pure(Stopped)
+          case (d: DelayConfig, _)                            => IO.sleep(d.delay).as(Delayed)
+          case (w: PassivationConfig, Scope.Project(project)) =>
+            activitySignals.apply(project).flatMap {
+              case Some(signal) =>
+                signal.get.flatMap {
+                  case true  =>
+                    logger.debug(s"Project '$project' is active, continuing after ${w.delay}") >>
+                      IO.sleep(w.delay).as(DelayedPassivation)
+                  case false =>
+                    // Pause the stream until the activity signal becomes true again
+                    logger.info(s"Project '$project' is inactive, pausing until some activity is seen again.") >>
+                      Stream.never[IO].interruptWhen(signal).compile.drain >>
+                      logger
+                        .info(s"An update has been detected on project '$project', querying will resume.")
+                        .as(Passivated)
+                }
+              case None         =>
+                logger.debug(s"No signal has been found for project '$project', continuing after ${w.delay}") >> IO
+                  .sleep(w.delay)
+                  .as(NoSignal)
+            }
+          case (c, s)                                         =>
+            // Passivation is only available at the project scope
+            IO.raiseError(new IllegalStateException(s"'$c' and '$s' are not a valid combination; this should not happen"))
+        }
+      }
+    }
+
+}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala
index ce88559792..3cdb88c5e2 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala
@@ -2,24 +2,19 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.query
 
 import cats.effect.IO
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
-import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
 import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
-import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
-import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, IriFilter, Label, ProjectRef}
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.IriFilter
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, SuccessElem}
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems}
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
 import doobie.Fragments
 import doobie.syntax.all._
 import doobie.postgres.implicits._
 import doobie.util.fragment.Fragment
 import doobie.util.query.Query0
 import fs2.{Chunk, Stream}
-import io.circe.Json
 
 import java.time.Instant
-import scala.collection.mutable.ListBuffer
 import 
cats.effect.kernel.Resource /** @@ -29,8 +24,6 @@ object StreamingQuery { private val logger = Logger[StreamingQuery.type] - private val newState = "newState" - /** * Get information about the remaining elements to stream * @param scope @@ -58,119 +51,6 @@ object StreamingQuery { .transact(xas.read) } - /** - * Streams states and tombstones as [[Elem]] s without fetching the state value. - * - * Tombstones are translated as [[DroppedElem]]. - * - * The stream termination depends on the provided [[QueryConfig]] - * - * @param project - * the project of the states / tombstones - * @param start - * the offset to start with - * @param selectFilter - * what to filter for - * @param cfg - * the query config - * @param xas - * the transactors - */ - def elems( - scope: Scope, - start: Offset, - selectFilter: SelectFilter, - cfg: QueryConfig, - xas: Transactors - ): Stream[IO, Elem[Unit]] = { - def query(offset: Offset): Query0[Elem[Unit]] = { - sql"""((SELECT 'newState', type, id, org, project, instant, ordering, rev - |FROM public.scoped_states - |${stateFilter(scope, offset, selectFilter)} - |ORDER BY ordering - |LIMIT ${cfg.batchSize}) - |UNION ALL - |(SELECT 'tombstone', type, id, org, project, instant, ordering, -1 - |FROM public.scoped_tombstones - |${tombstoneFilter(scope, offset, selectFilter)} - |ORDER BY ordering - |LIMIT ${cfg.batchSize}) - |ORDER BY ordering) - |LIMIT ${cfg.batchSize} - |""".stripMargin.query[(String, EntityType, Iri, Label, Label, Instant, Long, Int)].map { - case (`newState`, entityType, id, org, project, instant, offset, rev) => - SuccessElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), (), rev) - case (_, entityType, id, org, project, instant, offset, rev) => - DroppedElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), rev) - } - } - StreamingQuery[Elem[Unit], Iri](start, query, _.offset, _.id, cfg, xas) - } - - /** - * Streams states and tombstones as [[Elem]] s. - * - * State values are decoded via the provided function. If the function succeeds they will be streamed as - * [[SuccessElem[A]] ]. If the function fails, they will be streamed as FailedElem - * - * Tombstones are translated as [[DroppedElem]]. 
- * - * The stream termination depends on the provided [[QueryConfig]] - * - * @param scope - * the scope for the query - * @param start - * the offset to start with - * @param selectFilter - * what to filter for - * @param cfg - * the query config - * @param xas - * the transactors - * @param decodeValue - * the function to decode states - */ - def elems[A]( - scope: Scope, - start: Offset, - selectFilter: SelectFilter, - cfg: QueryConfig, - xas: Transactors, - decodeValue: (EntityType, Json) => IO[A] - ): Stream[IO, Elem[A]] = { - def query(offset: Offset): Query0[Elem[Json]] = { - sql"""((SELECT 'newState', type, id, org, project, value, instant, ordering, rev - |FROM public.scoped_states - |${stateFilter(scope, offset, selectFilter)} - |ORDER BY ordering - |LIMIT ${cfg.batchSize}) - |UNION ALL - |(SELECT 'tombstone', type, id, org, project, null, instant, ordering, -1 - |FROM public.scoped_tombstones - |${tombstoneFilter(scope, offset, selectFilter)} - |ORDER BY ordering - |LIMIT ${cfg.batchSize}) - |ORDER BY ordering) - |LIMIT ${cfg.batchSize} - |""".stripMargin.query[(String, EntityType, Iri, Label, Label, Option[Json], Instant, Long, Int)].map { - case (`newState`, entityType, id, org, project, Some(json), instant, offset, rev) => - SuccessElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), json, rev) - case (_, entityType, id, org, project, _, instant, offset, rev) => - DroppedElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), rev) - } - } - StreamingQuery[Elem[Json], Iri](start, query, _.offset, _.id, cfg, xas) - .evalMapChunk { e => - e.evalMap { value => - decodeValue(e.tpe, value).onError { err => - logger.error(err)( - s"An error occurred while decoding value with id '${e.id}' of type '${e.tpe}' in '$scope'." - ) - } - } - } - } - /** * Streams the results of a query starting with the provided offset. * @@ -182,8 +62,8 @@ object StreamingQuery { * the query to execute depending on the offset * @param extractOffset * how to extract the offset from an [[A]] to be able to pursue the stream - * @param cfg - * the query config + * @param refreshStrategy + * the refresh strategy * @param xas * the transactors */ @@ -191,74 +71,26 @@ object StreamingQuery { start: Offset, query: Offset => Query0[A], extractOffset: A => Offset, - cfg: QueryConfig, + refreshStrategy: RefreshStrategy, xas: Transactors ): Stream[IO, A] = Stream .unfoldChunkEval[IO, Offset, A](start) { offset => query(offset).accumulate[Chunk].transact(xas.streaming).flatMap { elems => - elems.last.fold(refreshOrStop[A](cfg, offset)) { last => + elems.last.fold(refreshOrStop[A](refreshStrategy, offset)) { last => IO.pure(Some((elems, extractOffset(last)))) } } } .onFinalizeCase(logQuery(query(start))) - /** - * Streams the results of a query starting with the provided offset. - * - * The stream termination depends on the provided [[QueryConfig]]. 
- * - * @param start - * the offset to start with - * @param query - * the query to execute depending on the offset - * @param extractOffset - * how to extract the offset from an [[A]] to be able to pursue the stream - * @param extractId - * how to extract an id from an [[A]] to look for duplicates - * @param cfg - * the query config - * @param xas - * the transactors - */ - def apply[A, K]( - start: Offset, - query: Offset => Query0[A], - extractOffset: A => Offset, - extractId: A => K, - cfg: QueryConfig, - xas: Transactors - ): Stream[IO, A] = - Stream - .unfoldChunkEval[IO, Offset, A](start) { offset => - query(offset).to[List].transact(xas.streaming).flatMap { elems => - elems.lastOption.fold(refreshOrStop[A](cfg, offset)) { last => - IO.pure(Some((dropDuplicates(elems, extractId), extractOffset(last)))) - } - } - } - .onFinalizeCase(logQuery(query(start))) - - private def refreshOrStop[A](cfg: QueryConfig, offset: Offset): IO[Option[(Chunk[A], Offset)]] = - cfg.refreshStrategy match { + private def refreshOrStop[A](refreshStrategy: RefreshStrategy, offset: Offset): IO[Option[(Chunk[A], Offset)]] = + refreshStrategy match { case RefreshStrategy.Stop => IO.none case RefreshStrategy.Delay(value) => IO.sleep(value) >> IO.pure(Some((Chunk.empty[A], offset))) } - // Looks for duplicates and keep the last occurrence - private[query] def dropDuplicates[A, K](elems: List[A], f: A => K): Chunk[A] = { - val (_, buffer) = elems.foldRight((Set.empty[K], new ListBuffer[A])) { case (x, (seen, buffer)) => - val key = f(x) - if (seen.contains(key)) - (seen, buffer) - else - (seen + key, buffer.prepend(x)) - } - Chunk.from(buffer) - } - - private def logQuery[A](query: Query0[A]): Resource.ExitCase => IO[Unit] = { + def logQuery[A](query: Query0[A]): Resource.ExitCase => IO[Unit] = { case Resource.ExitCase.Succeeded => logger.debug(s"Reached the end of the single evaluation of query '${query.sql}'.") case Resource.ExitCase.Errored(cause) => @@ -267,7 +99,7 @@ object StreamingQuery { logger.debug(s"Reached the end of the single evaluation of query '${query.sql}'.") } - private def stateFilter(scope: Scope, offset: Offset, selectFilter: SelectFilter) = { + def stateFilter(scope: Scope, offset: Offset, selectFilter: SelectFilter) = { val typeFragment = selectFilter.types.asRestrictedTo.map(restriction => fr"value -> 'types' ??| ${typesSqlArray(restriction)}") Fragments.whereAndOpt( @@ -279,19 +111,7 @@ object StreamingQuery { ) } - private def tombstoneFilter(scope: Scope, offset: Offset, selectFilter: SelectFilter) = { - val typeFragment = - selectFilter.types.asRestrictedTo.map(includedTypes => fr"cause -> 'types' ??| ${typesSqlArray(includedTypes)}") - val causeFragment = Fragments.orOpt(Some(fr"cause->>'deleted' = 'true'"), typeFragment) - Fragments.whereAndOpt( - scope.asFragment, - offset.asFragment, - selectFilter.tag.asFragment, - causeFragment - ) - } - - private def typesSqlArray(includedTypes: IriFilter.Include): Fragment = + def typesSqlArray(includedTypes: IriFilter.Include): Fragment = Fragment.const(s"ARRAY[${includedTypes.iris.map(t => s"'$t'").mkString(",")}]") } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala index 438785d9c3..f23872cecd 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala +++ 
b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala
@@ -122,7 +122,7 @@ object GlobalStateStore {
         |ORDER BY ordering
         |LIMIT ${config.batchSize}""".stripMargin.query[(S, Long)],
       { case (_, offset: Long) => Offset.at(offset) },
-      config.copy(refreshStrategy = strategy),
+      refreshStrategy = strategy,
       xas
     ).map { case (value, _) => value }
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala
index fd5e388bc3..9dd99fbe3a 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala
@@ -247,7 +247,7 @@ object ScopedStateStore {
         |ORDER BY ordering
         |LIMIT ${config.batchSize}""".stripMargin.query[Elem.SuccessElem[S]],
       _.offset,
-      config.copy(refreshStrategy = strategy),
+      strategy,
       xas
     )
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectActivitySignals.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectActivitySignals.scala
new file mode 100644
index 0000000000..55c11d14b4
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectActivitySignals.scala
@@ -0,0 +1,128 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.stream
+
+import cats.effect.{Clock, IO}
+import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
+import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef}
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectLastUpdateStream
+import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
+import fs2.Pipe
+import fs2.Stream
+import fs2.concurrent.SignallingRef
+
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+
+/**
+ * Returns signals indicating activity in the different projects
+ */
+trait ProjectActivitySignals {
+
+  /**
+   * Returns the activity signal for the given project
+   */
+  def apply(project: ProjectRef): IO[Option[SignallingRef[IO, Boolean]]]
+
+  /**
+   * Returns a snapshot of the activity status of the different projects
+   */
+  def activityMap: IO[Map[ProjectRef, Boolean]]
+
+}
+
+object ProjectActivitySignals {
+
+  val noop: ProjectActivitySignals = new ProjectActivitySignals {
+    override def apply(project: ProjectRef): IO[Option[SignallingRef[IO, Boolean]]] = IO.none
+
+    override def activityMap: IO[Map[ProjectRef, Boolean]] = IO.pure(Map.empty)
+  }
+
+  private def refresh(
+      signals: ProjectSignals[ProjectLastUpdate],
+      clock: Clock[IO],
+      inactiveInterval: FiniteDuration,
+      updated: Map[ProjectRef, ProjectLastUpdate]
+  ) =
+    clock.realTimeInstant.flatMap { now =>
+      val inactivityThreshold = now.minusSeconds(inactiveInterval.toSeconds)
+      signals.refresh(updated, _.lastInstant.isAfter(inactivityThreshold))
+    }
+
+  private def refreshStream(
+      signals: ProjectSignals[ProjectLastUpdate],
+      clock: Clock[IO],
+      inactiveInterval: FiniteDuration
+  ) =
+    Stream.awakeEvery[IO](1.second).evalTap { _ =>
+      refresh(signals, clock, inactiveInterval, Map.empty)
+    }
+
+  private[stream] def signalPipe(
signals: ProjectSignals[ProjectLastUpdate], + clock: Clock[IO], + inactiveInterval: FiniteDuration + ): Pipe[IO, ProjectLastUpdate, ProjectLastUpdate] = + _.groupWithin(10, 100.millis) + .evalTap { chunk => + val map = chunk.foldLeft(Map.empty[ProjectRef, ProjectLastUpdate]) { case (acc, update) => + acc.updated(update.project, update) + } + refresh(signals, clock, inactiveInterval, map) + } + .unchunks + .concurrently(refreshStream(signals, clock, inactiveInterval)) + + // $COVERAGE-OFF$ + private val projectionMetadata: ProjectionMetadata = + ProjectionMetadata("system", "project-activity-signals", None, None) + + private val entityType: EntityType = EntityType("project-activity-signals") + + private def lastUpdatesId(project: ProjectRef): Iri = nxv + s"projection/project-activity-signals/$project" + + private def successElem(lastUpdate: ProjectLastUpdate): SuccessElem[Unit] = + SuccessElem( + entityType, + lastUpdatesId(lastUpdate.project), + lastUpdate.project, + lastUpdate.lastInstant, + lastUpdate.lastOrdering, + (), + 1 + ) + + def apply( + supervisor: Supervisor, + stream: ProjectLastUpdateStream, + clock: Clock[IO], + inactiveInterval: FiniteDuration + ): IO[ProjectActivitySignals] = { + + for { + signals <- ProjectSignals[ProjectLastUpdate] + compiled = + CompiledProjection.fromStream( + projectionMetadata, + ExecutionStrategy.EveryNode, + (offset: Offset) => + stream(offset) + .through(signalPipe(signals, clock, inactiveInterval)) + .map(successElem) + ) + _ <- supervisor.run(compiled) + } yield apply(signals) + } + // $COVERAGE-ON$ + + def apply(signals: ProjectSignals[ProjectLastUpdate]): ProjectActivitySignals = + new ProjectActivitySignals { + override def apply(project: ProjectRef): IO[Option[SignallingRef[IO, Boolean]]] = signals.get(project) + + /** + * Returns a snapshot of the activity status of the different projects + */ + override def activityMap: IO[Map[ProjectRef, Boolean]] = signals.activityMap + } +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdateProjection.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdateProjection.scala deleted file mode 100644 index 9326213d3b..0000000000 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdateProjection.scala +++ /dev/null @@ -1,70 +0,0 @@ -package ch.epfl.bluebrain.nexus.delta.sourcing.stream - -import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors} -import ch.epfl.bluebrain.nexus.delta.sourcing.config.{BatchConfig, QueryConfig} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectLastUpdateStore -import ch.epfl.bluebrain.nexus.delta.sourcing.query.{SelectFilter, StreamingQuery} - -trait ProjectLastUpdateProjection - -// $COVERAGE-OFF$ -object ProjectLastUpdateProjection { - - private val projectionMetadata: ProjectionMetadata = ProjectionMetadata("system", "project-last-updates", None, None) - - // We need a value to return to Distage - private val dummy = new ProjectLastUpdateProjection {} - - /** - * Creates a projection allowing to compute the last instant and the last ordering values for every project. 
-   *
-   * This value is then used for passivation and the reactivation of other projections (ex: those related to indexing)
-   *
-   * @param supervisor
-   *   the supervisor which will supervise the projection
-   * @param store
-   *   the store allowing to fetch and save project last updates
-   * @param xas
-   *   doobie
-   * @param batchConfig
-   *   a batch configuration for the sink
-   * @param queryConfig
-   *   query config for fetching scoped states
-   */
-  def apply(
-      supervisor: Supervisor,
-      store: ProjectLastUpdateStore,
-      xas: Transactors,
-      batchConfig: BatchConfig,
-      queryConfig: QueryConfig
-  ): IO[ProjectLastUpdateProjection] = {
-    val elemStream = (offset: Offset) => StreamingQuery.elems(Scope.root, offset, SelectFilter.latest, queryConfig, xas)
-    apply(supervisor, store, elemStream, batchConfig)
-  }
-
-  def apply(
-      supervisor: Supervisor,
-      store: ProjectLastUpdateStore,
-      elemStream: Offset => ElemStream[Unit],
-      batchConfig: BatchConfig
-  ): IO[ProjectLastUpdateProjection] = {
-    val source = Source { (offset: Offset) => elemStream(offset) }
-
-    for {
-      sink              <- ProjectLastUpdatesSink(store, batchConfig.maxElements, batchConfig.maxInterval)
-      compiledProjection = CompiledProjection.compile(
-                             projectionMetadata,
-                             ExecutionStrategy.PersistentSingleNode,
-                             source,
-                             sink
-                           )
-      projection        <- IO.fromEither(compiledProjection)
-      _                 <- supervisor.run(projection)
-    } yield dummy
-  }
-
-}
-// $COVERAGE-ON$
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdateWrites.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdateWrites.scala
new file mode 100644
index 0000000000..b8f2a2455a
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdateWrites.scala
@@ -0,0 +1,71 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.stream
+
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectLastUpdateStore
+import ch.epfl.bluebrain.nexus.delta.sourcing.query.{ElemStreaming, SelectFilter}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
+
+trait ProjectLastUpdateWrites
+
+// $COVERAGE-OFF$
+object ProjectLastUpdateWrites {
+
+  private val projectionMetadata: ProjectionMetadata =
+    ProjectionMetadata("system", "project-last-updates-writes", None, None)
+
+  // We need a value to return to Distage
+  private val dummy = new ProjectLastUpdateWrites {}
+
+  /**
+   * Creates a projection that computes the last instant and the last ordering values for every project.
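+   * It streams the latest states of every project through a delay-based [[ElemStreaming]] and persists the computed
+   * values via the [[ProjectLastUpdatesSink]].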
+   *
+   * This value is then used for the passivation and reactivation of other projections (ex: those related to indexing)
+   *
+   * @param supervisor
+   *   the supervisor which will supervise the projection
+   * @param store
+   *   the store used to fetch and save project last updates
+   * @param xas
+   *   doobie
+   * @param batchConfig
+   *   a batch configuration for fetching the elems and for the sink
+   */
+  def apply(
+      supervisor: Supervisor,
+      store: ProjectLastUpdateStore,
+      xas: Transactors,
+      batchConfig: BatchConfig
+  ): IO[ProjectLastUpdateWrites] = {
+    // We build an elem streaming based on a delay strategy
+    val es         = ElemStreaming.delay(xas, batchConfig.maxElements, batchConfig.maxInterval)
+    val elemStream = (offset: Offset) => es(Scope.root, offset, SelectFilter.latest)
+    apply(supervisor, store, elemStream, batchConfig)
+  }
+
+  def apply(
+      supervisor: Supervisor,
+      store: ProjectLastUpdateStore,
+      elemStream: Offset => ElemStream[Unit],
+      batchConfig: BatchConfig
+  ): IO[ProjectLastUpdateWrites] = {
+    val source             = Source { (offset: Offset) => elemStream(offset) }
+    val sink               = ProjectLastUpdatesSink(store, batchConfig.maxElements, batchConfig.maxInterval)
+    val compiledProjection = CompiledProjection.compile(
+      projectionMetadata,
+      ExecutionStrategy.PersistentSingleNode,
+      source,
+      sink
+    )
+
+    IO.fromEither(compiledProjection)
+      .flatMap { projection =>
+        supervisor.run(projection)
+      }
+      .as(dummy)
+  }
+
+}
+// $COVERAGE-ON$
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdatesSink.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdatesSink.scala
index 3cca8989e0..6f0371939d 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdatesSink.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdatesSink.scala
@@ -1,13 +1,11 @@
 package ch.epfl.bluebrain.nexus.delta.sourcing.stream
 
-import cats.effect.{IO, Ref}
-import cats.implicits.catsSyntaxPartialOrder
+import cats.effect.IO
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.kernel.utils.CollectionUtils.quote
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
 import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectLastUpdateStore
 import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate
-import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate.ProjectLastUpdateMap
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectLastUpdatesSink.logger
 import fs2.Chunk
@@ -19,9 +17,6 @@ import scala.concurrent.duration.FiniteDuration
  * Sink that computes new project last updates from the elem stream and push them to the database
  * @param store
  *   the store to insert/update to the database
- * @param mapRef
- *   a in-memory representation of the values of all the project This value is populated from the database at startup
- *   and kept in sync with the db while processing elems
  * @param chunkSize
  *   the maximum number of elems to be processed at once
  * @param maxWindow
@@ -29,7 +24,6 @@ import scala.concurrent.duration.FiniteDuration
  */
 final class ProjectLastUpdatesSink(
     store: ProjectLastUpdateStore,
-    mapRef: Ref[IO, ProjectLastUpdateMap],
    override val chunkSize: Int,
    override val maxWindow: FiniteDuration
 ) extends Sink {
@@ -40,26 +34,19 @@ final class 
ProjectLastUpdatesSink(
 
   override def inType: Typeable[Unit] = Typeable[Unit]
 
   override def apply(elements: Chunk[Elem[Unit]]): IO[Chunk[Elem[Unit]]] = {
+    val updates = computeUpdates(elements)
     for {
-      map     <- mapRef.get
-      updates  = computeUpdates(elements, map)
-      _       <- store.save(updates.values.toList)
-      _       <- mapRef.update(_ ++ updates)
-      _       <- logger.debug(s"Last updates have been computed for projects: ${quote(updates.keySet)}")
+      _ <- store.save(updates.values.toList)
+      _ <- logger.debug(s"Last updates have been computed for projects: ${quote(updates.keySet)}")
     } yield elements
   }
 
   private def computeUpdates(
-      elements: Chunk[Elem[Unit]],
-      map: Map[ProjectRef, ProjectLastUpdate]
+      elements: Chunk[Elem[Unit]]
   ): Map[ProjectRef, ProjectLastUpdate] =
     elements.foldLeft(Map.empty[ProjectRef, ProjectLastUpdate]) { case (acc, elem) =>
-      val candidate = ProjectLastUpdate(elem.project, elem.instant, elem.offset)
-      map.get(candidate.project) match {
-        case Some(current) if current.lastOrdering > candidate.lastOrdering => acc
-        case Some(_)                                                        => acc.updated(candidate.project, candidate)
-        case None                                                           => acc.updated(candidate.project, candidate)
-      }
+      val newValue = ProjectLastUpdate(elem.project, elem.instant, elem.offset)
+      acc.updated(newValue.project, newValue)
     }
 }
 
@@ -67,9 +54,7 @@ object ProjectLastUpdatesSink {
 
   private val logger = Logger[ProjectLastUpdatesSink]
 
-  def apply(store: ProjectLastUpdateStore, chunkSize: Int, maxWindow: FiniteDuration): IO[ProjectLastUpdatesSink] =
-    Ref.ofEffect(store.fetchAll).map { mapRef =>
-      new ProjectLastUpdatesSink(store, mapRef, chunkSize, maxWindow)
-    }
+  def apply(store: ProjectLastUpdateStore, chunkSize: Int, maxWindow: FiniteDuration): ProjectLastUpdatesSink =
+    new ProjectLastUpdatesSink(store, chunkSize, maxWindow)
 }
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectSignals.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectSignals.scala
new file mode 100644
index 0000000000..652e173be2
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectSignals.scala
@@ -0,0 +1,62 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.stream
+
+import cats.effect.IO
+import cats.effect.std.AtomicCell
+import cats.syntax.all._
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectSignals.emptySignals
+import fs2.concurrent.SignallingRef
+
+/**
+ * Keeps track of a boolean signal per project, computed from the given values
+ */
+final class ProjectSignals[A](
+    values: AtomicCell[IO, Map[ProjectRef, A]],
+    signals: AtomicCell[IO, Map[ProjectRef, SignallingRef[IO, Boolean]]]
+) {
+
+  /**
+   * Returns the signal for the given project
+   */
+  def get(project: ProjectRef): IO[Option[SignallingRef[IO, Boolean]]] =
+    signals.get.map(_.get(project))
+
+  /**
+   * Pushes the given value updates and recomputes the signals for all projects with the given predicate
+   */
+  def refresh(updates: Map[ProjectRef, A], predicate: A => Boolean): IO[Unit] =
+    values.updateAndGet(_ ++ updates).flatMap { updatedValues =>
+      signals.evalUpdate { signals =>
+        updatedValues.toList.foldLeftM(emptySignals) { case (acc, (project, value)) =>
+          val b = predicate(value)
+          // Reuse the existing signal when there is one, otherwise create it with the current value
+          signals.get(project) match {
+            case Some(signal) => signal.set(b).as(acc.updated(project, signal))
+            case None         =>
+              SignallingRef.of[IO, Boolean](b).map(acc.updated(project, _))
+          }
+        }
+      }.void
+    }
+
+  /**
+   * Return a snapshot of the signals for all 
the projects + */ + def activityMap: IO[Map[ProjectRef, Boolean]] = + signals.get.flatMap { signals => + signals.toList.foldLeftM(Map.empty[ProjectRef, Boolean]) { case (acc, (project, signal)) => + signal.get.map(acc.updated(project, _)) + } + } +} + +object ProjectSignals { + + val emptySignals = Map.empty[ProjectRef, SignallingRef[IO, Boolean]] + + def apply[A]: IO[ProjectSignals[A]] = + for { + values <- AtomicCell[IO].of(Map.empty[ProjectRef, A]) + signals <- AtomicCell[IO].of(emptySignals) + } yield new ProjectSignals[A](values, signals) + +} diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStoreSuite.scala index 8268b86f1a..84fd01baba 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectLastUpdateStoreSuite.scala @@ -1,9 +1,11 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.projections +import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate +import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import munit.AnyFixture @@ -14,8 +16,9 @@ class ProjectLastUpdateStoreSuite extends NexusSuite with Doobie.Fixture { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) - private lazy val xas = doobie() - private lazy val store = ProjectLastUpdateStore(xas) + private lazy val xas = doobie() + private lazy val store = ProjectLastUpdateStore(xas) + private lazy val stream = ProjectLastUpdateStream(xas, QueryConfig(10, RefreshStrategy.Stop)) private val now = Instant.now().truncatedTo(ChronoUnit.SECONDS) @@ -26,23 +29,22 @@ class ProjectLastUpdateStoreSuite extends NexusSuite with Doobie.Fixture { val project2 = ProjectRef.unsafe("org", "proj2") val lastProject2 = ProjectLastUpdate(project2, now.minusSeconds(2L), Offset.at(123L)) - val threshold = now.minusSeconds(10L) - for { // Init _ <- store.save(List(lastProject1, lastProject2)) - expectedAll = Map(project1 -> lastProject1, project2 -> lastProject2) - _ <- store.fetchAll.assertEquals(expectedAll) - expectedAfter = Map(project2 -> lastProject2) - _ <- store.fetchUpdates(threshold).assertEquals(expectedAfter) + expectedAll = List(lastProject1, lastProject2) + _ <- stream(Offset.start).assert(expectedAll) + offset42 = Offset.at(42L) + expectedAfter42 = List(lastProject2) + _ <- stream(offset42).assert(expectedAfter42) // Update _ <- store.save(List(lastProject1Updated)) - expectedAfterUpdate = Map(project1 -> lastProject1Updated, project2 -> lastProject2) - _ <- store.fetchUpdates(threshold).assertEquals(expectedAfterUpdate) + expectedAfterUpdate = List(lastProject1Updated, lastProject2) + _ <- stream(offset42).assert(expectedAfterUpdate) // Deletion _ <- store.delete(project1) - expectedAfterDeletion = Map(project2 -> lastProject2) - _ <- store.fetchUpdates(threshold).assertEquals(expectedAfterDeletion) + expectedAfterDeletion = List(lastProject2) + _ <- stream(offset42).assert(expectedAfterDeletion) } yield () } diff --git 
a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/ElemStreamingSuite.scala similarity index 93% rename from delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala rename to delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/ElemStreamingSuite.scala index 9b20d525cb..3b29e71280 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/ElemStreamingSuite.scala @@ -13,7 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import ch.epfl.bluebrain.nexus.delta.sourcing.model._ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie -import ch.epfl.bluebrain.nexus.delta.sourcing.query.StreamingQuerySuite.Release +import ch.epfl.bluebrain.nexus.delta.sourcing.query.ElemStreamingSuite.Release import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.ScopedState import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} @@ -22,7 +22,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.tombstone.TombstoneStore import ch.epfl.bluebrain.nexus.delta.sourcing.{PullRequest, Scope, Serializer} import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import doobie.syntax.all._ -import fs2.Chunk import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto.deriveConfiguredCodec import io.circe.{Codec, DecodingFailure, Json} @@ -30,13 +29,14 @@ import munit.AnyFixture import java.time.Instant -class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { +class ElemStreamingSuite extends NexusSuite with Doobie.Fixture { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) private val qc = QueryConfig(2, RefreshStrategy.Stop) - private lazy val xas = doobie() + private lazy val xas = doobie() + private lazy val elemStreaming = ElemStreaming.stopping(xas, 2) private lazy val prStore = ScopedStateStore[Iri, PullRequestState]( PullRequest.entityType, @@ -112,8 +112,8 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { /** Returns streams that returns elems of Iri and elem of unit */ private def stream(project: ProjectRef, start: Offset, selectFilter: SelectFilter) = ( - StreamingQuery.elems[Iri](Scope(project), start, selectFilter, qc, xas, decodeValue), - StreamingQuery.elems(Scope(project), start, selectFilter, qc, xas) + elemStreaming(Scope(project), start, selectFilter, decodeValue), + elemStreaming(Scope(project), start, selectFilter) ) test("Running a stream on latest states on project 1 from the beginning") { @@ -199,8 +199,7 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { } } - val result = - StreamingQuery.elems[Iri](Scope(project1), Offset.start, SelectFilter.latest, qc, xas, incompleteDecode) + val result = elemStreaming(Scope(project1), Offset.start, SelectFilter.latest, incompleteDecode) val releaseDecodingFailure = decodingFailure(Release.entityType) result.compile.toList.assertEquals( List( @@ -259,16 +258,9 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { .remaining(Scope(ProjectRef.unsafe("xxx", "xxx")), SelectFilter.latest, Offset.at(6L), xas) 
.assertEquals(None) } - - test("Should only keep the last elem when elems with the same id appear several times") { - val input = List(1 -> "A", 2 -> "B", 3 -> "C", 1 -> "D", 2 -> "E", 1 -> "F") - val expected = Chunk(3 -> "C", 2 -> "E", 1 -> "F") - val obtained = StreamingQuery.dropDuplicates[(Int, String), Int](input, _._1) - assertEquals(obtained, expected) - } } -object StreamingQuerySuite { +object ElemStreamingSuite { final private case class Release( id: Iri, diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStopSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStopSuite.scala new file mode 100644 index 0000000000..03015a24b3 --- /dev/null +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/RefreshOrStopSuite.scala @@ -0,0 +1,85 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.query + +import cats.effect.{IO, Ref} +import ch.epfl.bluebrain.nexus.delta.sourcing.Scope +import ch.epfl.bluebrain.nexus.delta.sourcing.config.ElemQueryConfig.{DelayConfig, PassivationConfig, StopConfig} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectActivitySignals +import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite +import ch.epfl.bluebrain.nexus.testkit.mu.ce.PatienceConfig +import fs2.concurrent.SignallingRef + +import scala.concurrent.duration.DurationInt + +class RefreshOrStopSuite extends NexusSuite { + + implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 10.millis) + + private val org = Label.unsafe("org") + private val project = ProjectRef.unsafe("org", "proj") + private val stopConfig = StopConfig(20) + private val delayConfig = DelayConfig(20, 50.millis) + private val passivationConfig = PassivationConfig(20, 50.millis) + + private def activitySignals(signal: SignallingRef[IO, Boolean]) = + new ProjectActivitySignals { + override def apply(project: ProjectRef): IO[Option[SignallingRef[IO, Boolean]]] = IO.some(signal) + override def activityMap: IO[Map[ProjectRef, Boolean]] = IO.pure(Map.empty) + } + + test("A stop config returns a stop outcome for the different scopes") { + val expected = RefreshOrStop.Outcome.Stopped + for { + _ <- RefreshOrStop(Scope.root, stopConfig, ProjectActivitySignals.noop).run.assertEquals(expected) + _ <- RefreshOrStop(Scope.Org(org), stopConfig, ProjectActivitySignals.noop).run.assertEquals(expected) + _ <- RefreshOrStop(Scope.Project(project), stopConfig, ProjectActivitySignals.noop).run.assertEquals(expected) + } yield () + } + + test("A delay config returns a delayed outcome for the different scopes") { + val expected = RefreshOrStop.Outcome.Delayed + for { + _ <- RefreshOrStop(Scope.root, delayConfig, ProjectActivitySignals.noop).run.assertEquals(expected) + _ <- RefreshOrStop(Scope.Org(org), delayConfig, ProjectActivitySignals.noop).run.assertEquals(expected) + _ <- RefreshOrStop(Scope.Project(project), delayConfig, ProjectActivitySignals.noop).run.assertEquals(expected) + } yield () + } + + test("A passivation config fails for root and org scopes") { + for { + _ <- + RefreshOrStop(Scope.root, passivationConfig, ProjectActivitySignals.noop).run.intercept[IllegalStateException] + _ <- RefreshOrStop(Scope.Org(org), passivationConfig, ProjectActivitySignals.noop).run + .intercept[IllegalStateException] + } yield () + } + + test("A passivation config returns a no signal outcome when no signal is available") { + val 
expected = RefreshOrStop.Outcome.NoSignal
+    RefreshOrStop(Scope.Project(project), passivationConfig, ProjectActivitySignals.noop).run.assertEquals(expected)
+  }
+
+  test("A passivation config returns a delayed passivation outcome when the project is active") {
+    val expected = RefreshOrStop.Outcome.DelayedPassivation
+    for {
+      signal                <- SignallingRef.of[IO, Boolean](true)
+      projectActivitySignal  = activitySignals(signal)
+      _                     <- RefreshOrStop(Scope.Project(project), passivationConfig, projectActivitySignal).run.assertEquals(expected)
+    } yield ()
+  }
+
+  test("A passivation config eventually returns a passivated outcome when a signal gets activated") {
+    val expected = RefreshOrStop.Outcome.Passivated
+    for {
+      signal                <- SignallingRef.of[IO, Boolean](false)
+      obtained              <- Ref.of[IO, Option[RefreshOrStop.Outcome]](None)
+      projectActivitySignal  = activitySignals(signal)
+      _                     <- RefreshOrStop(Scope.Project(project), passivationConfig, projectActivitySignal).run.flatTap { outcome =>
+                                 obtained.set(Some(outcome))
+                               }.start
+      _                     <- signal.set(true)
+      _                     <- obtained.get.assertEquals(Some(expected)).eventually
+    } yield ()
+  }
+
+}
diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectActivitySignalsSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectActivitySignalsSuite.scala
new file mode 100644
index 0000000000..44342c9b63
--- /dev/null
+++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectActivitySignalsSuite.scala
@@ -0,0 +1,57 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.stream
+
+import cats.syntax.all._
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate
+import ch.epfl.bluebrain.nexus.testkit.clock.MutableClock
+import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
+import fs2.Stream
+import munit.{AnyFixture, Location}
+
+import java.time.Instant
+import scala.concurrent.duration.DurationInt
+
+class ProjectActivitySignalsSuite extends NexusSuite with MutableClock.Fixture {
+
+  override def munitFixtures: Seq[AnyFixture[_]] = List(mutableClockFixture)
+  private lazy val mutableClock: MutableClock    = mutableClockFixture()
+
+  private def assertActivitySignal(
+      lastUpdateReads: ProjectActivitySignals,
+      project: ProjectRef,
+      expected: Option[Boolean]
+  )(implicit loc: Location) =
+    lastUpdateReads.apply(project).flatMap(_.traverse(_.get)).assertEquals(expected)
+
+  test("Signals should be updated when the stream is processed") {
+    val now              = Instant.now()
+    val inactiveInterval = 5.seconds
+
+    val project1 = ProjectRef.unsafe("org", "project1")
+    val project2 = ProjectRef.unsafe("org", "project2")
+    val project3 = ProjectRef.unsafe("org", "project3")
+    val project4 = ProjectRef.unsafe("org", "project4")
+
+    def stream =
+      Stream(
+        ProjectLastUpdate(project1, now.minusSeconds(15L), Offset.at(35L)),
+        ProjectLastUpdate(project2, now.minusSeconds(6L), Offset.at(42L)),
+        ProjectLastUpdate(project3, now, Offset.at(95L)),
+        ProjectLastUpdate(project1, now, Offset.at(100L))
+      )
+
+    for {
+      signals        <- ProjectSignals[ProjectLastUpdate]
+      _              <- mutableClock.set(now)
+      lastUpdateReads = ProjectActivitySignals(signals)
+      signalPipe      = ProjectActivitySignals.signalPipe(signals, mutableClock, inactiveInterval)
+      _              <- stream.through(signalPipe).compile.drain
+      _              <- assertActivitySignal(lastUpdateReads, project1, 
Some(true)) + _ <- assertActivitySignal(lastUpdateReads, project2, Some(false)) + _ <- assertActivitySignal(lastUpdateReads, project3, Some(true)) + _ <- assertActivitySignal(lastUpdateReads, project4, None) + } yield () + } + +} diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdatesSinkSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdatesSinkSuite.scala index 0860b29fd0..0391764a9a 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdatesSinkSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectLastUpdatesSinkSuite.scala @@ -1,12 +1,13 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream -import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv +import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie -import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectLastUpdateStore +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectLastUpdateStore, ProjectLastUpdateStream} import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectLastUpdate +import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, SuccessElem} import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import fs2.Chunk @@ -20,8 +21,9 @@ class ProjectLastUpdatesSinkSuite extends NexusSuite with Doobie.Fixture { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) - private lazy val xas = doobie() - private lazy val store = ProjectLastUpdateStore(xas) + private lazy val xas = doobie() + private lazy val store = ProjectLastUpdateStore(xas) + private lazy val stream = ProjectLastUpdateStream(xas, QueryConfig(10, RefreshStrategy.Stop)) private val now = Instant.now().truncatedTo(ChronoUnit.SECONDS) @@ -46,16 +48,15 @@ class ProjectLastUpdatesSinkSuite extends NexusSuite with Doobie.Fixture { // Injecting initial updates _ <- store.save(existingUpdates) // Processing new incoming elems - sink <- ProjectLastUpdatesSink(store, 3, 10.millis) + sink = ProjectLastUpdatesSink(store, 3, 10.millis) // Checking new updates in the db _ <- sink(incoming) - expected = Map( - project1 -> ProjectLastUpdate(project1, now, Offset.at(100L)), - project2 -> ProjectLastUpdate(project2, now.minusSeconds(10L), Offset.at(35L)), - project3 -> ProjectLastUpdate(project3, now, Offset.at(95L)) + expected = List( + ProjectLastUpdate(project2, now.minusSeconds(10L), Offset.at(35L)), + ProjectLastUpdate(project3, now, Offset.at(95L)), + ProjectLastUpdate(project1, now, Offset.at(100L)) ) - _ <- store.fetchAll.flatTap(IO.println) - _ <- store.fetchAll.assertEquals(expected) + _ <- stream(Offset.start).assert(expected) } yield () } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectSignalsSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectSignalsSuite.scala new file mode 100644 index 0000000000..2f2a4b645d --- /dev/null +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectSignalsSuite.scala @@ -0,0 +1,41 @@ +package 
ch.epfl.bluebrain.nexus.delta.sourcing.stream
+
+import cats.syntax.all._
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
+import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
+import munit.Location
+
+class ProjectSignalsSuite extends NexusSuite {
+
+  private def assertSignal(signals: ProjectSignals[Int], project: ProjectRef, expected: Option[Boolean])(implicit
+      loc: Location
+  ) =
+    signals.get(project).flatMap(_.traverse(_.get)).assertEquals(expected)
+
+  test("Should init and update the signals accordingly") {
+    val project1 = ProjectRef.unsafe("org", "proj1")
+    val project2 = ProjectRef.unsafe("org", "proj2")
+    val project3 = ProjectRef.unsafe("org", "proj3")
+    val project4 = ProjectRef.unsafe("org", "proj4")
+
+    val init = Map(
+      project1 -> 1,
+      project2 -> 2,
+      project3 -> 3
+    )
+
+    for {
+      signals          <- ProjectSignals[Int]
+      _                <- signals.refresh(init, _ > 1)
+      _                <- assertSignal(signals, project1, Some(false))
+      _                <- assertSignal(signals, project2, Some(true))
+      _                <- assertSignal(signals, project3, Some(true))
+      _                <- assertSignal(signals, project4, None)
+      updates           = Map(project4 -> 4, project1 -> 5)
+      _                <- signals.refresh(updates, _ > 2)
+      expectedActivity  = Map(project1 -> true, project2 -> false, project3 -> true, project4 -> true)
+      _                <- signals.activityMap.assertEquals(expectedActivity)
+    } yield ()
+  }
+
+}
diff --git a/tests/docker/config/delta-postgres.conf b/tests/docker/config/delta-postgres.conf
index 9352e0205e..9e5db3bf4f 100644
--- a/tests/docker/config/delta-postgres.conf
+++ b/tests/docker/config/delta-postgres.conf
@@ -35,6 +35,27 @@ app {
     }
   }
 
+  elem-query {
+    # the strategy to adopt when a projection has processed all available elems:
+    # "passivation" pauses querying for projects without recent activity until updates are seen again
+    type = "passivation"
+    batch-size = 30
+    delay = 200 millis
+  }
+
+  project-last-update {
+    batch {
+      # the maximum batching size, corresponding to the maximum number of elements being aggregated
+      # at the same time before pushing the update.
+      max-elements = 100
+      # the maximum batching duration.
+      max-interval = 200 millis
+    }
+    query {
+      batch-size = 30
+      refresh-strategy = 200 millis
+    }
+    # the time without any update after which a project is considered inactive
+    inactive-interval = 5 seconds
+  }
+
   defaults {
     database {
       access {
@@ -43,7 +64,7 @@ app {
     }
 
     query {
-      refresh-strategy = 1s
+      refresh-strategy = 500 millis
    }