Skip to content

Commit

Permalink
Introduce passivation for projections on inactive projects (#5175)
Browse files Browse the repository at this point in the history
* Introduce passivation for projections on inactive projects

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Oct 11, 2024
1 parent 8878484 commit 187090e
Show file tree
Hide file tree
Showing 43 changed files with 1,126 additions and 486 deletions.
35 changes: 21 additions & 14 deletions delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,27 @@ app {
slow-query-threshold = 2 seconds
}

elem-query {
type = "delay"
batch-size = 30
delay = 2s
}

project-last-update {
batch {
# the maximum batching size, corresponding to the maximum number of elements being aggregated
# at the same time before pushing the update.
max-elements = 100
# the maximum batching duration.
max-interval = 1 second
}
query {
batch-size = 30
refresh-strategy = 1 second
}
inactive-interval = 10 minutes
}

# Database export configuration
export {
batch-size = 30
Expand Down Expand Up @@ -295,20 +316,6 @@ app {
}
}

project-last-update {
batch {
# the maximum batching size, corresponding to the maximum number of elements being aggregated
# at the same time before pushing the update.
max-elements = 100
# the maximum batching duration.
max-interval = 1 seconds
}
query {
batch-size = 30
refresh-strategy = 1s
}
}

# Type hierarchy configuration
type-hierarchy {
# the type hierarchy event-log configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.ResourcesConfig
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.SchemasConfig
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseConfig
import ch.epfl.bluebrain.nexus.delta.sdk.typehierarchy.TypeHierarchyConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectLastUpdateConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ElemQueryConfig, ProjectLastUpdateConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportConfig
import com.typesafe.config.Config
import pureconfig.ConfigReader
Expand Down Expand Up @@ -50,6 +50,7 @@ final case class AppConfig(
schemas: SchemasConfig,
typeHierarchy: TypeHierarchyConfig,
serviceAccount: ServiceAccountConfig,
elemQuery: ElemQueryConfig,
sse: SseConfig,
projections: ProjectionConfig,
projectLastUpdate: ProjectLastUpdateConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.{projects, supervision}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ProjectHealer, ProjectRejection, ProjectsHealth}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisedDescription
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectActivitySignals, SupervisedDescription}
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax.KeyOps
import io.circe.syntax.{EncoderOps, KeyOps}
import io.circe.{Encoder, Json}

class SupervisionRoutes(
identities: Identities,
aclCheck: AclCheck,
supervised: IO[List[SupervisedDescription]],
projectsHealth: ProjectsHealth,
projectHealer: ProjectHealer
projectHealer: ProjectHealer,
activitySignals: ProjectActivitySignals
)(implicit
baseUri: BaseUri,
cr: RemoteContextResolution,
Expand All @@ -54,6 +55,9 @@ class SupervisionRoutes(
if (projects.isEmpty) emit(StatusCodes.OK, IO.pure(allProjectsAreHealthy))
else emit(StatusCodes.InternalServerError, IO.pure(unhealthyProjectsEncoder(projects)))
}
},
(pathPrefix("activity") & pathPrefix("projects") & get & pathEndOrSingleSlash) {
emit(activitySignals.activityMap.map(_.asJson))
}
)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.plugin.PluginDef
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{OwnerPermissionsScopeInitialization, ProjectsConfig, ScopeInitializationErrorStore}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectLastUpdateConfig, ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ElemQueryConfig, ProjectLastUpdateConfig, ProjectionConfig, QueryConfig}
import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings
import com.typesafe.config.Config
import izumi.distage.model.definition.{Id, ModuleDef}
Expand All @@ -57,6 +57,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
make[FusionConfig].from { appCfg.fusion }
make[ProjectsConfig].from { appCfg.projects }
make[ProjectionConfig].from { appCfg.projections }
make[ElemQueryConfig].from { appCfg.elemQuery }
make[ProjectLastUpdateConfig].from { appCfg.projectLastUpdate }
make[QueryConfig].from { appCfg.projections.query }
make[BaseUri].from { appCfg.http.baseUri }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.organizations.Organizations
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
import ch.epfl.bluebrain.nexus.delta.sdk.sse.{SseElemStream, SseEncoder, SseEventLog}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.query.ElemStreaming
import izumi.distage.model.definition.{Id, ModuleDef}

