Skip to content

Commit

Permalink
Now, ESClient is definitely polymorphic
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed May 8, 2024
1 parent 21f1da0 commit 94a59e0
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 72 deletions.
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ lazy val core = project
name := "lila-search-core",
libraryDependencies ++= Seq(
"com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % "8.11.5",
"joda-time" % "joda-time" % "2.12.7"
"org.typelevel" %% "cats-core" % "2.10.0",
"joda-time" % "joda-time" % "2.12.7",
)
)

Expand Down Expand Up @@ -74,12 +75,15 @@ lazy val app = (project in file("modules/app"))
libraryDependencies ++= Seq(
"com.disneystreaming.smithy4s" %% "smithy4s-http4s" % smithy4sVersion.value,
"com.disneystreaming.smithy4s" %% "smithy4s-http4s-swagger" % smithy4sVersion.value,
catsCore,
catsEffect,
http4sServer,
http4sEmberClient,
cirisCore,
cirisHtt4s,
logbackX
),
excludeDependencies ++= Seq("org.typelevel" % "cats-core_2.13", "org.typelevel" % "cats-kernel_2.13"),
Compile / run / fork := true,
)
.enablePlugins(JavaAppPackaging)
Expand Down
138 changes: 68 additions & 70 deletions modules/core/src/main/scala/ESClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package lila.search
import com.sksamuel.elastic4s.ElasticDsl.{ RichFuture => _, _ }
import com.sksamuel.elastic4s.fields.ElasticField
import com.sksamuel.elastic4s.{ ElasticClient, ElasticDsl, Index => ESIndex, Response }
import scala.concurrent.{ ExecutionContext, Future }
import com.sksamuel.elastic4s.requests.indexes.IndexResponse
import com.sksamuel.elastic4s.requests.delete.DeleteResponse
import com.sksamuel.elastic4s.requests.bulk.BulkResponse
import com.sksamuel.elastic4s.{ Executor, Functor }
import cats.syntax.all.*
import cats.MonadThrow

case class JsonObject(json: String) extends AnyVal

Expand All @@ -15,8 +17,8 @@ case class Index(name: String) extends AnyVal {
}

