Update commons module for cats effect 3
oermolaev committed Feb 27, 2025
1 parent 0dc870f commit 3464ebd
Showing 8 changed files with 71 additions and 88 deletions.
@@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments
import cats.Monad
import cats.data.{EitherT, NonEmptyList, ValidatedNel}

import cats.effect.{Async, Blocker, Clock, ContextShift}
import cats.effect.{Async, Clock}
import cats.implicits._

import io.circe._
@@ -100,9 +100,8 @@ object EnrichmentRegistry {
} yield configs).toValidated

// todo: ValidatedNel?
def build[F[_]: Async: Clock: ContextShift](
def build[F[_]: Async: Clock](
confs: List[EnrichmentConf],
blocker: Blocker,
shifter: ShiftExecution[F],
httpClient: HttpClient[F]
): EitherT[F, String, EnrichmentRegistry[F]] =
@@ -116,7 +115,7 @@
case c: PiiPseudonymizerConf => er.map(_.copy(piiPseudonymizer = c.enrichment.some))
case c: SqlQueryConf =>
for {
enrichment <- EitherT.right(c.enrichment[F](blocker, shifter))
enrichment <- EitherT.right(c.enrichment[F](shifter))
registry <- er
} yield registry.copy(sqlQuery = enrichment.some)
case c: AnonIpConf => er.map(_.copy(anonIp = c.enrichment.some))
@@ -136,7 +135,7 @@
} yield registry.copy(iab = enrichment.some)
case c: IpLookupsConf =>
for {
enrichment <- EitherT.right(c.enrichment[F](blocker))
enrichment <- EitherT.right(c.enrichment[F]())
registry <- er
} yield registry.copy(ipLookups = enrichment.some)
case c: JavascriptScriptConf => er.map(_.copy(javascriptScript = c.enrichment.some))
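In Cats Effect 3 the Blocker and ContextShift types no longer exist: Async subsumes the old ContextShift capability, and blocking work goes through Sync[F].blocking or Async[F].evalOn instead, which is why build above loses both the blocker argument and the ContextShift constraint. A minimal sketch of that CE3 idiom, using a hypothetical helper that is not part of this module:

import cats.effect.Sync

// CE2 would have written blocker.blockOn(Sync[F].delay(...));
// CE3 reaches the runtime's built-in blocking pool via Sync[F].blocking.
def fileSizeBlocking[F[_]: Sync](path: String): F[Long] =
  Sync[F].blocking(new java.io.File(path).length())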
@@ -12,13 +12,10 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import cats.Monad
import cats.effect.Clock
import cats.implicits._
import java.util.concurrent.TimeUnit

import cats.{Functor, Monad}
import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap}

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.CachingEvaluator.{Cache, CachedItem, GetResult, Value}

final class CachingEvaluator[F[_], K, V](
@@ -99,7 +96,8 @@ final class CachingEvaluator[F[_], K, V](
case _: Value.Error[V] => config.errorTtl
}

private def getCurrentTime(implicit C: Clock[F]): F[Long] = C.realTime(TimeUnit.SECONDS)
private def getCurrentTime(implicit C: Clock[F], F: Functor[F]): F[Long] =
C.realTime.map(_.toSeconds)

}

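The Clock change above follows the CE3 API: realTime no longer takes a TimeUnit and returns a FiniteDuration rather than a Long, so the caller converts explicitly and needs a Functor for the map. The same pattern in isolation, with an illustrative helper name:

import cats.Functor
import cats.effect.Clock
import cats.syntax.functor._

// CE2: Clock[F].realTime(TimeUnit.SECONDS): F[Long]
// CE3: Clock[F].realTime: F[FiniteDuration], converted explicitly.
def nowSeconds[F[_]: Functor: Clock]: F[Long] =
  Clock[F].realTime.map(_.toSeconds)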
@@ -12,24 +12,19 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import java.net.URI
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.FiniteDuration

import cats.data.EitherT

import cats.effect.{Async, Blocker, Clock, ContextShift, Sync}

import org.joda.money.CurrencyUnit

import com.snowplowanalytics.iglu.core.SchemaKey

import cats.effect.{Async, Clock, Sync}
import com.snowplowanalytics.forex.model.AccountType

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.{ApiRequestEnrichment, HttpApi}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.{Rdbms, SqlQueryEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
import org.joda.money.CurrencyUnit

import java.net.URI
import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

sealed trait EnrichmentConf {

@@ -84,7 +79,7 @@ object EnrichmentConf {
cache: SqlQueryEnrichment.Cache,
ignoreOnError: Boolean
) extends EnrichmentConf {
def enrichment[F[_]: Async: Clock: ContextShift](blocker: Blocker, shifter: ShiftExecution[F]): F[SqlQueryEnrichment[F]] =
def enrichment[F[_]: Async: Clock](shifter: ShiftExecution[F]): F[SqlQueryEnrichment[F]] =
SqlQueryEnrichment.create[F](
schemaKey,
inputs,
@@ -93,7 +88,6 @@
output,
cache,
ignoreOnError,
blocker,
shifter
)
}
@@ -184,13 +178,16 @@
) extends EnrichmentConf {
override val filesToCache: List[(URI, String)] =
List(geoFile, ispFile, domainFile, connectionTypeFile).flatten
def enrichment[F[_]: Async: ContextShift](blocker: Blocker): F[IpLookupsEnrichment[F]] =

val blocker: ExecutionContext = ExecutionContext.global

def enrichment[F[_]: Async](): F[IpLookupsEnrichment[F]] =
IpLookupsEnrichment.create[F](
blocker,
geoFile.map(_._2),
ispFile.map(_._2),
domainFile.map(_._2),
connectionTypeFile.map(_._2)
connectionTypeFile.map(_._2),
blocker
)
}

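With Blocker gone, IpLookupsConf now holds a plain scala.concurrent.ExecutionContext (ExecutionContext.global here) for the blocking MaxMind lookups, and the enrichment constructors drop their Blocker parameters. A hypothetical call site under the new signatures; only the shapes come from the diff above:

import cats.effect.Async
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{EnrichmentConf, IpLookupsEnrichment}

// Was conf.enrichment[F](blocker) on Cats Effect 2.
def buildIpLookups[F[_]: Async](conf: EnrichmentConf.IpLookupsConf): F[IpLookupsEnrichment[F]] =
  conf.enrichment[F]()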
@@ -13,24 +13,19 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import java.net.URI

import cats.data.{NonEmptyList, ValidatedNel}
import cats.implicits._

import cats.effect.{Async, Blocker, ContextShift}

import cats.effect.Async
import io.circe._

import inet.ipaddr.HostName

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}

import com.snowplowanalytics.maxmind.iplookups._
import com.snowplowanalytics.maxmind.iplookups.model._

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.IpLookupsConf
import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils

import scala.concurrent.ExecutionContext

/** Companion object. Lets us create an IpLookupsEnrichment instance from a Json. */
object IpLookupsEnrichment extends ParseableEnrichment {
override val supportedSchema =
@@ -95,12 +90,12 @@ object IpLookupsEnrichment extends ParseableEnrichment {
} yield IpLookupsDatabase(name, uri, uriAndDb._2)).toValidated.some
} else None

def create[F[_]: Async: ContextShift](
blocker: Blocker,
def create[F[_]: Async](
geoFilePath: Option[String],
ispFilePath: Option[String],
domainFilePath: Option[String],
connectionFilePath: Option[String]
connectionFilePath: Option[String],
blocker: ExecutionContext
): F[IpLookupsEnrichment[F]] =
CreateIpLookups[F]
.createFromFilenames(
@@ -119,17 +114,18 @@
* @param ipLookups IP lookups client
* @param blocker Runs db lookups on a separate thread pool
*/
final case class IpLookupsEnrichment[F[_]: ContextShift](ipLookups: IpLookups[F], blocker: Blocker) extends Enrichment {
final case class IpLookupsEnrichment[F[_]: Async](ipLookups: IpLookups[F], blocker: ExecutionContext) extends Enrichment {

/**
* Extract the geo-location using the client IP address.
* @param ip The client's IP address to use to lookup the client's geo-location
* @return an IpLookupResult
*/
def extractIpInformation(ip: String): F[IpLookupResult] =
blocker.blockOn {
ipLookups.performLookups(Either.catchNonFatal(new HostName(ip).toAddress).fold(_ => ip, addr => addr.toString))
}
Async[F].evalOn(
ipLookups.performLookups(Either.catchNonFatal(new HostName(ip).toAddress).fold(_ => ip, addr => addr.toString)),
blocker
)
}

private[enrichments] final case class IpLookupsDatabase(
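The substitution in extractIpInformation is the standard one: CE2's blocker.blockOn(fa) becomes Async[F].evalOn(fa, ec) in CE3, running the effect on the supplied ExecutionContext and shifting back to the compute pool afterwards. A standalone sketch with illustrative names:

import cats.effect.Async
import scala.concurrent.ExecutionContext

// Runs the lookup on the dedicated pool, then returns to the compute pool.
def onBlockingPool[F[_]: Async, A](lookup: F[A], blockingEc: ExecutionContext): F[A] =
  Async[F].evalOn(lookup, blockingEc)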
@@ -12,27 +12,23 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery

import scala.collection.immutable.IntMap

import java.sql.{Connection, PreparedStatement, ResultSet, ResultSetMetaData}
import javax.sql.DataSource

import io.circe.Json

import cats.Monad
import cats.data.EitherT
import cats.effect.{Async, Resource, Sync}
import cats.implicits._

import cats.effect.{Async, Blocker, Bracket, ContextShift, Resource, Sync}

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.Input.ExtractedValue
import io.circe.Json

import java.sql.{Connection, PreparedStatement, ResultSet, ResultSetMetaData}
import javax.sql.DataSource
import scala.collection.immutable.IntMap

// DbExecutor must have much smaller interface, ideally without any JDBC types
/** Side-effecting ability to connect to database */
trait DbExecutor[F[_]] {

/** Get a connection from the Hikari data source */
def getConnection(dataSource: DataSource, blocker: Blocker): Resource[F, Connection]
def getConnection(dataSource: DataSource): Resource[F, Connection]

/** Execute a SQL query */
def execute(query: PreparedStatement): EitherT[F, Throwable, ResultSet]
@@ -76,45 +72,45 @@ object DbExecutor {

def apply[F[_]](implicit ev: DbExecutor[F]): DbExecutor[F] = ev

def async[F[_]: Async: ContextShift]: DbExecutor[F] = sync[F]
def async[F[_]: Async]: DbExecutor[F] = sync[F]

def sync[F[_]: ContextShift: Sync]: DbExecutor[F] =
def sync[F[_]: Async]: DbExecutor[F] =
new DbExecutor[F] {
def getConnection(dataSource: DataSource, blocker: Blocker): Resource[F, Connection] =
Resource.fromAutoCloseable(blocker.blockOn(Sync[F].delay(dataSource.getConnection())))
def getConnection(dataSource: DataSource): Resource[F, Connection] =
Resource.eval(Async[F].delay(dataSource.getConnection()))

def execute(query: PreparedStatement): EitherT[F, Throwable, ResultSet] =
Sync[F].delay(query.executeQuery()).attemptT

def convert(resultSet: ResultSet, names: JsonOutput.PropertyNameMode): EitherT[F, Throwable, List[Json]] =
EitherT(Bracket[F, Throwable].bracket(Sync[F].pure(resultSet)) { set =>
val hasNext = Sync[F].delay(set.next()).attemptT
val convert = transform(set, names)(this, Monad[F])
convert.whileM[List](hasNext).value
} { set =>
Sync[F].delay(set.close())
})
EitherT(
Resource
.make(Async[F].pure(resultSet))(rs => Async[F].delay(rs.close()))
.use { set =>
val hasNext = EitherT(Async[F].delay(set.next()).attempt)
val convert = transform(set, names)(this, Async[F])
convert.whileM[List](hasNext).value
}
)

def getMetaData(rs: ResultSet): EitherT[F, Throwable, ResultSetMetaData] =
Sync[F].delay(rs.getMetaData).attemptT
EitherT(Async[F].delay(rs.getMetaData).attempt)

def getColumnCount(rsMeta: ResultSetMetaData): EitherT[F, Throwable, Int] =
Sync[F].delay(rsMeta.getColumnCount).attemptT
EitherT(Async[F].delay(rsMeta.getColumnCount).attempt)

def getColumnLabel(column: Int, rsMeta: ResultSetMetaData): EitherT[F, Throwable, String] =
Sync[F].delay(rsMeta.getColumnLabel(column)).attemptT
EitherT(Async[F].delay(rsMeta.getColumnLabel(column)).attempt)

def getColumnType(column: Int, rsMeta: ResultSetMetaData): EitherT[F, Throwable, String] =
Sync[F].delay(rsMeta.getColumnClassName(column)).attemptT
EitherT(Async[F].delay(rsMeta.getColumnClassName(column)).attempt)

def getColumnValue(
datatype: String,
columnIdx: Int,
rs: ResultSet
): EitherT[F, Throwable, Json] =
Sync[F]
.delay(rs.getObject(columnIdx))
.attemptT
EitherT(Async[F].delay(rs.getObject(columnIdx)).attempt)
.map(Option.apply)
.map {
case Some(any) => JsonOutput.getValue(any, datatype)
@@ -194,6 +190,6 @@
if (intMap.keys.size == placeholderCount) true else false
}

def getConnection[F[_]: Monad: DbExecutor](dataSource: DataSource, blocker: Blocker): Resource[F, Connection] =
DbExecutor[F].getConnection(dataSource, blocker)
def getConnection[F[_]: Monad: DbExecutor](dataSource: DataSource): Resource[F, Connection] =
DbExecutor[F].getConnection(dataSource)
}
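Two CE3 translations appear in DbExecutor above: the Bracket[F, Throwable].bracket(acquire)(use)(release) call becomes Resource.make(acquire)(release).use(use), and the separate Sync/ContextShift constraints collapse into a single Async. A sketch of the bracket-to-Resource move on its own, with illustrative names:

import java.sql.ResultSet
import cats.effect.{Resource, Sync}

// CE2's Bracket.bracket(acquire)(use)(release) maps onto
// Resource.make(acquire)(release).use(use) in CE3.
def withResultSet[F[_]: Sync, A](rs: ResultSet)(use: ResultSet => F[A]): F[A] =
  Resource.make(Sync[F].pure(rs))(r => Sync[F].delay(r.close())).use(use)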
@@ -25,7 +25,7 @@ import io.circe.generic.semiauto._
import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel}
import cats.implicits._

import cats.effect.{Async, Blocker, Clock, ContextShift}
import cats.effect.{Async, Clock}

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}

@@ -101,15 +101,14 @@ object SqlQueryEnrichment extends ParseableEnrichment {
implicit val cacheCirceDecoder: Decoder[Cache] =
deriveDecoder[Cache]

def create[F[_]: Async: Clock: ContextShift](
def create[F[_]: Async: Clock](
schemaKey: SchemaKey,
inputs: List[Input],
db: Rdbms,
query: Query,
output: Output,
cache: Cache,
ignoreOnError: Boolean,
blocker: Blocker,
shifter: ShiftExecution[F]
): F[SqlQueryEnrichment[F]] = {
val cacheConfig = CachingEvaluator.Config(
@@ -131,7 +130,6 @@
output,
evaluator,
executor,
blocker,
shifter,
getDataSource(db),
ignoreOnError
@@ -155,7 +153,6 @@
* @param output configuration of output context
* @param ttl cache TTL in milliseconds
* @param cache actual mutable LRU cache
* @param blocker Allows running blocking enrichments on a dedicated thread pool
*/
final case class SqlQueryEnrichment[F[_]: Async: Clock](
schemaKey: SchemaKey,
Expand All @@ -165,7 +162,6 @@ final case class SqlQueryEnrichment[F[_]: Async: Clock](
output: Output,
sqlQueryEvaluator: SqlQueryEvaluator[F],
dbExecutor: DbExecutor[F],
blocker: Blocker,
shifter: ShiftExecution[F],
dataSource: DataSource,
ignoreOnError: Boolean
@@ -217,7 +213,7 @@

private def runLookup(intMap: Input.ExtractedValueMap): F[Either[Throwable, List[SelfDescribingData[Json]]]] =
dbExecutor
.getConnection(dataSource, blocker)
.getConnection(dataSource)
.use { connection =>
maybeRunWithConnection(connection, intMap)
}
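Finally, SqlQueryEnrichment.create and the case class both lose the blocker field; only the ShiftExecution is still threaded through for running the JDBC calls. A hypothetical wiring under the new signature, with placeholder argument names:

import cats.effect.{Async, Clock}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.SqlQueryEnrichment
import com.snowplowanalytics.snowplow.enrich.common.utils.ShiftExecution

// Was conf.enrichment[F](blocker, shifter) on Cats Effect 2.
def buildSqlQuery[F[_]: Async: Clock](conf: EnrichmentConf.SqlQueryConf, shifter: ShiftExecution[F]): F[SqlQueryEnrichment[F]] =
  conf.enrichment[F](shifter)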
(Diffs for the remaining changed files did not load in this view.)