/**
Expand All @@ -40,9 +40,7 @@ object EventsModule extends ModuleDef {
)(jo)
}

make[SseElemStream].from { (qc: QueryConfig, xas: Transactors) =>
SseElemStream(qc, xas)
}
make[SseElemStream].from { (elemStreaming: ElemStreaming) => SseElemStream(elemStreaming) }

make[EventsRoutes].from {
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.provisioning.ProjectProvisioning
import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectLastUpdateStore
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import izumi.distage.model.definition.{Id, ModuleDef}

Expand Down Expand Up @@ -100,6 +101,7 @@ object ProjectsModule extends ModuleDef {
config: AppConfig,
serviceAccount: ServiceAccount,
supervisor: Supervisor,
projectLastUpdateStore: ProjectLastUpdateStore,
xas: Transactors,
clock: Clock[IO]
) =>
Expand All @@ -109,6 +111,7 @@ object ProjectsModule extends ModuleDef {
config.projects.deletion,
serviceAccount,
supervisor,
projectLastUpdateStore,
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package ch.epfl.bluebrain.nexus.delta.wiring
import cats.effect.{Clock, IO, Sync}
import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectLastUpdateConfig, ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectLastUpdateStore, ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ElemQueryConfig, ProjectLastUpdateConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectLastUpdateStore, ProjectLastUpdateStream, ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.ElemStreaming
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._
Expand All @@ -18,8 +19,13 @@ import izumi.distage.model.definition.ModuleDef
object StreamModule extends ModuleDef {
addImplicit[Sync[IO]]

make[GraphResourceStream].from { (qc: QueryConfig, xas: Transactors, shifts: ResourceShifts) =>
GraphResourceStream(qc, xas, shifts)
make[ElemStreaming].from {
(xas: Transactors, queryConfig: ElemQueryConfig, activitySignals: ProjectActivitySignals) =>
new ElemStreaming(xas, queryConfig, activitySignals)
}

make[GraphResourceStream].from { (elemStreaming: ElemStreaming, shifts: ResourceShifts) =>
GraphResourceStream(elemStreaming, shifts)
}

many[PipeDef].add(DiscardMetadata)
Expand Down Expand Up @@ -54,10 +60,18 @@ object StreamModule extends ModuleDef {
}

make[ProjectLastUpdateStore].from { (xas: Transactors) => ProjectLastUpdateStore(xas) }
make[ProjectLastUpdateStream].from { (xas: Transactors, config: ProjectLastUpdateConfig) =>
ProjectLastUpdateStream(xas, config.query)
}

make[ProjectLastUpdateProjection].fromEffect {
make[ProjectLastUpdateWrites].fromEffect {
(supervisor: Supervisor, store: ProjectLastUpdateStore, xas: Transactors, config: ProjectLastUpdateConfig) =>
ProjectLastUpdateProjection(supervisor, store, xas, config.batch, config.query)
ProjectLastUpdateWrites(supervisor, store, xas, config.batch)
}

make[ProjectActivitySignals].fromEffect {
(supervisor: Supervisor, stream: ProjectLastUpdateStream, clock: Clock[IO], config: ProjectLastUpdateConfig) =>
ProjectActivitySignals(supervisor, stream, clock, config.inactiveInterval)
}

make[PurgeProjectionCoordinator.type].fromEffect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ProjectHealer, ProjectsHealth}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectActivitySignals, Supervisor}
import izumi.distage.model.definition.{Id, ModuleDef}

/**
Expand All @@ -31,14 +31,16 @@ object SupervisionModule extends ModuleDef {
rc: RemoteContextResolution @Id("aggregate"),
jo: JsonKeyOrdering,
projectsHealth: ProjectsHealth,
projectHealer: ProjectHealer
projectHealer: ProjectHealer,
projectActivitySignals: ProjectActivitySignals
) =>
new SupervisionRoutes(
identities,
aclCheck,
supervisor.getRunningProjections(),
projectsHealth,
projectHealer
projectHealer,
projectActivitySignals
)(baseUri, rc, jo)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,29 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authent
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import fs2.concurrent.SignallingRef
import org.scalatest.Assertion

import java.time.Instant

class SupervisionRoutesSpec extends BaseRouteSpec {

private val superviser = User("superviser", realm)
private val supervisor = User("supervisor", realm)

implicit private val callerSuperviser: Caller =
Caller(superviser, Set(superviser, Anonymous, Authenticated(realm), Group("group", realm)))
implicit private val callerSupervisor: Caller =
Caller(supervisor, Set(supervisor, Anonymous, Authenticated(realm), Group("group", realm)))

private val asSuperviser = addCredentials(OAuth2BearerToken("superviser"))
private val asSupervisor = addCredentials(OAuth2BearerToken("supervisor"))

private val identities = IdentitiesDummy(callerSuperviser)
private val identities = IdentitiesDummy(callerSupervisor)
private val aclCheck = AclSimpleCheck().accepted

private val projectRef = ProjectRef(Label.unsafe("myorg"), Label.unsafe("myproject"))
private val projectRef2 = ProjectRef(Label.unsafe("myorg"), Label.unsafe("myproject2"))
private val project = ProjectRef.unsafe("myorg", "myproject")
private val project2 = ProjectRef.unsafe("myorg", "myproject2")

private val unhealthyProjects = Set(projectRef, projectRef2)
private val unhealthyProjects = Set(project, project2)

private val metadata = ProjectionMetadata("module", "name", Some(projectRef), None)
private val metadata = ProjectionMetadata("module", "name", Some(project), None)
private val progress = ProjectionProgress(Offset.start, Instant.EPOCH, 1L, 1L, 1L)
private val description1 =
SupervisedDescription(metadata, ExecutionStrategy.PersistentSingleNode, 1, ExecutionStatus.Running, progress)
Expand Down Expand Up @@ -69,21 +70,27 @@ class SupervisionRoutesSpec extends BaseRouteSpec {
override def heal(project: ProjectRef): IO[Unit] = IO.unit
}

private val activitySignals = new ProjectActivitySignals {
override def apply(project: ProjectRef): IO[Option[SignallingRef[IO, Boolean]]] = IO.none
override def activityMap: IO[Map[ProjectRef, Boolean]] = IO.pure(Map(project -> true, project2 -> false))
}

private def routesTemplate(unhealthyProjects: Set[ProjectRef], healer: ProjectHealer) = Route.seal(
new SupervisionRoutes(
identities,
aclCheck,
IO.pure { List(description1, description2) },
projectsHealth(unhealthyProjects),
healer
healer,
activitySignals
).routes
)

private val routes = routesTemplate(Set.empty, noopHealer)

override def beforeAll(): Unit = {
super.beforeAll()
aclCheck.append(AclAddress.Root, superviser -> Set(supervision.read, projects.write)).accepted
aclCheck.append(AclAddress.Root, supervisor -> Set(supervision.read, projects.write)).accepted
}

"The supervision projection endpoint" should {
Expand All @@ -95,7 +102,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec {
}

"be accessible with supervision/read permission and return expected payload" in {
Get("/v1/supervision/projections") ~> asSuperviser ~> routes ~> check {
Get("/v1/supervision/projections") ~> asSupervisor ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual jsonContentOf("supervision/supervision-running-proj-response.json")
}
Expand All @@ -113,14 +120,14 @@ class SupervisionRoutesSpec extends BaseRouteSpec {

"return a successful http code when there are no unhealthy projects" in {
val routesWithHealthyProjects = routesTemplate(Set.empty, noopHealer)
Get("/v1/supervision/projects") ~> asSuperviser ~> routesWithHealthyProjects ~> check {
Get("/v1/supervision/projects") ~> asSupervisor ~> routesWithHealthyProjects ~> check {
response.status shouldEqual StatusCodes.OK
}
}

"return an error code when there are unhealthy projects" in {
val routesWithUnhealthyProjects = routesTemplate(unhealthyProjects, noopHealer)
Get("/v1/supervision/projects") ~> asSuperviser ~> routesWithUnhealthyProjects ~> check {
Get("/v1/supervision/projects") ~> asSupervisor ~> routesWithUnhealthyProjects ~> check {
response.status shouldEqual StatusCodes.InternalServerError
response.asJson shouldEqual
json"""
Expand Down Expand Up @@ -154,7 +161,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec {
val project = ProjectRef(Label.unsafe("myorg"), Label.unsafe("myproject"))
val routesWithHealer = routesTemplate(Set.empty, projectHealer)

Post(s"/v1/supervision/projects/$project/heal") ~> asSuperviser ~> routesWithHealer ~> check {
Post(s"/v1/supervision/projects/$project/heal") ~> asSupervisor ~> routesWithHealer ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual
json"""
Expand All @@ -168,7 +175,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec {

"return an error if the healing failed" in {
val routesWithFailingHealer = routesTemplate(Set.empty, failingHealer)
Post("/v1/supervision/projects/myorg/myproject/heal") ~> asSuperviser ~> routesWithFailingHealer ~> check {
Post("/v1/supervision/projects/myorg/myproject/heal") ~> asSupervisor ~> routesWithFailingHealer ~> check {
response.status shouldEqual StatusCodes.InternalServerError
response.asJson shouldEqual
json"""
Expand All @@ -184,4 +191,21 @@ class SupervisionRoutesSpec extends BaseRouteSpec {

}

"The supervision project activity endpoint" should {

"be forbidden without supervision/read permission" in {
Get("/v1/supervision/activity/projects") ~> routes ~> check {
response.shouldBeForbidden
}
}

"be accessible with supervision/read permission and return expected payload" in {
Get("/v1/supervision/activity/projects") ~> asSupervisor ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual json"""{ "$project": true, "$project2": false }"""
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, Projects}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.Projections
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.query.{ElemStreaming, SelectFilter}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import izumi.distage.model.definition.{Id, ModuleDef}

Expand All @@ -33,8 +32,8 @@ class GraphAnalyticsPluginModule(priority: Int) extends ModuleDef {
GraphAnalytics(client, fetchContext, config.prefix, config.termAggregations)
}

make[GraphAnalyticsStream].from { (qc: QueryConfig, xas: Transactors) =>
GraphAnalyticsStream(qc, xas)
make[GraphAnalyticsStream].from { (elemStreaming: ElemStreaming, xas: Transactors) =>
GraphAnalyticsStream(elemStreaming, xas)
}

make[GraphAnalyticsCoordinator].fromEffect {
Expand Down
Loading

0 comments on commit 187090e

Please sign in to comment.