trait ESClient[F[_]] {
def search[A](index: Index, query: A, from: From, size: Size)(implicit q: Queryable[A]): F[SearchResponse]

def search[A](index: Index, query: A, from: From, size: Size)(implicit q: Queryable[A]): F[SearchResponse]
def count[A](index: Index, query: A)(implicit q: Queryable[A]): F[CountResponse]
def store(index: Index, id: Id, obj: JsonObject): F[Response[IndexResponse]]
def storeBulk(index: Index, objs: List[(String, JsonObject)]): F[Unit]
Expand All @@ -28,87 +30,83 @@ trait ESClient[F[_]] {
}

object ESClient {
def makeFuture(client: ElasticClient)(implicit ec: ExecutionContext): ESClient[Future] =
new ESClientFuture(client)
}

final private class ESClientFuture(client: ElasticClient)(implicit ec: ExecutionContext)
extends ESClient[Future] {
def apply[F[_]: MonadThrow: Functor: Executor](client: ElasticClient) = new ESClient[F] {

private def toResult[A](response: Response[A]): Future[A] =
response.fold[Future[A]](Future.failed(new Exception(response.error.reason)))(Future.successful)
def toResult[A](response: Response[A]): F[A] =
response
.fold[F[A]](MonadThrow[F].raiseError[A](new Exception(response.error.reason)))(MonadThrow[F].pure)

def search[A](index: Index, query: A, from: From, size: Size)(implicit
q: Queryable[A]
): Future[SearchResponse] =
client
.execute {
q.searchDef(query)(from, size)(index)
}
.flatMap(toResult)
.map(SearchResponse.apply)
def search[A](index: Index, query: A, from: From, size: Size)(implicit
q: Queryable[A]
): F[SearchResponse] =
client
.execute { q.searchDef(query)(from, size)(index) }
.flatMap(toResult)
.map(SearchResponse.apply)

def count[A](index: Index, query: A)(implicit q: Queryable[A]): F[CountResponse] =
client
.execute {
q.countDef(query)(index)
}
.flatMap(toResult)
.map(CountResponse.apply)

def count[A](index: Index, query: A)(implicit q: Queryable[A]): Future[CountResponse] =
client
.execute {
q.countDef(query)(index)
def store(index: Index, id: Id, obj: JsonObject): F[Response[IndexResponse]] =
client.execute {
indexInto(index.name).source(obj.json).id(id.value)
}
.flatMap(toResult)
.map(CountResponse.apply)

def store(index: Index, id: Id, obj: JsonObject): Future[Response[IndexResponse]] =
client.execute {
indexInto(index.name).source(obj.json).id(id.value)
}
def storeBulk(index: Index, objs: List[(String, JsonObject)]): F[Unit] =
if (objs.isEmpty) ().pure[F]
else
client.execute {
ElasticDsl.bulk {
objs.map { case (id, obj) =>
indexInto(index.name).source(obj.json).id(id)
}
}
}.void

def deleteOne(index: Index, id: Id): F[Response[DeleteResponse]] =
client.execute {
deleteById(index.toES, id.value)
}

def storeBulk(index: Index, objs: List[(String, JsonObject)]): Future[Unit] =
if (objs.isEmpty) funit
else
def deleteMany(index: Index, ids: List[Id]): F[Response[BulkResponse]] =
client.execute {
ElasticDsl.bulk {
objs.map { case (id, obj) =>
indexInto(index.name).source(obj.json).id(id)
ids.map { id =>
deleteById(index.toES, id.value)
}
}
}.void
}

def deleteOne(index: Index, id: Id): Future[Response[DeleteResponse]] =
client.execute {
deleteById(index.toES, id.value)
}
def putMapping(index: Index, fields: Seq[ElasticField]): F[Unit] =
dropIndex(index) >> client.execute {
createIndex(index.name)
.mapping(
properties(fields).source(false) // all false
)
.shards(5)
.replicas(0)
.refreshInterval(Which.refreshInterval(index))
}.void

def deleteMany(index: Index, ids: List[Id]): Future[Response[BulkResponse]] =
client.execute {
ElasticDsl.bulk {
ids.map { id =>
deleteById(index.toES, id.value)
def refreshIndex(index: Index): F[Unit] =
client
.execute {
ElasticDsl.refreshIndex(index.name)
}
.void
.recover { case _: Exception =>
println(s"Failed to refresh index $index")
}
}
}

def putMapping(index: Index, fields: Seq[ElasticField]): Future[Unit] =
dropIndex(index) >> client.execute {
createIndex(index.name)
.mapping(
properties(fields).source(false) // all false
)
.shards(5)
.replicas(0)
.refreshInterval(Which.refreshInterval(index))
}.void

def refreshIndex(index: Index): Future[Unit] =
client
.execute {
ElasticDsl.refreshIndex(index.name)
}
.void
.recover { case _: Exception =>
println(s"Failed to refresh index $index")
}

private def dropIndex(index: Index) =
client.execute {
deleteIndex(index.name)
}
private def dropIndex(index: Index) =
client.execute {
deleteIndex(index.name)
}
}
}
4 changes: 3 additions & 1 deletion play/app/src/main/scala/AppLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import play.api.routing.Router
import lila.search.ESClient
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ ElasticClient, ElasticProperties }
import scala.concurrent.Future
import cats.instances.future._

class AppLoader extends ApplicationLoader {
private var components: AppComponents = _
Expand All @@ -17,7 +19,7 @@ class AppComponents(context: ApplicationLoader.Context) extends BuiltInComponent

def httpFilters = Nil

lazy val client = ESClient.makeFuture({
lazy val client = ESClient.apply[Future]({

val c = ElasticClient(JavaClient(ElasticProperties(configuration.get[String]("elasticsearch.uri"))))

Expand Down

0 comments on commit 94a59e0

Please sign in to comment.