From 33cf07db920ecadfcdee1dd821e1af9b502168b8 Mon Sep 17 00:00:00 2001 From: dantb Date: Wed, 8 Nov 2023 18:01:54 +0100 Subject: [PATCH] Migrate Blazegraph module to Cats Effect (#4478) --- .../kernel/utils/ClasspathResourceUtils.scala | 9 --- .../nexus/delta/kernel/utils/IOUtils.scala | 8 +++ .../BlazegraphDecoderConfiguration.scala | 9 +-- .../blazegraph/BlazegraphPluginModule.scala | 22 +++--- .../BlazegraphServiceDependency.scala | 1 - .../plugins/blazegraph/BlazegraphViews.scala | 5 +- .../blazegraph/BlazegraphViewsQuery.scala | 7 +- .../blazegraph/client/BlazegraphClient.scala | 68 ++++++++++--------- .../blazegraph/client/SparqlClient.scala | 40 ++++++----- .../indexing/BlazegraphCoordinator.scala | 2 - .../blazegraph/indexing/BlazegraphSink.scala | 2 - .../blazegraph/model/DefaultProperties.scala | 3 + .../BlazegraphSlowQueryDeleter.scala | 3 +- .../BlazegraphSlowQueryLogger.scala | 42 ++++++------ .../BlazegraphSlowQueryStore.scala | 20 +++--- .../blazegraph/BlazegraphClientSetup.scala | 5 +- .../delta/plugins/blazegraph/Fixtures.scala | 6 ++ .../client/BlazegraphClientSpec.scala | 24 ++----- .../client/SparqlQueryClientDummy.scala | 4 +- .../indexing/BlazegraphSinkSuite.scala | 6 +- .../routes/BlazegraphViewsQueryDummy.scala | 3 +- .../routes/BlazegraphViewsQuerySpec.scala | 6 +- .../BlazegraphSlowQueryLoggerSuite.scala | 64 +++++++++-------- .../compositeviews/BlazegraphQuery.scala | 13 ++-- .../compositeviews/CompositeSink.scala | 4 +- .../CompositeViewsPluginModule.scala | 13 ++-- .../indexing/BatchQueryGraph.scala | 15 ++-- .../indexing/CompositeSpaces.scala | 9 ++- .../indexing/SingleQueryGraph.scala | 15 ++-- .../indexing/CompositeIndexingSuite.scala | 26 +++---- .../routes/BlazegraphQueryDummy.scala | 16 +++-- .../search/SearchSparqlQuerySpec.scala | 22 ++---- .../nexus/delta/sdk/http/HttpClient.scala | 29 ++++---- .../nexus/testkit/bio/IOFixedClock.scala | 7 +- 34 files changed, 272 insertions(+), 256 deletions(-) create mode 100644 delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/DefaultProperties.scala diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/ClasspathResourceUtils.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/ClasspathResourceUtils.scala index d51c30cc23..3ee2eefc20 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/ClasspathResourceUtils.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/ClasspathResourceUtils.scala @@ -90,15 +90,6 @@ trait ClasspathResourceUtils { props.asScala.toMap } - final def bioPropertiesOf(resourcePath: String)(implicit - classLoader: ClassLoader - ): BIO[ClasspathResourceError, Map[String, String]] = - bioStreamOf(resourcePath).map { is => - val props = new Properties() - props.load(is) - props.asScala.toMap - } - /** * Loads the content of the argument classpath resource as a string and replaces all the key matches of the * ''replacements'' with their values. The resulting string is parsed into a json value. diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/IOUtils.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/IOUtils.scala index fd4b9dd7ce..d434bdebc2 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/IOUtils.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/utils/IOUtils.scala @@ -7,6 +7,7 @@ import monix.bio.{Task, UIO} import java.time.Instant import java.util.UUID import java.util.concurrent.TimeUnit +import scala.concurrent.duration.{DurationLong, FiniteDuration, MILLISECONDS} trait IOUtils { @@ -22,6 +23,13 @@ object IOUtils extends IOUtils trait IOInstant { def now(implicit clock: Clock[IO]): IO[Instant] = clock.realTime(TimeUnit.MILLISECONDS).map(Instant.ofEpochMilli) + + def timed[A](io: IO[A])(implicit c: Clock[IO]): IO[(A, FiniteDuration)] = + for { + start <- c.monotonic(MILLISECONDS) + result <- io + finish <- c.monotonic(MILLISECONDS) + } yield (result, (finish - start).millis) } object IOInstant extends IOInstant diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphDecoderConfiguration.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphDecoderConfiguration.scala index c6aaea284f..289536bf63 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphDecoderConfiguration.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphDecoderConfiguration.scala @@ -1,16 +1,17 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, BlazegraphViewType} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, JsonLdContext, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.Configuration -import monix.bio.Task private[blazegraph] object BlazegraphDecoderConfiguration { - def apply(implicit jsonLdApi: JsonLdApi, rcr: RemoteContextResolution): Task[Configuration] = for { - contextValue <- Task.delay { ContextValue(contexts.blazegraph) } - jsonLdContext <- JsonLdContext(contextValue) + def apply(implicit jsonLdApi: JsonLdApi, rcr: RemoteContextResolution): IO[Configuration] = for { + contextValue <- IO.delay { ContextValue(contexts.blazegraph) } + jsonLdContext <- JsonLdContext(contextValue).toCatsIO } yield { val enhancedJsonLdContext = jsonLdContext .addAliasIdType("IndexingBlazegraphViewValue", BlazegraphViewType.IndexingBlazegraphView.tpe) diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala index 3bd7afec69..301e074acc 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala @@ -3,12 +3,12 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph import akka.actor.typed.ActorSystem import cats.effect.{Clock, ContextShift, IO, Timer} import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ -import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF +import ch.epfl.bluebrain.nexus.delta.kernel.utils.{CatsEffectsClasspathResourceUtils, UUIDF} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphCoordinator import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection.ProjectContextRejection -import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent} +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent, DefaultProperties} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphSlowQueryDeleter, BlazegraphSlowQueryLogger, BlazegraphSlowQueryStore} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi @@ -48,6 +48,10 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef { make[BlazegraphViewsConfig].from { BlazegraphViewsConfig.load(_) } + make[DefaultProperties].fromEffect { + CatsEffectsClasspathResourceUtils.ioPropertiesOf("blazegraph/index.properties").map(DefaultProperties) + } + make[HttpClient].named("http-indexing-client").from { (cfg: BlazegraphViewsConfig, as: ActorSystem[Nothing], sc: Scheduler) => HttpClient()(cfg.indexingClient, as.classicSystem, sc) @@ -69,17 +73,18 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef { )(timer) } - make[BlazegraphSlowQueryLogger].from { (cfg: BlazegraphViewsConfig, store: BlazegraphSlowQueryStore) => - BlazegraphSlowQueryLogger(store, cfg.slowQueries.slowQueryThreshold) + make[BlazegraphSlowQueryLogger].from { (cfg: BlazegraphViewsConfig, store: BlazegraphSlowQueryStore, c: Clock[IO]) => + BlazegraphSlowQueryLogger(store, cfg.slowQueries.slowQueryThreshold)(c) } make[BlazegraphClient].named("blazegraph-indexing-client").from { ( cfg: BlazegraphViewsConfig, client: HttpClient @Id("http-indexing-client"), - as: ActorSystem[Nothing] + as: ActorSystem[Nothing], + properties: DefaultProperties ) => - BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem) + BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout, properties.value)(as.classicSystem) } make[HttpClient].named("http-query-client").from { @@ -91,9 +96,10 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef { ( cfg: BlazegraphViewsConfig, client: HttpClient @Id("http-query-client"), - as: ActorSystem[Nothing] + as: ActorSystem[Nothing], + properties: DefaultProperties ) => - BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem) + BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout, properties.value)(as.classicSystem) } make[ValidateBlazegraphView].from { diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphServiceDependency.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphServiceDependency.scala index f2da3575d2..a5ee8d36c6 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphServiceDependency.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphServiceDependency.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.sdk.ServiceDependency import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala index 4cdae21097..a16c886012 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala @@ -6,7 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOInstant, UUIDF} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews._ -import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.{BlazegraphClient, SparqlClientError} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphView.IndexingBlazegraphView @@ -542,9 +542,8 @@ object BlazegraphViews { case i: IndexingBlazegraphView => client .createNamespace(BlazegraphViews.namespace(i, prefix)) - .mapError(WrappedBlazegraphClientError.apply) + .adaptError { case e: SparqlClientError => WrappedBlazegraphClientError(e) } .void - .toCatsIO case _ => IO.unit } apply(fetchContext, contextResolution, validate, createNameSpace, eventLogConfig, prefix, xas) diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViewsQuery.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViewsQuery.scala index 17edd74354..0fbd2ffa65 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViewsQuery.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViewsQuery.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph import cats.effect.IO +import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils.ioContentOf @@ -182,7 +183,6 @@ object BlazegraphViewsQuery { AuthorizationFailed(i.ref.project, i.permission) ) .as(Set(i.index)) - .toBIO[BlazegraphViewRejection] case a: AggregateView => aclCheck .mapFilter[IndexingView, String]( @@ -190,11 +190,12 @@ object BlazegraphViewsQuery { v => ProjectAcl(v.ref.project) -> v.permission, _.index ) - .toUIO } qr <- logSlowQueries( BlazegraphQueryContext(ViewRef.apply(project, iri), query, caller.subject), - client.query(indices, query, responseType).mapError(WrappedBlazegraphClientError) + client.query(indices, query, responseType).adaptError { case e: SparqlClientError => + WrappedBlazegraphClientError(e) + } ) } yield qr diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClient.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClient.scala index 8573f45db4..93d94c9dd5 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClient.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClient.scala @@ -7,19 +7,20 @@ import akka.http.scaladsl.model.headers.{BasicHttpCredentials, HttpCredentials, import akka.http.scaladsl.model.{HttpEntity, HttpHeader, Uri} import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller import akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers.stringUnmarshaller -import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils +import cats.effect.IO +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient.timeoutHeader import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlClientError.{InvalidCountRequest, WrappedHttpClientError} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.{Aux, SparqlResultsJson} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig.Credentials import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery -import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient +import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientError} import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription.ResolvedServiceDescription import ch.epfl.bluebrain.nexus.delta.sdk.model.Name import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ -import monix.bio.{IO, UIO} import scala.concurrent.duration._ @@ -29,24 +30,20 @@ import scala.concurrent.duration._ class BlazegraphClient( client: HttpClient, endpoint: Uri, - queryTimeout: Duration + queryTimeout: Duration, + defaultProperties: Map[String, String] )(implicit credentials: Option[HttpCredentials], as: ActorSystem) extends SparqlClient(client, SparqlQueryEndpoint.blazegraph(endpoint)) { - implicit private val cl: ClassLoader = getClass.getClassLoader - private val serviceVersion = """(buildVersion">)([^<]*)""".r private val serviceName = Name.unsafe("blazegraph") - private val defaultProperties = - ClasspathResourceUtils.bioPropertiesOf("blazegraph/index.properties").hideErrors.memoizeOnSuccess - override def query[R <: SparqlQueryResponse]( indices: Iterable[String], q: SparqlQuery, responseType: Aux[R], additionalHeaders: Seq[HttpHeader] - ): IO[SparqlClientError, R] = { + ): IO[R] = { val headers = queryTimeout match { case finite: FiniteDuration => additionalHeaders :+ RawHeader(timeoutHeader, finite.toMillis.toString) case _ => additionalHeaders @@ -57,10 +54,11 @@ class BlazegraphClient( /** * Fetches the service description information (name and version) */ - def serviceDescription: UIO[ServiceDescription] = + def serviceDescription: IO[ServiceDescription] = client .fromEntityTo[ResolvedServiceDescription](Get(endpoint / "status")) .timeout(5.seconds) + .toCatsIO .redeem( _ => ServiceDescription.unresolved(serviceName), _.map(_.copy(name = serviceName)).getOrElse(ServiceDescription.unresolved(serviceName)) @@ -69,11 +67,13 @@ class BlazegraphClient( /** * Check whether the passed namespace ''namespace'' exists. */ - def existsNamespace(namespace: String): IO[SparqlClientError, Boolean] = - client(Get(endpoint / "namespace" / namespace)) { - case resp if resp.status == OK => UIO.delay(resp.discardEntityBytes()) >> IO.pure(true) - case resp if resp.status == NotFound => UIO.delay(resp.discardEntityBytes()) >> IO.pure(false) - }.mapError(WrappedHttpClientError) + def existsNamespace(namespace: String): IO[Boolean] = + client + .run(Get(endpoint / "namespace" / namespace)) { + case resp if resp.status == OK => IO.delay(resp.discardEntityBytes()).as(true) + case resp if resp.status == NotFound => IO.delay(resp.discardEntityBytes()).as(false) + } + .adaptError { case e: HttpClientError => WrappedHttpClientError(e) } /** * Attempts to create a namespace (if it doesn't exist) recovering gracefully when the namespace already exists. @@ -85,17 +85,19 @@ class BlazegraphClient( * @return * ''true'' wrapped on an IO when namespace has been created and ''false'' wrapped on an IO when it already existed */ - def createNamespace(namespace: String, properties: Map[String, String]): IO[SparqlClientError, Boolean] = + def createNamespace(namespace: String, properties: Map[String, String]): IO[Boolean] = existsNamespace(namespace).flatMap { case true => IO.pure(false) case false => val updated = properties + ("com.bigdata.rdf.sail.namespace" -> namespace) val payload = updated.map { case (key, value) => s"$key=$value" }.mkString("\n") val req = Post(endpoint / "namespace", HttpEntity(payload)) - client(req) { - case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> IO.pure(true) - case resp if resp.status == Conflict => UIO.delay(resp.discardEntityBytes()) >> IO.pure(false) - }.mapError(WrappedHttpClientError) + client + .run(req) { + case resp if resp.status.isSuccess() => IO.delay(resp.discardEntityBytes()).as(true) + case resp if resp.status == Conflict => IO.delay(resp.discardEntityBytes()).as(false) + } + .adaptError { case e: HttpClientError => WrappedHttpClientError(e) } } /** @@ -107,8 +109,7 @@ class BlazegraphClient( * @return * ''true'' wrapped on an IO when namespace has been created and ''false'' wrapped on an IO when it already existed */ - def createNamespace(namespace: String): IO[SparqlClientError, Boolean] = - defaultProperties.flatMap(createNamespace(namespace, _)) + def createNamespace(namespace: String): IO[Boolean] = createNamespace(namespace, defaultProperties) /** * Attempts to delete a namespace recovering gracefully when the namespace does not exists. @@ -116,16 +117,18 @@ class BlazegraphClient( * @return * ''true'' wrapped in ''F'' when namespace has been deleted and ''false'' wrapped in ''F'' when it does not existe */ - def deleteNamespace(namespace: String): IO[SparqlClientError, Boolean] = - client(Delete(endpoint / "namespace" / namespace)) { - case resp if resp.status == OK => UIO.delay(resp.discardEntityBytes()) >> IO.pure(true) - case resp if resp.status == NotFound => UIO.delay(resp.discardEntityBytes()) >> IO.pure(false) - }.mapError(WrappedHttpClientError) + def deleteNamespace(namespace: String): IO[Boolean] = + client + .run(Delete(endpoint / "namespace" / namespace)) { + case resp if resp.status == OK => IO.delay(resp.discardEntityBytes()).as(true) + case resp if resp.status == NotFound => IO.delay(resp.discardEntityBytes()).as(false) + } + .adaptError { case e: HttpClientError => WrappedHttpClientError(e) } /** * Count all the triples on an index */ - def count(index: String): IO[SparqlClientError, Long] = { + def count(index: String): IO[Long] = { val sparqlQuery = SparqlConstructQuery.unsafe("SELECT (COUNT(?s) AS ?count) WHERE { ?s ?p ?o }") query(Set(index), sparqlQuery, SparqlResultsJson) .flatMap { response => @@ -135,7 +138,7 @@ class BlazegraphClient( count <- countAsString.value.toLongOption } yield count - IO.fromOption(count, InvalidCountRequest(index, sparqlQuery.value)) + IO.fromOption(count)(InvalidCountRequest(index, sparqlQuery.value)) } } @@ -163,10 +166,11 @@ object BlazegraphClient { client: HttpClient, endpoint: Uri, credentials: Option[Credentials], - queryTimeout: Duration + queryTimeout: Duration, + defaultProperties: Map[String, String] )(implicit as: ActorSystem): BlazegraphClient = { implicit val cred: Option[BasicHttpCredentials] = credentials.map { cred => BasicHttpCredentials(cred.username, cred.password.value) } - new BlazegraphClient(client, endpoint, queryTimeout) + new BlazegraphClient(client, endpoint, queryTimeout, defaultProperties) } } diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/SparqlClient.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/SparqlClient.scala index fe7ea76837..bfb1c41a5c 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/SparqlClient.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/SparqlClient.scala @@ -5,21 +5,21 @@ import akka.http.scaladsl.client.RequestBuilding.Post import akka.http.scaladsl.model.Uri.Query import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.{Accept, HttpCredentials} -import cats.syntax.foldable._ +import cats.effect.IO +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlClientError.{InvalidUpdateRequest, WrappedHttpClientError} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponse.{SparqlJsonLdResponse, SparqlNTriplesResponse, SparqlRdfXmlResponse, SparqlResultsResponse, SparqlXmlResultsResponse} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.BNode import ch.epfl.bluebrain.nexus.delta.rdf.graph.NTriples import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery -import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient +import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientError} import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import io.circe.Json import io.circe.syntax._ -import monix.bio.IO import org.apache.jena.query.ParameterizedSparqlString -import scala.util.Try import scala.xml.{Elem, NodeSeq} trait SparqlQueryClient { @@ -42,7 +42,7 @@ trait SparqlQueryClient { q: SparqlQuery, responseType: SparqlQueryResponseType.Aux[R], additionalHeaders: Seq[HttpHeader] = Seq.empty - ): IO[SparqlClientError, R] + ): IO[R] } /** @@ -61,7 +61,7 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit q: SparqlQuery, responseType: SparqlQueryResponseType.Aux[R], additionalHeaders: Seq[HttpHeader] = Seq.empty - ): IO[SparqlClientError, R] = + ): IO[R] = responseType match { case SparqlResultsJson => sparqlResultsResponse(indices, q, additionalHeaders) case SparqlResultsXml => sparqlXmlResultsResponse(indices, q, additionalHeaders) @@ -80,17 +80,18 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit * @return * successful Future[Unit] if update succeeded, failure otherwise */ - def bulk(index: String, queries: Seq[SparqlWriteQuery]): IO[SparqlClientError, Unit] = { + def bulk(index: String, queries: Seq[SparqlWriteQuery]): IO[Unit] = { val queryString = queries.map(_.value).mkString("\n") val pss = new ParameterizedSparqlString pss.setCommandText(queryString) for { - _ <- IO.fromTry(Try(pss.asUpdate())).mapError(th => InvalidUpdateRequest(index, queryString, th.getMessage)) + _ <- IO(pss.asUpdate()).adaptError(e => InvalidUpdateRequest(index, queryString, e.getMessage)) queryOpt = uniqueGraph(queries).map(graph => Query("using-named-graph-uri" -> graph.toString)) formData = FormData("update" -> queryString) reqEndpoint = endpoint(index).withQuery(queryOpt.getOrElse(Query.Empty)) req = Post(reqEndpoint, formData).withHttpCredentials - result <- client.discardBytes(req, ()).mapError(WrappedHttpClientError) + result <- + client.discardBytes(req, ()).toCatsIO.adaptError { case e: HttpClientError => WrappedHttpClientError(e) } } yield result } @@ -105,7 +106,7 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit * @param data * the new graph as NTriples representation */ - def replace(index: String, graph: Uri, data: NTriples): IO[SparqlClientError, Unit] = + def replace(index: String, graph: Uri, data: NTriples): IO[Unit] = bulk(index, Seq(SparqlWriteQuery.replace(graph, data))) /** @@ -121,7 +122,7 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit * @param strategy * the patch strategy */ - def patch(index: String, graph: Uri, data: NTriples, strategy: PatchStrategy): IO[SparqlClientError, Unit] = + def patch(index: String, graph: Uri, data: NTriples, strategy: PatchStrategy): IO[Unit] = bulk(index, Seq(SparqlWriteQuery.patch(graph, data, strategy))) /** @@ -132,7 +133,7 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit * @param graph * the graph to drop */ - def drop(index: String, graph: Uri): IO[SparqlClientError, Unit] = + def drop(index: String, graph: Uri): IO[Unit] = bulk(index, Seq(SparqlWriteQuery.drop(graph))) private def uniqueGraph(query: Seq[SparqlWriteQuery]): Option[Uri] = @@ -145,7 +146,7 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit indices: Iterable[String], q: SparqlQuery, additionalHeaders: Seq[HttpHeader] - ): IO[SparqlClientError, SparqlResultsResponse] = + ): IO[SparqlResultsResponse] = indices.toList .foldLeftM(SparqlResults.empty) { (results, index) => val req = Post(endpoint(index), FormData("query" -> q.value)) @@ -154,12 +155,13 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit client.fromJsonTo[SparqlResults](req).mapError(WrappedHttpClientError).map(results ++ _) } .map(SparqlResultsResponse) + .toCatsIO private def sparqlXmlResultsResponse( indices: Iterable[String], q: SparqlQuery, additionalHeaders: Seq[HttpHeader] - ): IO[SparqlClientError, SparqlXmlResultsResponse] = + ): IO[SparqlXmlResultsResponse] = indices.toList .foldLeftM(None: Option[Elem]) { case (elem, index) => val req = Post(endpoint(index), FormData("query" -> q.value)) @@ -176,6 +178,7 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit } } } + .toCatsIO .map { case Some(elem) => SparqlXmlResultsResponse(elem) case None => SparqlXmlResultsResponse(NodeSeq.Empty) @@ -185,7 +188,7 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit indices: Iterable[String], q: SparqlQuery, additionalHeaders: Seq[HttpHeader] - ): IO[SparqlClientError, SparqlJsonLdResponse] = + ): IO[SparqlJsonLdResponse] = indices.toList .foldLeftM(Vector.empty[Json]) { (results, index) => val req = Post(endpoint(index), FormData("query" -> q.value)) @@ -196,13 +199,14 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit .mapError(WrappedHttpClientError) .map(results ++ _.arrayOrObject(Vector.empty[Json], identity, obj => Vector(obj.asJson))) } + .toCatsIO .map(vector => SparqlJsonLdResponse(Json.arr(vector: _*))) private def sparqlNTriplesResponse( indices: Iterable[String], q: SparqlQuery, additionalHeaders: Seq[HttpHeader] - ): IO[SparqlClientError, SparqlNTriplesResponse] = + ): IO[SparqlNTriplesResponse] = indices.toList .foldLeftM(NTriples.empty) { (results, index) => val req = Post(endpoint(index), FormData("query" -> q.value)) @@ -210,13 +214,14 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit .withHttpCredentials client.fromEntityTo[String](req).mapError(WrappedHttpClientError).map(s => results ++ NTriples(s, BNode.random)) } + .toCatsIO .map(SparqlNTriplesResponse) private def sparqlRdfXmlResponse( indices: Iterable[String], q: SparqlQuery, additionalHeaders: Seq[HttpHeader] - ): IO[SparqlClientError, SparqlRdfXmlResponse] = + ): IO[SparqlRdfXmlResponse] = indices.toList .foldLeftM(None: Option[Elem]) { case (elem, index) => val req = Post(endpoint(index), FormData("query" -> q.value)) @@ -229,6 +234,7 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit } } } + .toCatsIO .map { case Some(elem) => SparqlRdfXmlResponse(elem) case None => SparqlRdfXmlResponse(NodeSeq.Empty) diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinator.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinator.scala index 780b11d80d..57bb2eedf0 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinator.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinator.scala @@ -4,7 +4,6 @@ import cats.effect.{ContextShift, IO, Timer} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.cache.LocalCache -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig @@ -136,7 +135,6 @@ object BlazegraphCoordinator { (v: ActiveViewDef) => client .createNamespace(v.namespace) - .toCatsIO .onError { e => logger.error(e)(s"Namespace for view '${v.ref.project}/${v.ref.viewId}' could not be created.") } diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSink.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSink.scala index d76fdb11f5..b23198692b 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSink.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSink.scala @@ -15,7 +15,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink import com.typesafe.scalalogging.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import fs2.Chunk import shapeless.Typeable @@ -61,7 +60,6 @@ final class BlazegraphSink( if (bulk.queries.nonEmpty) client .bulk(namespace, bulk.queries) - .toCatsIO .redeemWith( err => IO diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/DefaultProperties.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/DefaultProperties.scala new file mode 100644 index 0000000000..318a7a2d65 --- /dev/null +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/DefaultProperties.scala @@ -0,0 +1,3 @@ +package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model + +final case class DefaultProperties(value: Map[String, String]) diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryDeleter.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryDeleter.scala index 90f50ebba5..90797d48f9 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryDeleter.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryDeleter.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries import cats.effect.{Clock, IO, Timer} -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOInstant import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor} import fs2.{INothing, Stream} @@ -13,7 +12,7 @@ class BlazegraphSlowQueryDeleter(store: BlazegraphSlowQueryStore, deletionThresh ) { def deleteOldQueries: IO[Unit] = { IOInstant.now.flatMap { now => - store.removeQueriesOlderThan(now.minusMillis(deletionThreshold.toMillis)).toCatsIO + store.removeQueriesOlderThan(now.minusMillis(deletionThreshold.toMillis)) } } } diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryLogger.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryLogger.scala index da148d07ab..dc38630cf0 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryLogger.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryLogger.scala @@ -1,11 +1,11 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries -import cats.effect.Clock -import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOUtils +import cats.effect.{Clock, IO} +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOInstant import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViewsQuery.BlazegraphQueryContext import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.model.BlazegraphSlowQuery -import com.typesafe.scalalogging.Logger -import monix.bio.{IO, UIO} import scala.concurrent.duration.{Duration, FiniteDuration} @@ -24,7 +24,7 @@ trait BlazegraphSlowQueryLogger { * @return * the query */ - def apply[E, A](context: BlazegraphQueryContext, query: IO[E, A]): IO[E, A] + def apply[A](context: BlazegraphQueryContext, query: IO[A]): IO[A] } object BlazegraphSlowQueryLogger { @@ -32,13 +32,14 @@ object BlazegraphSlowQueryLogger { private val logger = Logger[BlazegraphSlowQueryLogger] def apply(sink: BlazegraphSlowQueryStore, longQueryThreshold: Duration)(implicit - clock: Clock[UIO] + clock: Clock[IO] ): BlazegraphSlowQueryLogger = new BlazegraphSlowQueryLogger { - def apply[E, A](context: BlazegraphQueryContext, query: IO[E, A]): IO[E, A] = { - query.attempt.timed - .flatMap { case (duration, outcome) => - UIO - .when(duration >= longQueryThreshold)(logSlowQuery(context, outcome.isLeft, duration)) + def apply[A](context: BlazegraphQueryContext, query: IO[A]): IO[A] = { + IOInstant + .timed(query.attempt) + .flatMap { case (outcome, duration) => + IO + .whenA(duration >= longQueryThreshold)(logSlowQuery(context, outcome.isLeft, duration)) .flatMap(_ => IO.fromEither(outcome)) } } @@ -47,16 +48,13 @@ object BlazegraphSlowQueryLogger { context: BlazegraphQueryContext, isError: Boolean, duration: FiniteDuration - ): UIO[Unit] = { - IOUtils.instant - .tapEval(_ => - UIO.delay(logger.warn(s"Slow blazegraph query recorded: duration '$duration', view '${context.view}'")) - ) - .flatMap { now => - sink - .save(BlazegraphSlowQuery(context.view, context.query, isError, duration, now, context.subject)) - .onErrorHandleWith(e => UIO.delay(logger.error("error logging blazegraph slow query", e))) - } - } + ): IO[Unit] = + logger.warn(s"Slow blazegraph query recorded: duration '$duration', view '${context.view}'") >> + IOInstant.now + .flatMap { now => + sink + .save(BlazegraphSlowQuery(context.view, context.query, isError, duration, now, context.subject)) + .handleErrorWith(e => logger.error(e)("error logging blazegraph slow query")) + } } } diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryStore.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryStore.scala index 9ceaa8d205..5a03d7009b 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryStore.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryStore.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.model.BlazegraphSlowQuery import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors @@ -7,7 +8,6 @@ import doobie.implicits._ import doobie.postgres.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import io.circe.syntax.EncoderOps -import monix.bio.Task import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Database._ import java.time.Instant @@ -16,38 +16,38 @@ import java.time.Instant * Persistence operations for slow query logs */ trait BlazegraphSlowQueryStore { - def save(query: BlazegraphSlowQuery): Task[Unit] - def removeQueriesOlderThan(instant: Instant): Task[Unit] - def listForTestingOnly(view: ViewRef): Task[List[BlazegraphSlowQuery]] + def save(query: BlazegraphSlowQuery): IO[Unit] + def listForTestingOnly(view: ViewRef): IO[List[BlazegraphSlowQuery]] + def removeQueriesOlderThan(instant: Instant): IO[Unit] } object BlazegraphSlowQueryStore { def apply(xas: Transactors): BlazegraphSlowQueryStore = { new BlazegraphSlowQueryStore { - override def save(query: BlazegraphSlowQuery): Task[Unit] = { + override def save(query: BlazegraphSlowQuery): IO[Unit] = { sql""" INSERT INTO blazegraph_queries(project, view_id, instant, duration, subject, query, failed) | VALUES(${query.view.project}, ${query.view.viewId}, ${query.instant}, ${query.duration}, ${query.subject.asJson}, ${query.query.value}, ${query.failed}) """.stripMargin.update.run - .transact(xas.write) + .transact(xas.writeCE) .void } - override def listForTestingOnly(view: ViewRef): Task[List[BlazegraphSlowQuery]] = { + override def listForTestingOnly(view: ViewRef): IO[List[BlazegraphSlowQuery]] = { sql""" SELECT project, view_id, instant, duration, subject, query, failed FROM public.blazegraph_queries |WHERE view_id = ${view.viewId} AND project = ${view.project} """.stripMargin .query[BlazegraphSlowQuery] .stream - .transact(xas.read) + .transact(xas.readCE) .compile .toList } - override def removeQueriesOlderThan(instant: Instant): Task[Unit] = { + override def removeQueriesOlderThan(instant: Instant): IO[Unit] = { sql""" DELETE FROM public.blazegraph_queries |WHERE instant < $instant """.stripMargin.update.run - .transact(xas.write) + .transact(xas.writeCE) .void } } diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphClientSetup.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphClientSetup.scala index 2a0b199a67..2c4c87e2d4 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphClientSetup.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphClientSetup.scala @@ -13,7 +13,7 @@ import monix.execution.Scheduler import scala.concurrent.duration._ -object BlazegraphClientSetup { +object BlazegraphClientSetup extends Fixtures { def resource()(implicit s: Scheduler): Resource[Task, BlazegraphClient] = { for { @@ -25,7 +25,8 @@ object BlazegraphClientSetup { httpClient, s"http://${container.getHost}:${container.getMappedPort(9999)}/blazegraph", None, - 10.seconds + 10.seconds, + defaultProperties ) } } diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/Fixtures.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/Fixtures.scala index 9ad88b2d6d..717ca65a97 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/Fixtures.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/Fixtures.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.utils.CatsEffectsClasspathResourceUtils import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewValue import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.contexts.{blazegraph, blazegraphMetadata} import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary @@ -24,5 +25,10 @@ trait Fixtures { Vocabulary.contexts.search -> ContextValue.fromFile("contexts/search.json") ) + val defaultProperties: Map[String, String] = + CatsEffectsClasspathResourceUtils.ioPropertiesOf("blazegraph/index.properties").unsafeRunSync() + def alwaysValidate: ValidateBlazegraphView = (_: BlazegraphViewValue) => IO.unit } + +object Fixtures extends Fixtures diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClientSpec.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClientSpec.scala index 25ad10748d..bdd43a0e9d 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClientSpec.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClientSpec.scala @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client import akka.actor.ActorSystem import akka.http.scaladsl.model.Uri import akka.testkit.TestKit +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.Fixtures.defaultProperties import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.PatchStrategy._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlClientError.WrappedHttpClientError import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponse.SparqlResultsResponse @@ -22,16 +23,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescr import ch.epfl.bluebrain.nexus.delta.sdk.model.Name import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.testkit.blazegraph.BlazegraphDocker -import ch.epfl.bluebrain.nexus.testkit.scalatest.{EitherValues, TestMatchers} -import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsIOValues -import ch.epfl.bluebrain.nexus.testkit.TestHelpers -import ch.epfl.bluebrain.nexus.testkit.scalatest.bio.BIOValues +import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec import io.circe.Json import monix.execution.Scheduler import org.scalatest.concurrent.Eventually -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpecLike -import org.scalatest.{CancelAfterFailure, DoNotDiscover, Inspectors, Suite} +import org.scalatest.{CancelAfterFailure, DoNotDiscover} import scala.concurrent.duration._ import scala.xml.Elem @@ -39,18 +35,10 @@ import scala.xml.Elem @DoNotDiscover class BlazegraphClientSpec(docker: BlazegraphDocker) extends TestKit(ActorSystem("BlazegraphClientSpec")) - with Suite - with AnyWordSpecLike - with Matchers + with CatsEffectSpec with ConfigFixtures - with EitherValues with CancelAfterFailure - with TestHelpers - with Eventually - with Inspectors - with TestMatchers - with BIOValues - with CatsIOValues { + with Eventually { implicit private val sc: Scheduler = Scheduler.global implicit private val httpCfg: HttpClientConfig = httpClientConfig @@ -59,7 +47,7 @@ class BlazegraphClientSpec(docker: BlazegraphDocker) private lazy val endpoint = docker.hostConfig.endpoint private lazy val client = - BlazegraphClient(HttpClient(), endpoint, None, 10.seconds) + BlazegraphClient(HttpClient(), endpoint, None, 10.seconds, defaultProperties) private lazy val graphId = endpoint / "graphs" / "myid" private def nTriples(id: String = genString(), label: String = genString(), value: String = genString()) = { diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/SparqlQueryClientDummy.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/SparqlQueryClientDummy.scala index 3647337346..2b2ab664a6 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/SparqlQueryClientDummy.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/SparqlQueryClientDummy.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client import akka.http.scaladsl.model.HttpHeader +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryClientDummy.bNode import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponse._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType._ @@ -7,7 +8,6 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.BNode import ch.epfl.bluebrain.nexus.delta.rdf.graph.NTriples import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery import io.circe.Json -import monix.bio.IO import scala.xml.NodeSeq @@ -23,7 +23,7 @@ class SparqlQueryClientDummy( q: SparqlQuery, responseType: Aux[R], additionalHeaders: Seq[HttpHeader] = Seq.empty - ): IO[SparqlClientError, R] = + ): IO[R] = responseType match { case SparqlResultsJson => IO.pure(SparqlResultsResponse(sparqlResults(indices))) diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSinkSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSinkSuite.scala index 931b3d3634..a495cffc2f 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSinkSuite.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSinkSuite.scala @@ -1,6 +1,5 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphClientSetup import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri @@ -73,7 +72,6 @@ class BlazegraphSinkSuite private def query(namespace: String) = client .query(Set(namespace), constructQuery, SparqlQueryResponseType.SparqlNTriples) - .toCatsIO .map { response => Graph(response.value).toOption } test("Create the namespace") { @@ -127,7 +125,7 @@ class BlazegraphSinkSuite val expected = createGraph(Chunk(resource2Id -> resource2Ntriples, resource1Id -> resource1NtriplesUpdated)) for { - _ <- client.createNamespace(namespace).toCatsIO.assertEquals(true) + _ <- client.createNamespace(namespace).assertEquals(true) _ <- sink.apply(asElems(input)) _ <- query(namespace).assertSome(expected) } yield () @@ -150,7 +148,7 @@ class BlazegraphSinkSuite val expected = createGraph(Chunk.singleton(resource2Id -> resource2Ntriples)) for { - _ <- client.createNamespace(namespace).toCatsIO.assertEquals(true) + _ <- client.createNamespace(namespace).assertEquals(true) _ <- sink.apply(chunk) _ <- query(namespace).assertSome(expected) } yield () diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsQueryDummy.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsQueryDummy.scala index 188473ec69..c84adefa7b 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsQueryDummy.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsQueryDummy.scala @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes import cats.effect.IO import cats.implicits._ -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.Aux import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.{SparqlClientError, SparqlQueryClient, SparqlQueryResponse} @@ -47,7 +46,7 @@ private[routes] class BlazegraphViewsQueryDummy( for { view <- views.fetch(id, project) _ <- IO.raiseWhen(view.deprecated)(ViewIsDeprecated(view.id)) - response <- client.query(Set(id.toString), query, responseType).toCatsIO.adaptError { case e: SparqlClientError => + response <- client.query(Set(id.toString), query, responseType).adaptError { case e: SparqlClientError => WrappedBlazegraphClientError(e) } } yield response diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsQuerySpec.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsQuerySpec.scala index 5465563edd..32009dda9e 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsQuerySpec.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsQuerySpec.scala @@ -4,6 +4,7 @@ import akka.actor.ActorSystem import akka.http.scaladsl.model.Uri import akka.testkit.TestKit import cats.data.NonEmptySet +import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategyConfig.AlwaysGiveUp import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ @@ -42,7 +43,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label, ResourceRe import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture import ch.epfl.bluebrain.nexus.testkit.blazegraph.BlazegraphDocker import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec -import monix.bio.{IO => BIO} import monix.execution.Scheduler import org.scalatest.concurrent.Eventually import org.scalatest.{CancelAfterFailure, DoNotDiscover, Inspectors} @@ -63,7 +63,7 @@ class BlazegraphViewsQuerySpec(docker: BlazegraphDocker) implicit override def patienceConfig: PatienceConfig = PatienceConfig(6.seconds, 100.millis) private val noopSlowQueryLogger: BlazegraphSlowQueryLogger = new BlazegraphSlowQueryLogger { - override def apply[E, A](context: BlazegraphQueryContext, query: BIO[E, A]): BIO[E, A] = query + override def apply[A](context: BlazegraphQueryContext, query: IO[A]): IO[A] = query } implicit private val sc: Scheduler = Scheduler.global @@ -74,7 +74,7 @@ class BlazegraphViewsQuerySpec(docker: BlazegraphDocker) private lazy val endpoint = docker.hostConfig.endpoint private lazy val client = - BlazegraphClient(HttpClient(), endpoint, None, 10.seconds) + BlazegraphClient(HttpClient(), endpoint, None, 10.seconds, defaultProperties) private val realm = Label.unsafe("myrealm") implicit private val alice: Caller = Caller(User("Alice", realm), Set(User("Alice", realm), Group("users", realm))) diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryLoggerSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryLoggerSuite.scala index 4e8d65c79c..f4848cb40e 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryLoggerSuite.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/slowqueries/BlazegraphSlowQueryLoggerSuite.scala @@ -1,5 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries +import cats.effect.IO +import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViewsQuery.BlazegraphQueryContext import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.BlazegraphSlowQueryLoggerSuite._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.model.BlazegraphSlowQuery @@ -8,22 +10,21 @@ import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie -import ch.epfl.bluebrain.nexus.testkit.mu.bio.BioSuite -import monix.bio.Task +import ch.epfl.bluebrain.nexus.testkit.mu.ce.CatsEffectSuite import munit.AnyFixture import java.time.Instant -import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.{DurationInt, FiniteDuration} object BlazegraphSlowQueryLoggerSuite { private val LongQueryThreshold = 100.milliseconds private val StoreWhichFails: BlazegraphSlowQueryStore = new BlazegraphSlowQueryStore { - override def save(query: BlazegraphSlowQuery): Task[Unit] = - Task.raiseError(new RuntimeException("error saving slow log")) + override def save(query: BlazegraphSlowQuery): IO[Unit] = + IO.raiseError(new RuntimeException("error saving slow log")) - override def removeQueriesOlderThan(instant: Instant): Task[Unit] = Task.unit + override def removeQueriesOlderThan(instant: Instant): IO[Unit] = IO.unit - override def listForTestingOnly(view: ViewRef): Task[List[BlazegraphSlowQuery]] = Task.pure(Nil) + override def listForTestingOnly(view: ViewRef): IO[List[BlazegraphSlowQuery]] = IO.pure(Nil) } private val view = ViewRef(ProjectRef.unsafe("epfl", "blue-brain"), Iri.unsafe("hippocampus")) @@ -31,7 +32,7 @@ object BlazegraphSlowQueryLoggerSuite { private val user = Identity.User("Ted Lasso", Label.unsafe("epfl")) } -class BlazegraphSlowQueryLoggerSuite extends BioSuite with Doobie.Fixture with BlazegraphSlowQueryStoreFixture { +class BlazegraphSlowQueryLoggerSuite extends CatsEffectSuite with Doobie.Fixture with BlazegraphSlowQueryStoreFixture { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie, blazegraphSlowQueryStore) @@ -44,6 +45,15 @@ class BlazegraphSlowQueryLoggerSuite extends BioSuite with Doobie.Fixture with B (logger, store.listForTestingOnly(view)) } + private def assertSavedQuery(actual: BlazegraphSlowQuery, failed: Boolean, minDuration: FiniteDuration): Unit = { + assertEquals(actual.view, view) + assertEquals(actual.query, sparqlQuery) + assertEquals(actual.subject, user) + assertEquals(actual.failed, failed) + assertEquals(actual.instant, Instant.EPOCH) + assert(actual.duration >= minDuration) + } + test("slow query logged") { val (logSlowQuery, getLoggedQueries) = fixture @@ -55,12 +65,13 @@ class BlazegraphSlowQueryLoggerSuite extends BioSuite with Doobie.Fixture with B sparqlQuery, user ), - Task.sleep(101.milliseconds) + IO.sleep(101.milliseconds) ) saved <- getLoggedQueries } yield { assertEquals(saved.size, 1) - val onlyRecord = saved.head + assertSavedQuery(saved.head, failed = false, 101.millis) + val onlyRecord: BlazegraphSlowQuery = saved.head assertEquals(onlyRecord.view, view) assertEquals(onlyRecord.query, sparqlQuery) assertEquals(onlyRecord.subject, user) @@ -75,24 +86,19 @@ class BlazegraphSlowQueryLoggerSuite extends BioSuite with Doobie.Fixture with B val (logSlowQuery, getLoggedQueries) = fixture for { - _ <- logSlowQuery( - BlazegraphQueryContext( - view, - sparqlQuery, - user - ), - Task.sleep(101.milliseconds) >> Task.raiseError(new RuntimeException()) - ).failed - saved <- getLoggedQueries + maybeResult <- logSlowQuery( + BlazegraphQueryContext( + view, + sparqlQuery, + user + ), + IO.sleep(101.milliseconds) >> IO.raiseError(new RuntimeException()) + ).attempt + saved <- getLoggedQueries } yield { + assert(maybeResult.isLeft) assertEquals(saved.size, 1) - val onlyRecord = saved.head - assertEquals(onlyRecord.view, view) - assertEquals(onlyRecord.query, sparqlQuery) - assertEquals(onlyRecord.subject, user) - assertEquals(onlyRecord.failed, true) - assertEquals(onlyRecord.instant, Instant.EPOCH) - assert(onlyRecord.duration > 100.milliseconds) + assertSavedQuery(saved.head, failed = true, 101.millis) } } @@ -107,7 +113,7 @@ class BlazegraphSlowQueryLoggerSuite extends BioSuite with Doobie.Fixture with B sparqlQuery, user ), - Task.sleep(50.milliseconds) + IO.sleep(50.milliseconds) ) saved <- getLoggedQueries } yield { @@ -127,7 +133,7 @@ class BlazegraphSlowQueryLoggerSuite extends BioSuite with Doobie.Fixture with B sparqlQuery, user ), - Task.sleep(101.milliseconds).as("result") - ).assert("result") + IO.sleep(101.milliseconds).as("result") + ).assertEquals("result") } } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/BlazegraphQuery.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/BlazegraphQuery.scala index 79861217e5..c54bba32f4 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/BlazegraphQuery.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/BlazegraphQuery.scala @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews import cats.effect.IO import cats.syntax.all._ -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.Aux import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client._ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef @@ -120,7 +119,9 @@ object BlazegraphQuery { AuthorizationFailed(s"Defined permissions on sparql projection on '${view.ref}' are missing.") ) namespace = commonNamespace(view.uuid, view.indexingRev, prefix) - result <- client.query(Set(namespace), query, responseType).mapError(WrappedBlazegraphClientError) + result <- client.query(Set(namespace), query, responseType).adaptError { case e: SparqlClientError => + WrappedBlazegraphClientError(e) + } } yield result override def query[R <: SparqlQueryResponse]( @@ -136,7 +137,9 @@ object BlazegraphQuery { _ <- aclCheck.authorizeForOr(project, projection.permission)(AuthorizationFailed(project, projection.permission)) namespace = projectionNamespace(projection, view.uuid, prefix) - result <- client.query(Set(namespace), query, responseType).mapError(WrappedBlazegraphClientError) + result <- client.query(Set(namespace), query, responseType).adaptError { case e: SparqlClientError => + WrappedBlazegraphClientError(e) + } } yield result override def queryProjections[R <: SparqlQueryResponse]( @@ -148,7 +151,9 @@ object BlazegraphQuery { for { view <- fetchView(id, project) namespaces <- allowedProjections(view, project) - result <- client.query(namespaces, query, responseType).mapError(WrappedBlazegraphClientError) + result <- client.query(namespaces, query, responseType).adaptError { case e: SparqlClientError => + WrappedBlazegraphClientError(e) + } } yield result private def fetchProjection(view: ActiveViewDef, projectionId: IdSegment) = diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala index 22195ea3a9..51dbe94e95 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala @@ -65,7 +65,7 @@ final class Single[SinkFormat]( private def queryTransform: GraphResource => IO[Option[SinkFormat]] = gr => for { - graph <- queryGraph(gr).toCatsIO + graph <- queryGraph(gr) transformed <- graph.flatTraverse(transform) } yield transformed @@ -115,7 +115,7 @@ final class Batch[SinkFormat]( /** Performs the sparql query only using [[SuccessElem]]s from the chunk */ private def query(elements: Chunk[Elem[GraphResource]]): IO[Option[Graph]] = elements.mapFilter(elem => elem.map(_.id).toOption) match { - case ids if ids.nonEmpty => queryGraph(ids).toCatsIO + case ids if ids.nonEmpty => queryGraph(ids) case _ => IO.none } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala index 1edd331b57..8fd731d4aa 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala @@ -6,6 +6,7 @@ import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.DefaultProperties import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.client.DeltaClient import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.deletion.CompositeViewsDeletionTask @@ -72,13 +73,15 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { ( cfg: CompositeViewsConfig, client: HttpClient @Id("http-indexing-client"), - as: ActorSystem[Nothing] + as: ActorSystem[Nothing], + properties: DefaultProperties ) => BlazegraphClient( client, cfg.blazegraphAccess.base, cfg.blazegraphAccess.credentials, - cfg.blazegraphAccess.queryTimeout + cfg.blazegraphAccess.queryTimeout, + properties.value )(as.classicSystem) } @@ -86,13 +89,15 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { ( cfg: CompositeViewsConfig, client: HttpClient @Id("http-query-client"), - as: ActorSystem[Nothing] + as: ActorSystem[Nothing], + properties: DefaultProperties ) => BlazegraphClient( client, cfg.blazegraphAccess.base, cfg.blazegraphAccess.credentials, - cfg.blazegraphAccess.queryTimeout + cfg.blazegraphAccess.queryTimeout, + properties.value )(as.classicSystem) } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala index b8dcda686b..0961242488 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.SparqlNTriples import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.idTemplating @@ -9,7 +9,6 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.graph.{Graph, NTriples} import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery import fs2.Chunk -import monix.bio.Task import java.util.regex.Pattern.quote @@ -27,16 +26,16 @@ final class BatchQueryGraph(client: BlazegraphClient, namespace: String, query: private val logger = Logger[BatchQueryGraph] - private def newGraph(ntriples: NTriples): Task[Option[Graph]] = - if (ntriples.isEmpty) Task.none - else Task.fromEither(Graph(ntriples)).map(Some(_)) + private def newGraph(ntriples: NTriples): IO[Option[Graph]] = + if (ntriples.isEmpty) IO.none + else IO.fromEither(Graph(ntriples)).map(Some(_)) - def apply(ids: Chunk[Iri]): Task[Option[Graph]] = + def apply(ids: Chunk[Iri]): IO[Option[Graph]] = for { ntriples <- client.query(Set(namespace), replaceIds(query, ids), SparqlNTriples) graphResult <- newGraph(ntriples.value) - _ <- Task.when(graphResult.isEmpty)( - logger.debug(s"Querying blazegraph did not return any triples, '$ids' will be dropped.").toUIO + _ <- IO.whenA(graphResult.isEmpty)( + logger.debug(s"Querying blazegraph did not return any triples, '$ids' will be dropped.") ) } yield graphResult diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala index b038c4f339..7741f6618f 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala @@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing import cats.effect.IO import cats.implicits.catsSyntaxFlatMapOps import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection @@ -43,21 +42,21 @@ object CompositeSpaces { ): CompositeSpaces = new CompositeSpaces { override def init(view: ActiveViewDef): IO[Unit] = { val common = commonNamespace(view.uuid, view.indexingRev, prefix) - val createCommon = blazeClient.createNamespace(common).toCatsIO.void + val createCommon = blazeClient.createNamespace(common).void val result = view.value.projections.foldLeft[IO[Unit]](createCommon) { case (acc, e: ElasticSearchProjection) => val index = projectionIndex(e, view.uuid, prefix) acc >> esClient.createIndex(index, Some(e.mapping), e.settings).void case (acc, s: SparqlProjection) => val namespace = projectionNamespace(s, view.uuid, prefix) - acc >> blazeClient.createNamespace(namespace).toCatsIO.void + acc >> blazeClient.createNamespace(namespace).void } logger.debug(s"Creating namespaces and indices for composite view ${view.ref}") >> result } override def destroyAll(view: ActiveViewDef): IO[Unit] = { val common = commonNamespace(view.uuid, view.indexingRev, prefix) - val deleteCommon = blazeClient.deleteNamespace(common).toCatsIO.void + val deleteCommon = blazeClient.deleteNamespace(common).void val result = view.value.projections.foldLeft[IO[Unit]](deleteCommon) { case (acc, p) => acc >> destroyProjection(view, p) } @@ -72,7 +71,7 @@ object CompositeSpaces { esClient.deleteIndex(index).void case s: SparqlProjection => val namespace = projectionNamespace(s, view.uuid, prefix) - blazeClient.deleteNamespace(namespace).toCatsIO.void + blazeClient.deleteNamespace(namespace).void } } } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala index b183e17bac..dd14fe85e3 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.SparqlNTriples import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.idTemplating @@ -9,7 +9,6 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.graph.{Graph, NTriples} import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource -import monix.bio.Task import java.util.regex.Pattern.quote @@ -27,21 +26,21 @@ final class SingleQueryGraph(client: BlazegraphClient, namespace: String, query: private val logger = Logger[SingleQueryGraph] - private def newGraph(ntriples: NTriples, id: Iri): Task[Option[Graph]] = + private def newGraph(ntriples: NTriples, id: Iri): IO[Option[Graph]] = if (ntriples.isEmpty) { // If nothing is returned by the query, we skip - Task.none + IO.none } else - Task.fromEither(Graph(ntriples.copy(rootNode = id))).map { g => + IO.fromEither(Graph(ntriples.copy(rootNode = id))).map { g => Some(g.replaceRootNode(id)) } - def apply(graphResource: GraphResource): Task[Option[GraphResource]] = + def apply(graphResource: GraphResource): IO[Option[GraphResource]] = for { ntriples <- client.query(Set(namespace), replaceId(query, graphResource.id), SparqlNTriples) graphResult <- newGraph(ntriples.value, graphResource.id) - _ <- Task.when(graphResult.isEmpty)( - logger.debug(s"Querying blazegraph did not return any triples, '$graphResource' will be dropped.").toUIO + _ <- IO.whenA(graphResult.isEmpty)( + logger.debug(s"Querying blazegraph did not return any triples, '$graphResource' will be dropped.") ) } yield graphResult.map(g => graphResource.copy(graph = g)) diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala index a788d7543d..f610f81013 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala @@ -337,13 +337,13 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst for { // Initialise the namespaces and indices _ <- spaces.init(view) - _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true) - _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(true) + _ <- bgClient.existsNamespace(commonNs).assertEquals(true) + _ <- bgClient.existsNamespace(sparqlNamespace).assertEquals(true) _ <- esClient.existsIndex(elasticIndex).assertEquals(true) // Delete them on destroy _ <- spaces.destroyAll(view) - _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(false) - _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(false) + _ <- bgClient.existsNamespace(commonNs).assertEquals(false) + _ <- bgClient.existsNamespace(sparqlNamespace).assertEquals(false) _ <- esClient.existsIndex(elasticIndex).assertEquals(false) } yield () } @@ -362,18 +362,18 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst for { // Initialise the namespaces and indices _ <- spaces.init(view) - _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true) - _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(true) + _ <- bgClient.existsNamespace(commonNs).assertEquals(true) + _ <- bgClient.existsNamespace(sparqlNamespace).assertEquals(true) _ <- esClient.existsIndex(elasticIndex).assertEquals(true) // Delete the blazegraph projection _ <- spaces.destroyProjection(view, blazegraphProjection) - _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true) - _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(false) + _ <- bgClient.existsNamespace(commonNs).assertEquals(true) + _ <- bgClient.existsNamespace(sparqlNamespace).assertEquals(false) _ <- esClient.existsIndex(elasticIndex).assertEquals(true) // Delete the elasticsearch projection _ <- spaces.destroyProjection(view, elasticSearchProjection) - _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true) - _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(false) + _ <- bgClient.existsNamespace(commonNs).assertEquals(true) + _ <- bgClient.existsNamespace(sparqlNamespace).assertEquals(false) _ <- esClient.existsIndex(elasticIndex).assertEquals(false) } yield () } @@ -436,7 +436,7 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst resultMuse, resultRedHot ).eventually(()) - _ <- checkBlazegraphTriples(sparqlNamespace, contentOf("indexing/result.nt")).toCatsIO + _ <- checkBlazegraphTriples(sparqlNamespace, contentOf("indexing/result.nt")) } yield () } @@ -475,7 +475,7 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst resultMuseMetadata, resultRedHotMetadata ).eventually(()) - _ <- checkBlazegraphTriples(sparqlNamespace, contentOf("indexing/result_metadata.nt")).toCatsIO.eventually(()) + _ <- checkBlazegraphTriples(sparqlNamespace, contentOf("indexing/result_metadata.nt")).eventually(()) } yield () } @@ -521,7 +521,7 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst resultMuse, resultRedHot ).eventually(()) - _ <- checkBlazegraphTriples(sparqlNamespace, contentOf("indexing/result.nt")).toCatsIO.eventually(()) + _ <- checkBlazegraphTriples(sparqlNamespace, contentOf("indexing/result.nt")).eventually(()) } yield () for { diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/BlazegraphQueryDummy.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/BlazegraphQueryDummy.scala index bfe3cd6be7..fec3f94fce 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/BlazegraphQueryDummy.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/BlazegraphQueryDummy.scala @@ -1,9 +1,9 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ +import cats.implicits.catsSyntaxMonadError import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.Aux -import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.{SparqlQueryClient, SparqlQueryResponse} +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.{SparqlClientError, SparqlQueryClient, SparqlQueryResponse} import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewRejection.{ViewIsDeprecated, WrappedBlazegraphClientError} import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.{BlazegraphQuery, CompositeViews} import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery @@ -22,7 +22,9 @@ class BlazegraphQueryDummy(client: SparqlQueryClient, views: CompositeViews) ext for { view <- views.fetch(id, project) _ <- IO.raiseWhen(view.deprecated)(ViewIsDeprecated(view.id)) - res <- client.query(Set("queryCommonNs"), query, responseType).mapError(WrappedBlazegraphClientError) + res <- client.query(Set("queryCommonNs"), query, responseType).adaptError { case e: SparqlClientError => + WrappedBlazegraphClientError(e) + } } yield res override def query[R <: SparqlQueryResponse]( @@ -35,7 +37,9 @@ class BlazegraphQueryDummy(client: SparqlQueryClient, views: CompositeViews) ext for { view <- views.fetch(id, project) _ <- IO.raiseWhen(view.deprecated)(ViewIsDeprecated(view.id)) - res <- client.query(Set("queryProjection"), query, responseType).mapError(WrappedBlazegraphClientError) + res <- client.query(Set("queryProjection"), query, responseType).adaptError { case e: SparqlClientError => + WrappedBlazegraphClientError(e) + } } yield res override def queryProjections[R <: SparqlQueryResponse]( @@ -47,7 +51,9 @@ class BlazegraphQueryDummy(client: SparqlQueryClient, views: CompositeViews) ext for { view <- views.fetch(id, project) _ <- IO.raiseWhen(view.deprecated)(ViewIsDeprecated(view.id)) - res <- client.query(Set("queryProjections"), query, responseType).mapError(WrappedBlazegraphClientError) + res <- client.query(Set("queryProjections"), query, responseType).adaptError { case e: SparqlClientError => + WrappedBlazegraphClientError(e) + } } yield res } diff --git a/delta/plugins/search/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/SearchSparqlQuerySpec.scala b/delta/plugins/search/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/SearchSparqlQuerySpec.scala index bd9defd8c1..f60698f399 100644 --- a/delta/plugins/search/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/SearchSparqlQuerySpec.scala +++ b/delta/plugins/search/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/SearchSparqlQuerySpec.scala @@ -4,6 +4,7 @@ import akka.actor.ActorSystem import akka.testkit.TestKit import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.Fixtures.defaultProperties import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.SparqlNTriples import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode @@ -17,32 +18,23 @@ import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientConfig} import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.testkit.blazegraph.BlazegraphDocker -import ch.epfl.bluebrain.nexus.testkit.scalatest.{EitherValues, TestMatchers} -import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsIOValues -import ch.epfl.bluebrain.nexus.testkit.TestHelpers -import ch.epfl.bluebrain.nexus.testkit.scalatest.bio.BIOValues +import ch.epfl.bluebrain.nexus.testkit.scalatest.EitherValues +import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.{CatsEffectSpec, CatsIOValues} import io.circe.Json import monix.execution.Scheduler +import org.scalatest.CancelAfterFailure import org.scalatest.concurrent.Eventually -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpecLike -import org.scalatest.{CancelAfterFailure, Inspectors} import java.util.regex.Pattern.quote import scala.concurrent.duration._ class SearchSparqlQuerySpec extends TestKit(ActorSystem("SearchSparqlQuerySpec")) - with AnyWordSpecLike - with Matchers + with CatsEffectSpec with ConfigFixtures with EitherValues with CancelAfterFailure - with TestHelpers with Eventually - with Inspectors - with TestMatchers - with BIOValues with CatsIOValues with BlazegraphDocker { @@ -58,7 +50,7 @@ class SearchSparqlQuerySpec ) private lazy val endpoint = hostConfig.endpoint - private lazy val client = BlazegraphClient(HttpClient(), endpoint, None, 10.seconds) + private lazy val client = BlazegraphClient(HttpClient(), endpoint, None, 10.seconds, defaultProperties) private def toNTriples(json: Json): NTriples = { for { @@ -94,7 +86,7 @@ class SearchSparqlQuerySpec val q = contentOf("construct-query.sparql").replaceAll(quote("{resource_id}"), traceId.rdfFormat) val query = SparqlConstructQuery(q).rightValue val compacted = for { - ntriples <- toCatsIO(client.query(Set(index), query, SparqlNTriples)) + ntriples <- client.query(Set(index), query, SparqlNTriples) graph <- IO.fromEither(Graph(ntriples.value.copy(rootNode = traceId))) compacted <- graph.toCompactedJsonLd(ctx) } yield compacted diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClient.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClient.scala index 9092b8419b..8ac97cb9cb 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClient.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClient.scala @@ -9,14 +9,15 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller import akka.stream.StreamTcpException import akka.util.ByteString -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.{toCatsIOOps, toMonixBIOOps} import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling._ import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError._ import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import io.circe.{Decoder, Json} -import monix.bio.{IO, Task, UIO} +import monix.bio.{IO => BIO, Task, UIO} +import cats.effect.IO import monix.execution.Scheduler import java.net.UnknownHostException @@ -34,13 +35,13 @@ trait HttpClient { */ def apply[A](req: HttpRequest)(handleResponse: PartialFunction[HttpResponse, HttpResult[A]]): HttpResult[A] - def run[A](req: HttpRequest)(handleResponse: PartialFunction[HttpResponse, cats.effect.IO[A]]): HttpResult[A] = - apply(req) { case r if handleResponse.isDefinedAt(r) => handleResponse(r).toBIO[HttpClientError] } + def run[A](req: HttpRequest)(handleResponse: PartialFunction[HttpResponse, IO[A]]): IO[A] = + apply(req) { case r if handleResponse.isDefinedAt(r) => handleResponse(r).toBIO[HttpClientError] }.toCatsIO /** * Execute the argument request and unmarshal the response Json response. */ - def toJson(req: HttpRequest): HttpResult[Json] = + def toJson(req: HttpRequest): HttpResult[Json] = fromJsonTo[Json](req) /** @@ -69,7 +70,7 @@ trait HttpClient { object HttpClient { - type HttpResult[A] = IO[HttpClientError, A] + type HttpResult[A] = BIO[HttpClientError, A] private val acceptEncoding = AcceptEncoding.create(HttpEncodingRange.create(HttpEncodings.gzip), HttpEncodingRange.create(HttpEncodings.deflate)) @@ -103,12 +104,12 @@ object HttpClient { )(implicit httpConfig: HttpClientConfig, as: ActorSystem, scheduler: Scheduler): HttpClient = new HttpClient { - private def decodeResponse(req: HttpRequest, response: HttpResponse): IO[InvalidEncoding, HttpResponse] = { + private def decodeResponse(req: HttpRequest, response: HttpResponse): BIO[InvalidEncoding, HttpResponse] = { val decoder = response.encoding match { - case HttpEncodings.gzip => IO.pure(Coders.Gzip) - case HttpEncodings.deflate => IO.pure(Coders.Deflate) - case HttpEncodings.identity => IO.pure(Coders.NoCoding) - case encoding => IO.raiseError(InvalidEncoding(req, encoding)) + case HttpEncodings.gzip => BIO.pure(Coders.Gzip) + case HttpEncodings.deflate => BIO.pure(Coders.Deflate) + case HttpEncodings.identity => BIO.pure(Coders.NoCoding) + case encoding => BIO.raiseError(InvalidEncoding(req, encoding)) } decoder.map(_.decodeMessage(response)) } @@ -154,7 +155,7 @@ object HttpClient { override def discardBytes[A](req: HttpRequest, returnValue: => A): HttpResult[A] = apply(req) { case resp if resp.status.isSuccess() => - UIO.delay(resp.discardEntityBytes()) >> IO.pure(returnValue) + UIO.delay(resp.discardEntityBytes()) >> BIO.pure(returnValue) } private def consumeEntity[A](req: HttpRequest, resp: HttpResponse): HttpResult[A] = @@ -163,8 +164,8 @@ object HttpClient { resp.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String) ) .redeemWith( - error => IO.raiseError(HttpUnexpectedError(req, error.getMessage)), - consumedString => IO.raiseError(HttpClientError(req, resp.status, consumedString)) + error => BIO.raiseError(HttpUnexpectedError(req, error.getMessage)), + consumedString => BIO.raiseError(HttpClientError(req, resp.status, consumedString)) ) } diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/IOFixedClock.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/IOFixedClock.scala index 9a036851a3..1c06e195fd 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/IOFixedClock.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/IOFixedClock.scala @@ -13,12 +13,13 @@ trait IOFixedClock { override def monotonic(unit: TimeUnit): UIO[Long] = UIO.pure(instant.toEpochMilli) } + private val realClock: Clock[IO] = Clock.create + implicit def bioClock: Clock[UIO] = bioClock(Instant.EPOCH) def ceClock(instant: Instant): Clock[IO] = new Clock[IO] { - override def realTime(unit: TimeUnit): IO[Long] = IO.pure(instant.toEpochMilli) - - override def monotonic(unit: TimeUnit): IO[Long] = IO.pure(instant.toEpochMilli) + override def realTime(unit: TimeUnit): IO[Long] = IO.pure(instant.toEpochMilli) + override def monotonic(unit: TimeUnit): IO[Long] = realClock.monotonic(unit) } implicit def ceClock: Clock[IO] = ceClock(Instant.EPOCH)