Skip to content

Commit

Permalink
Migrate Blazegraph module to Cats Effect (#4478)
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb authored Nov 8, 2023
1 parent 6f96c00 commit 33cf07d
Show file tree
Hide file tree
Showing 34 changed files with 272 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,6 @@ trait ClasspathResourceUtils {
props.asScala.toMap
}

final def bioPropertiesOf(resourcePath: String)(implicit
classLoader: ClassLoader
): BIO[ClasspathResourceError, Map[String, String]] =
bioStreamOf(resourcePath).map { is =>
val props = new Properties()
props.load(is)
props.asScala.toMap
}

/**
* Loads the content of the argument classpath resource as a string and replaces all the key matches of the
* ''replacements'' with their values. The resulting string is parsed into a json value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import monix.bio.{Task, UIO}
import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{DurationLong, FiniteDuration, MILLISECONDS}

trait IOUtils {

Expand All @@ -22,6 +23,13 @@ object IOUtils extends IOUtils
trait IOInstant {
def now(implicit clock: Clock[IO]): IO[Instant] =
clock.realTime(TimeUnit.MILLISECONDS).map(Instant.ofEpochMilli)

def timed[A](io: IO[A])(implicit c: Clock[IO]): IO[(A, FiniteDuration)] =
for {
start <- c.monotonic(MILLISECONDS)
result <- io
finish <- c.monotonic(MILLISECONDS)
} yield (result, (finish - start).millis)
}

object IOInstant extends IOInstant
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, BlazegraphViewType}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, JsonLdContext, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.Configuration
import monix.bio.Task

private[blazegraph] object BlazegraphDecoderConfiguration {

def apply(implicit jsonLdApi: JsonLdApi, rcr: RemoteContextResolution): Task[Configuration] = for {
contextValue <- Task.delay { ContextValue(contexts.blazegraph) }
jsonLdContext <- JsonLdContext(contextValue)
def apply(implicit jsonLdApi: JsonLdApi, rcr: RemoteContextResolution): IO[Configuration] = for {
contextValue <- IO.delay { ContextValue(contexts.blazegraph) }
jsonLdContext <- JsonLdContext(contextValue).toCatsIO
} yield {
val enhancedJsonLdContext = jsonLdContext
.addAliasIdType("IndexingBlazegraphViewValue", BlazegraphViewType.IndexingBlazegraphView.tpe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph
import akka.actor.typed.ActorSystem
import cats.effect.{Clock, ContextShift, IO, Timer}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{CatsEffectsClasspathResourceUtils, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphCoordinator
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent, DefaultProperties}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphSlowQueryDeleter, BlazegraphSlowQueryLogger, BlazegraphSlowQueryStore}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
Expand Down Expand Up @@ -48,6 +48,10 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {

make[BlazegraphViewsConfig].from { BlazegraphViewsConfig.load(_) }

make[DefaultProperties].fromEffect {
CatsEffectsClasspathResourceUtils.ioPropertiesOf("blazegraph/index.properties").map(DefaultProperties)
}

make[HttpClient].named("http-indexing-client").from {
(cfg: BlazegraphViewsConfig, as: ActorSystem[Nothing], sc: Scheduler) =>
HttpClient()(cfg.indexingClient, as.classicSystem, sc)
Expand All @@ -69,17 +73,18 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
)(timer)
}

make[BlazegraphSlowQueryLogger].from { (cfg: BlazegraphViewsConfig, store: BlazegraphSlowQueryStore) =>
BlazegraphSlowQueryLogger(store, cfg.slowQueries.slowQueryThreshold)
make[BlazegraphSlowQueryLogger].from { (cfg: BlazegraphViewsConfig, store: BlazegraphSlowQueryStore, c: Clock[IO]) =>
BlazegraphSlowQueryLogger(store, cfg.slowQueries.slowQueryThreshold)(c)
}

make[BlazegraphClient].named("blazegraph-indexing-client").from {
(
cfg: BlazegraphViewsConfig,
client: HttpClient @Id("http-indexing-client"),
as: ActorSystem[Nothing]
as: ActorSystem[Nothing],
properties: DefaultProperties
) =>
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem)
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout, properties.value)(as.classicSystem)
}

make[HttpClient].named("http-query-client").from {
Expand All @@ -91,9 +96,10 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
(
cfg: BlazegraphViewsConfig,
client: HttpClient @Id("http-query-client"),
as: ActorSystem[Nothing]
as: ActorSystem[Nothing],
properties: DefaultProperties
) =>
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem)
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout, properties.value)(as.classicSystem)
}

make[ValidateBlazegraphView].from {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.sdk.ServiceDependency
import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOInstant, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.{BlazegraphClient, SparqlClientError}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphView.IndexingBlazegraphView
Expand Down Expand Up @@ -542,9 +542,8 @@ object BlazegraphViews {
case i: IndexingBlazegraphView =>
client
.createNamespace(BlazegraphViews.namespace(i, prefix))
.mapError(WrappedBlazegraphClientError.apply)
.adaptError { case e: SparqlClientError => WrappedBlazegraphClientError(e) }
.void
.toCatsIO
case _ => IO.unit
}
apply(fetchContext, contextResolution, validate, createNameSpace, eventLogConfig, prefix, xas)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils.ioContentOf
Expand Down Expand Up @@ -182,19 +183,19 @@ object BlazegraphViewsQuery {
AuthorizationFailed(i.ref.project, i.permission)
)
.as(Set(i.index))
.toBIO[BlazegraphViewRejection]
case a: AggregateView =>
aclCheck
.mapFilter[IndexingView, String](
a.views,
v => ProjectAcl(v.ref.project) -> v.permission,
_.index
)
.toUIO
}
qr <- logSlowQueries(
BlazegraphQueryContext(ViewRef.apply(project, iri), query, caller.subject),
client.query(indices, query, responseType).mapError(WrappedBlazegraphClientError)
client.query(indices, query, responseType).adaptError { case e: SparqlClientError =>
WrappedBlazegraphClientError(e)
}
)
} yield qr

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ import akka.http.scaladsl.model.headers.{BasicHttpCredentials, HttpCredentials,
import akka.http.scaladsl.model.{HttpEntity, HttpHeader, Uri}
import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller
import akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers.stringUnmarshaller
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient.timeoutHeader
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlClientError.{InvalidCountRequest, WrappedHttpClientError}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.{Aux, SparqlResultsJson}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig.Credentials
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientError}
import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription
import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription.ResolvedServiceDescription
import ch.epfl.bluebrain.nexus.delta.sdk.model.Name
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import monix.bio.{IO, UIO}

import scala.concurrent.duration._

Expand All @@ -29,24 +30,20 @@ import scala.concurrent.duration._
class BlazegraphClient(
client: HttpClient,
endpoint: Uri,
queryTimeout: Duration
queryTimeout: Duration,
defaultProperties: Map[String, String]
)(implicit credentials: Option[HttpCredentials], as: ActorSystem)
extends SparqlClient(client, SparqlQueryEndpoint.blazegraph(endpoint)) {

implicit private val cl: ClassLoader = getClass.getClassLoader

private val serviceVersion = """(buildVersion">)([^<]*)""".r
private val serviceName = Name.unsafe("blazegraph")

private val defaultProperties =
ClasspathResourceUtils.bioPropertiesOf("blazegraph/index.properties").hideErrors.memoizeOnSuccess

override def query[R <: SparqlQueryResponse](
indices: Iterable[String],
q: SparqlQuery,
responseType: Aux[R],
additionalHeaders: Seq[HttpHeader]
): IO[SparqlClientError, R] = {
): IO[R] = {
val headers = queryTimeout match {
case finite: FiniteDuration => additionalHeaders :+ RawHeader(timeoutHeader, finite.toMillis.toString)
case _ => additionalHeaders
Expand All @@ -57,10 +54,11 @@ class BlazegraphClient(
/**
* Fetches the service description information (name and version)
*/
def serviceDescription: UIO[ServiceDescription] =
def serviceDescription: IO[ServiceDescription] =
client
.fromEntityTo[ResolvedServiceDescription](Get(endpoint / "status"))
.timeout(5.seconds)
.toCatsIO
.redeem(
_ => ServiceDescription.unresolved(serviceName),
_.map(_.copy(name = serviceName)).getOrElse(ServiceDescription.unresolved(serviceName))
Expand All @@ -69,11 +67,13 @@ class BlazegraphClient(
/**
* Check whether the passed namespace ''namespace'' exists.
*/
def existsNamespace(namespace: String): IO[SparqlClientError, Boolean] =
client(Get(endpoint / "namespace" / namespace)) {
case resp if resp.status == OK => UIO.delay(resp.discardEntityBytes()) >> IO.pure(true)
case resp if resp.status == NotFound => UIO.delay(resp.discardEntityBytes()) >> IO.pure(false)
}.mapError(WrappedHttpClientError)
def existsNamespace(namespace: String): IO[Boolean] =
client
.run(Get(endpoint / "namespace" / namespace)) {
case resp if resp.status == OK => IO.delay(resp.discardEntityBytes()).as(true)
case resp if resp.status == NotFound => IO.delay(resp.discardEntityBytes()).as(false)
}
.adaptError { case e: HttpClientError => WrappedHttpClientError(e) }

/**
* Attempts to create a namespace (if it doesn't exist) recovering gracefully when the namespace already exists.
Expand All @@ -85,17 +85,19 @@ class BlazegraphClient(
* @return
* ''true'' wrapped on an IO when namespace has been created and ''false'' wrapped on an IO when it already existed
*/
def createNamespace(namespace: String, properties: Map[String, String]): IO[SparqlClientError, Boolean] =
def createNamespace(namespace: String, properties: Map[String, String]): IO[Boolean] =
existsNamespace(namespace).flatMap {
case true => IO.pure(false)
case false =>
val updated = properties + ("com.bigdata.rdf.sail.namespace" -> namespace)
val payload = updated.map { case (key, value) => s"$key=$value" }.mkString("\n")
val req = Post(endpoint / "namespace", HttpEntity(payload))
client(req) {
case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> IO.pure(true)
case resp if resp.status == Conflict => UIO.delay(resp.discardEntityBytes()) >> IO.pure(false)
}.mapError(WrappedHttpClientError)
client
.run(req) {
case resp if resp.status.isSuccess() => IO.delay(resp.discardEntityBytes()).as(true)
case resp if resp.status == Conflict => IO.delay(resp.discardEntityBytes()).as(false)
}
.adaptError { case e: HttpClientError => WrappedHttpClientError(e) }
}

/**
Expand All @@ -107,25 +109,26 @@ class BlazegraphClient(
* @return
* ''true'' wrapped on an IO when namespace has been created and ''false'' wrapped on an IO when it already existed
*/
def createNamespace(namespace: String): IO[SparqlClientError, Boolean] =
defaultProperties.flatMap(createNamespace(namespace, _))
def createNamespace(namespace: String): IO[Boolean] = createNamespace(namespace, defaultProperties)

/**
* Attempts to delete a namespace recovering gracefully when the namespace does not exists.
*
* @return
* ''true'' wrapped in ''F'' when namespace has been deleted and ''false'' wrapped in ''F'' when it does not existe
*/
def deleteNamespace(namespace: String): IO[SparqlClientError, Boolean] =
client(Delete(endpoint / "namespace" / namespace)) {
case resp if resp.status == OK => UIO.delay(resp.discardEntityBytes()) >> IO.pure(true)
case resp if resp.status == NotFound => UIO.delay(resp.discardEntityBytes()) >> IO.pure(false)
}.mapError(WrappedHttpClientError)
def deleteNamespace(namespace: String): IO[Boolean] =
client
.run(Delete(endpoint / "namespace" / namespace)) {
case resp if resp.status == OK => IO.delay(resp.discardEntityBytes()).as(true)
case resp if resp.status == NotFound => IO.delay(resp.discardEntityBytes()).as(false)
}
.adaptError { case e: HttpClientError => WrappedHttpClientError(e) }

/**
* Count all the triples on an index
*/
def count(index: String): IO[SparqlClientError, Long] = {
def count(index: String): IO[Long] = {
val sparqlQuery = SparqlConstructQuery.unsafe("SELECT (COUNT(?s) AS ?count) WHERE { ?s ?p ?o }")
query(Set(index), sparqlQuery, SparqlResultsJson)
.flatMap { response =>
Expand All @@ -135,7 +138,7 @@ class BlazegraphClient(
count <- countAsString.value.toLongOption
} yield count

IO.fromOption(count, InvalidCountRequest(index, sparqlQuery.value))
IO.fromOption(count)(InvalidCountRequest(index, sparqlQuery.value))
}
}

Expand Down Expand Up @@ -163,10 +166,11 @@ object BlazegraphClient {
client: HttpClient,
endpoint: Uri,
credentials: Option[Credentials],
queryTimeout: Duration
queryTimeout: Duration,
defaultProperties: Map[String, String]
)(implicit as: ActorSystem): BlazegraphClient = {
implicit val cred: Option[BasicHttpCredentials] =
credentials.map { cred => BasicHttpCredentials(cred.username, cred.password.value) }
new BlazegraphClient(client, endpoint, queryTimeout)
new BlazegraphClient(client, endpoint, queryTimeout, defaultProperties)
}
}
Loading

0 comments on commit 33cf07d

Please sign in to comment.