diff --git a/build.sbt b/build.sbt index 22e752f3..93ea84f1 100644 --- a/build.sbt +++ b/build.sbt @@ -32,7 +32,8 @@ lazy val core = project name := "lila-search-core", libraryDependencies ++= Seq( "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % "8.11.5", - "joda-time" % "joda-time" % "2.12.7" + "org.typelevel" %% "cats-core" % "2.10.0", + "joda-time" % "joda-time" % "2.12.7", ) ) @@ -74,12 +75,15 @@ lazy val app = (project in file("modules/app")) libraryDependencies ++= Seq( "com.disneystreaming.smithy4s" %% "smithy4s-http4s" % smithy4sVersion.value, "com.disneystreaming.smithy4s" %% "smithy4s-http4s-swagger" % smithy4sVersion.value, + catsCore, + catsEffect, http4sServer, http4sEmberClient, cirisCore, cirisHtt4s, logbackX ), + excludeDependencies ++= Seq("org.typelevel" % "cats-core_2.13", "org.typelevel" % "cats-kernel_2.13"), Compile / run / fork := true, ) .enablePlugins(JavaAppPackaging) diff --git a/modules/core/src/main/scala/ESClient.scala b/modules/core/src/main/scala/ESClient.scala index c7c9f1b4..45e66909 100644 --- a/modules/core/src/main/scala/ESClient.scala +++ b/modules/core/src/main/scala/ESClient.scala @@ -3,10 +3,12 @@ package lila.search import com.sksamuel.elastic4s.ElasticDsl.{ RichFuture => _, _ } import com.sksamuel.elastic4s.fields.ElasticField import com.sksamuel.elastic4s.{ ElasticClient, ElasticDsl, Index => ESIndex, Response } -import scala.concurrent.{ ExecutionContext, Future } import com.sksamuel.elastic4s.requests.indexes.IndexResponse import com.sksamuel.elastic4s.requests.delete.DeleteResponse import com.sksamuel.elastic4s.requests.bulk.BulkResponse +import com.sksamuel.elastic4s.{ Executor, Functor } +import cats.syntax.all.* +import cats.MonadThrow case class JsonObject(json: String) extends AnyVal @@ -15,8 +17,8 @@ case class Index(name: String) extends AnyVal { } trait ESClient[F[_]] { - def search[A](index: Index, query: A, from: From, size: Size)(implicit q: Queryable[A]): F[SearchResponse] + def search[A](index: Index, query: A, from: From, size: Size)(implicit q: Queryable[A]): F[SearchResponse] def count[A](index: Index, query: A)(implicit q: Queryable[A]): F[CountResponse] def store(index: Index, id: Id, obj: JsonObject): F[Response[IndexResponse]] def storeBulk(index: Index, objs: List[(String, JsonObject)]): F[Unit] @@ -28,87 +30,83 @@ trait ESClient[F[_]] { } object ESClient { - def makeFuture(client: ElasticClient)(implicit ec: ExecutionContext): ESClient[Future] = - new ESClientFuture(client) -} -final private class ESClientFuture(client: ElasticClient)(implicit ec: ExecutionContext) - extends ESClient[Future] { + def apply[F[_]: MonadThrow: Functor: Executor](client: ElasticClient) = new ESClient[F] { - private def toResult[A](response: Response[A]): Future[A] = - response.fold[Future[A]](Future.failed(new Exception(response.error.reason)))(Future.successful) + def toResult[A](response: Response[A]): F[A] = + response + .fold[F[A]](MonadThrow[F].raiseError[A](new Exception(response.error.reason)))(MonadThrow[F].pure) - def search[A](index: Index, query: A, from: From, size: Size)(implicit - q: Queryable[A] - ): Future[SearchResponse] = - client - .execute { - q.searchDef(query)(from, size)(index) - } - .flatMap(toResult) - .map(SearchResponse.apply) + def search[A](index: Index, query: A, from: From, size: Size)(implicit + q: Queryable[A] + ): F[SearchResponse] = + client + .execute { q.searchDef(query)(from, size)(index) } + .flatMap(toResult) + .map(SearchResponse.apply) + + def count[A](index: Index, query: A)(implicit q: Queryable[A]): F[CountResponse] = + client + .execute { + q.countDef(query)(index) + } + .flatMap(toResult) + .map(CountResponse.apply) - def count[A](index: Index, query: A)(implicit q: Queryable[A]): Future[CountResponse] = - client - .execute { - q.countDef(query)(index) + def store(index: Index, id: Id, obj: JsonObject): F[Response[IndexResponse]] = + client.execute { + indexInto(index.name).source(obj.json).id(id.value) } - .flatMap(toResult) - .map(CountResponse.apply) - def store(index: Index, id: Id, obj: JsonObject): Future[Response[IndexResponse]] = - client.execute { - indexInto(index.name).source(obj.json).id(id.value) - } + def storeBulk(index: Index, objs: List[(String, JsonObject)]): F[Unit] = + if (objs.isEmpty) ().pure[F] + else + client.execute { + ElasticDsl.bulk { + objs.map { case (id, obj) => + indexInto(index.name).source(obj.json).id(id) + } + } + }.void + + def deleteOne(index: Index, id: Id): F[Response[DeleteResponse]] = + client.execute { + deleteById(index.toES, id.value) + } - def storeBulk(index: Index, objs: List[(String, JsonObject)]): Future[Unit] = - if (objs.isEmpty) funit - else + def deleteMany(index: Index, ids: List[Id]): F[Response[BulkResponse]] = client.execute { ElasticDsl.bulk { - objs.map { case (id, obj) => - indexInto(index.name).source(obj.json).id(id) + ids.map { id => + deleteById(index.toES, id.value) } } - }.void + } - def deleteOne(index: Index, id: Id): Future[Response[DeleteResponse]] = - client.execute { - deleteById(index.toES, id.value) - } + def putMapping(index: Index, fields: Seq[ElasticField]): F[Unit] = + dropIndex(index) >> client.execute { + createIndex(index.name) + .mapping( + properties(fields).source(false) // all false + ) + .shards(5) + .replicas(0) + .refreshInterval(Which.refreshInterval(index)) + }.void - def deleteMany(index: Index, ids: List[Id]): Future[Response[BulkResponse]] = - client.execute { - ElasticDsl.bulk { - ids.map { id => - deleteById(index.toES, id.value) + def refreshIndex(index: Index): F[Unit] = + client + .execute { + ElasticDsl.refreshIndex(index.name) + } + .void + .recover { case _: Exception => + println(s"Failed to refresh index $index") } - } - } - - def putMapping(index: Index, fields: Seq[ElasticField]): Future[Unit] = - dropIndex(index) >> client.execute { - createIndex(index.name) - .mapping( - properties(fields).source(false) // all false - ) - .shards(5) - .replicas(0) - .refreshInterval(Which.refreshInterval(index)) - }.void - - def refreshIndex(index: Index): Future[Unit] = - client - .execute { - ElasticDsl.refreshIndex(index.name) - } - .void - .recover { case _: Exception => - println(s"Failed to refresh index $index") - } - private def dropIndex(index: Index) = - client.execute { - deleteIndex(index.name) - } + private def dropIndex(index: Index) = + client.execute { + deleteIndex(index.name) + } + } } diff --git a/play/app/src/main/scala/AppLoader.scala b/play/app/src/main/scala/AppLoader.scala index fa37f04c..93cab4bd 100644 --- a/play/app/src/main/scala/AppLoader.scala +++ b/play/app/src/main/scala/AppLoader.scala @@ -3,6 +3,8 @@ import play.api.routing.Router import lila.search.ESClient import com.sksamuel.elastic4s.http.JavaClient import com.sksamuel.elastic4s.{ ElasticClient, ElasticProperties } +import scala.concurrent.Future +import cats.instances.future._ class AppLoader extends ApplicationLoader { private var components: AppComponents = _ @@ -17,7 +19,7 @@ class AppComponents(context: ApplicationLoader.Context) extends BuiltInComponent def httpFilters = Nil - lazy val client = ESClient.makeFuture({ + lazy val client = ESClient.apply[Future]({ val c = ElasticClient(JavaClient(ElasticProperties(configuration.get[String]("elasticsearch.uri"))))