Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Blazegraph module to Cats Effect #4478

Merged
merged 11 commits into from
Nov 8, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,6 @@ trait ClasspathResourceUtils {
props.asScala.toMap
}

final def bioPropertiesOf(resourcePath: String)(implicit
classLoader: ClassLoader
): BIO[ClasspathResourceError, Map[String, String]] =
bioStreamOf(resourcePath).map { is =>
val props = new Properties()
props.load(is)
props.asScala.toMap
}

/**
* Loads the content of the argument classpath resource as a string and replaces all the key matches of the
* ''replacements'' with their values. The resulting string is parsed into a json value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import monix.bio.{Task, UIO}
import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{DurationLong, FiniteDuration, MILLISECONDS}

trait IOUtils {

Expand All @@ -22,6 +23,13 @@ object IOUtils extends IOUtils
trait IOInstant {
def now(implicit clock: Clock[IO]): IO[Instant] =
clock.realTime(TimeUnit.MILLISECONDS).map(Instant.ofEpochMilli)

def timed[A](io: IO[A])(implicit c: Clock[IO]): IO[(A, FiniteDuration)] =
for {
start <- c.monotonic(MILLISECONDS)
result <- io
finish <- c.monotonic(MILLISECONDS)
} yield (result, (finish - start).millis)
Comment on lines +27 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth a test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm reluctant since it's essentially copied stdlib code which you wouldn't normally test... Kinda covered in those slow query tests as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think copied code is still worth testing if we rely on it. Tests are there to make sure it works going forward as well as now. It should be super easy to write

}

object IOInstant extends IOInstant
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, BlazegraphViewType}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, JsonLdContext, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.Configuration
import monix.bio.Task

private[blazegraph] object BlazegraphDecoderConfiguration {

def apply(implicit jsonLdApi: JsonLdApi, rcr: RemoteContextResolution): Task[Configuration] = for {
contextValue <- Task.delay { ContextValue(contexts.blazegraph) }
jsonLdContext <- JsonLdContext(contextValue)
def apply(implicit jsonLdApi: JsonLdApi, rcr: RemoteContextResolution): IO[Configuration] = for {
contextValue <- IO.delay { ContextValue(contexts.blazegraph) }
jsonLdContext <- JsonLdContext(contextValue).toCatsIO
} yield {
val enhancedJsonLdContext = jsonLdContext
.addAliasIdType("IndexingBlazegraphViewValue", BlazegraphViewType.IndexingBlazegraphView.tpe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph
import akka.actor.typed.ActorSystem
import cats.effect.{Clock, ContextShift, IO, Timer}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{CatsEffectsClasspathResourceUtils, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphCoordinator
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent, DefaultProperties}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphSlowQueryDeleter, BlazegraphSlowQueryLogger, BlazegraphSlowQueryStore}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
Expand Down Expand Up @@ -48,6 +48,10 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {

make[BlazegraphViewsConfig].from { BlazegraphViewsConfig.load(_) }

make[DefaultProperties].fromEffect {
CatsEffectsClasspathResourceUtils.ioPropertiesOf("blazegraph/index.properties").map(DefaultProperties)
}

make[HttpClient].named("http-indexing-client").from {
(cfg: BlazegraphViewsConfig, as: ActorSystem[Nothing], sc: Scheduler) =>
HttpClient()(cfg.indexingClient, as.classicSystem, sc)
Expand All @@ -69,17 +73,18 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
)(timer)
}

make[BlazegraphSlowQueryLogger].from { (cfg: BlazegraphViewsConfig, store: BlazegraphSlowQueryStore) =>
BlazegraphSlowQueryLogger(store, cfg.slowQueries.slowQueryThreshold)
make[BlazegraphSlowQueryLogger].from { (cfg: BlazegraphViewsConfig, store: BlazegraphSlowQueryStore, c: Clock[IO]) =>
BlazegraphSlowQueryLogger(store, cfg.slowQueries.slowQueryThreshold)(c)
}

make[BlazegraphClient].named("blazegraph-indexing-client").from {
(
cfg: BlazegraphViewsConfig,
client: HttpClient @Id("http-indexing-client"),
as: ActorSystem[Nothing]
as: ActorSystem[Nothing],
properties: DefaultProperties
) =>
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem)
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout, properties.value)(as.classicSystem)
}

make[HttpClient].named("http-query-client").from {
Expand All @@ -91,9 +96,10 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
(
cfg: BlazegraphViewsConfig,
client: HttpClient @Id("http-query-client"),
as: ActorSystem[Nothing]
as: ActorSystem[Nothing],
properties: DefaultProperties
) =>
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem)
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout, properties.value)(as.classicSystem)
}

make[ValidateBlazegraphView].from {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.sdk.ServiceDependency
import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOInstant, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.{BlazegraphClient, SparqlClientError}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphView.IndexingBlazegraphView
Expand Down Expand Up @@ -542,9 +542,8 @@ object BlazegraphViews {
case i: IndexingBlazegraphView =>
client
.createNamespace(BlazegraphViews.namespace(i, prefix))
.mapError(WrappedBlazegraphClientError.apply)
.adaptError { case e: SparqlClientError => WrappedBlazegraphClientError(e) }
.void
.toCatsIO
case _ => IO.unit
}
apply(fetchContext, contextResolution, validate, createNameSpace, eventLogConfig, prefix, xas)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils.ioContentOf
Expand Down Expand Up @@ -173,28 +174,28 @@ object BlazegraphViewsQuery {
)(implicit caller: Caller): IO[R] =
for {
view <- viewsStore.fetch(id, project)
p <- fetchContext.onRead(project)
iri <- expandIri(id, p)
p <- fetchContext.onRead(project).toCatsIO
iri <- expandIri(id, p).toCatsIO
indices <- view match {
case i: IndexingView =>
aclCheck
.authorizeForOr(i.ref.project, i.permission)(
AuthorizationFailed(i.ref.project, i.permission)
)
.as(Set(i.index))
.toBIO[BlazegraphViewRejection]
case a: AggregateView =>
aclCheck
.mapFilter[IndexingView, String](
a.views,
v => ProjectAcl(v.ref.project) -> v.permission,
_.index
)
.toUIO
}
qr <- logSlowQueries(
BlazegraphQueryContext(ViewRef.apply(project, iri), query, caller.subject),
client.query(indices, query, responseType).mapError(WrappedBlazegraphClientError)
client.query(indices, query, responseType).adaptError { case e: SparqlClientError =>
WrappedBlazegraphClientError(e)
}
)
} yield qr

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ import akka.http.scaladsl.model.headers.{BasicHttpCredentials, HttpCredentials,
import akka.http.scaladsl.model.{HttpEntity, HttpHeader, Uri}
import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller
import akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers.stringUnmarshaller
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient.timeoutHeader
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlClientError.{InvalidCountRequest, WrappedHttpClientError}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.{Aux, SparqlResultsJson}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig.Credentials
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientError}
import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription
import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription.ResolvedServiceDescription
import ch.epfl.bluebrain.nexus.delta.sdk.model.Name
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import monix.bio.{IO, UIO}

import scala.concurrent.duration._

Expand All @@ -29,24 +30,20 @@ import scala.concurrent.duration._
class BlazegraphClient(
client: HttpClient,
endpoint: Uri,
queryTimeout: Duration
queryTimeout: Duration,
defaultProperties: Map[String, String]
)(implicit credentials: Option[HttpCredentials], as: ActorSystem)
extends SparqlClient(client, SparqlQueryEndpoint.blazegraph(endpoint)) {

implicit private val cl: ClassLoader = getClass.getClassLoader

private val serviceVersion = """(buildVersion">)([^<]*)""".r
private val serviceName = Name.unsafe("blazegraph")

private val defaultProperties =
ClasspathResourceUtils.bioPropertiesOf("blazegraph/index.properties").hideErrors.memoizeOnSuccess

override def query[R <: SparqlQueryResponse](
indices: Iterable[String],
q: SparqlQuery,
responseType: Aux[R],
additionalHeaders: Seq[HttpHeader]
): IO[SparqlClientError, R] = {
): IO[R] = {
val headers = queryTimeout match {
case finite: FiniteDuration => additionalHeaders :+ RawHeader(timeoutHeader, finite.toMillis.toString)
case _ => additionalHeaders
Expand All @@ -57,10 +54,11 @@ class BlazegraphClient(
/**
* Fetches the service description information (name and version)
*/
def serviceDescription: UIO[ServiceDescription] =
def serviceDescription: IO[ServiceDescription] =
client
.fromEntityTo[ResolvedServiceDescription](Get(endpoint / "status"))
.timeout(5.seconds)
.toCatsIO
.redeem(
_ => ServiceDescription.unresolved(serviceName),
_.map(_.copy(name = serviceName)).getOrElse(ServiceDescription.unresolved(serviceName))
Expand All @@ -69,11 +67,13 @@ class BlazegraphClient(
/**
* Check whether the passed namespace ''namespace'' exists.
*/
def existsNamespace(namespace: String): IO[SparqlClientError, Boolean] =
client(Get(endpoint / "namespace" / namespace)) {
case resp if resp.status == OK => UIO.delay(resp.discardEntityBytes()) >> IO.pure(true)
case resp if resp.status == NotFound => UIO.delay(resp.discardEntityBytes()) >> IO.pure(false)
}.mapError(WrappedHttpClientError)
def existsNamespace(namespace: String): IO[Boolean] =
client
.run(Get(endpoint / "namespace" / namespace)) {
case resp if resp.status == OK => IO.delay(resp.discardEntityBytes()).as(true)
case resp if resp.status == NotFound => IO.delay(resp.discardEntityBytes()).as(false)
}
.adaptError { case e: HttpClientError => WrappedHttpClientError(e) }

/**
* Attempts to create a namespace (if it doesn't exist) recovering gracefully when the namespace already exists.
Expand All @@ -85,17 +85,19 @@ class BlazegraphClient(
* @return
* ''true'' wrapped on an IO when namespace has been created and ''false'' wrapped on an IO when it already existed
*/
def createNamespace(namespace: String, properties: Map[String, String]): IO[SparqlClientError, Boolean] =
def createNamespace(namespace: String, properties: Map[String, String]): IO[Boolean] =
existsNamespace(namespace).flatMap {
case true => IO.pure(false)
case false =>
val updated = properties + ("com.bigdata.rdf.sail.namespace" -> namespace)
val payload = updated.map { case (key, value) => s"$key=$value" }.mkString("\n")
val req = Post(endpoint / "namespace", HttpEntity(payload))
client(req) {
case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> IO.pure(true)
case resp if resp.status == Conflict => UIO.delay(resp.discardEntityBytes()) >> IO.pure(false)
}.mapError(WrappedHttpClientError)
client
.run(req) {
case resp if resp.status.isSuccess() => IO.delay(resp.discardEntityBytes()).as(true)
case resp if resp.status == Conflict => IO.delay(resp.discardEntityBytes()).as(false)
}
.adaptError { case e: HttpClientError => WrappedHttpClientError(e) }
}

/**
Expand All @@ -107,25 +109,26 @@ class BlazegraphClient(
* @return
* ''true'' wrapped on an IO when namespace has been created and ''false'' wrapped on an IO when it already existed
*/
def createNamespace(namespace: String): IO[SparqlClientError, Boolean] =
defaultProperties.flatMap(createNamespace(namespace, _))
def createNamespace(namespace: String): IO[Boolean] = createNamespace(namespace, defaultProperties)

/**
* Attempts to delete a namespace recovering gracefully when the namespace does not exists.
*
* @return
* ''true'' wrapped in ''F'' when namespace has been deleted and ''false'' wrapped in ''F'' when it does not existe
*/
def deleteNamespace(namespace: String): IO[SparqlClientError, Boolean] =
client(Delete(endpoint / "namespace" / namespace)) {
case resp if resp.status == OK => UIO.delay(resp.discardEntityBytes()) >> IO.pure(true)
case resp if resp.status == NotFound => UIO.delay(resp.discardEntityBytes()) >> IO.pure(false)
}.mapError(WrappedHttpClientError)
def deleteNamespace(namespace: String): IO[Boolean] =
client
.run(Delete(endpoint / "namespace" / namespace)) {
case resp if resp.status == OK => IO.delay(resp.discardEntityBytes()).as(true)
case resp if resp.status == NotFound => IO.delay(resp.discardEntityBytes()).as(false)
}
.adaptError { case e: HttpClientError => WrappedHttpClientError(e) }

/**
* Count all the triples on an index
*/
def count(index: String): IO[SparqlClientError, Long] = {
def count(index: String): IO[Long] = {
val sparqlQuery = SparqlConstructQuery.unsafe("SELECT (COUNT(?s) AS ?count) WHERE { ?s ?p ?o }")
query(Set(index), sparqlQuery, SparqlResultsJson)
.flatMap { response =>
Expand All @@ -135,7 +138,7 @@ class BlazegraphClient(
count <- countAsString.value.toLongOption
} yield count

IO.fromOption(count, InvalidCountRequest(index, sparqlQuery.value))
IO.fromOption(count)(InvalidCountRequest(index, sparqlQuery.value))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will never forgive the Cats authors for this method signature

}
}

Expand Down Expand Up @@ -163,10 +166,11 @@ object BlazegraphClient {
client: HttpClient,
endpoint: Uri,
credentials: Option[Credentials],
queryTimeout: Duration
queryTimeout: Duration,
defaultProperties: Map[String, String]
)(implicit as: ActorSystem): BlazegraphClient = {
implicit val cred: Option[BasicHttpCredentials] =
credentials.map { cred => BasicHttpCredentials(cred.username, cred.password.value) }
new BlazegraphClient(client, endpoint, queryTimeout)
new BlazegraphClient(client, endpoint, queryTimeout, defaultProperties)
}
}
Loading