Skip to content

Commit

Permalink
Additional logging
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Oct 4, 2023
1 parent f882a7d commit 236d25a
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ object Enrich {
.evalTap(chunk => Logger[F].debug(s"Starting to process chunk of size ${chunk.size}"))
.evalTap(chunk => env.metrics.rawCount(chunk.size))
.map(chunk => chunk.map(a => (a, env.getPayload(a))))
.evalTap { chunk =>
val maxSize = chunk.foldLeft(0) {
case (previousMax, event) =>
if (event._2.length > previousMax)
event._2.length
else
previousMax
}
Logger[F].debug(s"The biggest event in chunk of size ${chunk.size} has $maxSize bytes")
}
.evalMap(chunk =>
for {
begin <- Clock[F].realTime(TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ package com.snowplowanalytics.snowplow.enrich.common

import cats.Monad
import cats.data.{Validated, ValidatedNel}
import cats.effect.Clock
import cats.effect.{Clock, Sync}
import cats.implicits._

import com.snowplowanalytics.iglu.client.IgluCirceClient
Expand All @@ -24,6 +24,9 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}

import org.joda.time.DateTime

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentManager, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
Expand All @@ -33,6 +36,9 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.HttpClient
/** Expresses the end-to-end event pipeline supported by the Scala Common Enrich project. */
object EtlPipeline {

/**
 * Implicit log4cats `Logger` for any effect type `F` that has a `Sync` instance,
 * obtained from `Slf4jLogger.getLogger`.
 *
 * Private to `EtlPipeline`. Because this is a `def`, `Slf4jLogger.getLogger[F]` is
 * evaluated each time the implicit is resolved.
 */
private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

/*
* Feature flags available in the current version of Enrich
* @param acceptInvalid Whether enriched events that are invalid against
Expand All @@ -59,7 +65,7 @@ object EtlPipeline {
* @return the ValidatedMaybeCanonicalOutput. Thanks to flatMap, will include any validation
* errors contained within the ValidatedMaybeCanonicalInput
*/
def processEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](
def processEvents[F[_]: RegistryLookup: Clock: HttpClient: Sync](
adapterRegistry: AdapterRegistry,
enrichmentRegistry: EnrichmentRegistry[F],
client: IgluCirceClient[F],
Expand All @@ -75,19 +81,20 @@ object EtlPipeline {
.toRawEvents(payload, client, processor)
.flatMap {
case Validated.Valid(rawEvents) =>
rawEvents.toList.traverse { event =>
EnrichmentManager
.enrichEvent(
enrichmentRegistry,
client,
processor,
etlTstamp,
event,
featureFlags,
invalidCount
)
.toValidated
}
Logger[F].debug(s"Collector payload contains ${rawEvents.size} events") *>
rawEvents.toList.traverse { event =>
EnrichmentManager
.enrichEvent(
enrichmentRegistry,
client,
processor,
etlTstamp,
event,
featureFlags,
invalidCount
)
.toValidated
}
case Validated.Invalid(badRow) =>
Monad[F].pure(List(badRow.invalid[EnrichedEvent]))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import org.joda.time.DateTime
import io.circe.Json
import cats.{Applicative, Monad}
import cats.data.{EitherT, NonEmptyList, OptionT, StateT}
import cats.effect.Clock
import cats.effect.{Clock, Sync}
import cats.implicits._

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.refererparser._

import com.snowplowanalytics.iglu.client.IgluCirceClient
Expand All @@ -47,6 +50,9 @@ import utils.{IgluUtils, ConversionUtils => CU}

object EnrichmentManager {

/**
 * Implicit log4cats `Logger` for any effect type `F` that has a `Sync` instance,
 * obtained from `Slf4jLogger.getLogger`.
 *
 * Private to `EnrichmentManager`. Because this is a `def`, `Slf4jLogger.getLogger[F]` is
 * evaluated each time the implicit is resolved.
 */
private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

/**
* Run the enrichment workflow
* @param registry Contains the configuration for all enrichments to apply
Expand All @@ -58,7 +64,7 @@ object EnrichmentManager {
* @param invalidCount Function to increment the count of invalid events
* @return Enriched event or bad row if a problem occurred
*/
def enrichEvent[F[_]: Monad: RegistryLookup: Clock](
def enrichEvent[F[_]: RegistryLookup: Clock: Sync](
registry: EnrichmentRegistry[F],
client: IgluCirceClient[F],
processor: Processor,
Expand All @@ -69,11 +75,14 @@ object EnrichmentManager {
): EitherT[F, BadRow, EnrichedEvent] =
for {
enriched <- EitherT.fromEither[F](setupEnrichedEvent(raw, etlTstamp, processor))
_ <- EitherT.liftF(Logger[F].debug(s"Validating contexts of ${enriched.event_id}"))
extractResult <- IgluUtils.extractAndValidateInputJsons(enriched, client, raw, processor)
_ <- EitherT.liftF(Logger[F].debug(s"Contexts of ${enriched.event_id} validated"))
_ = {
ME.formatUnstructEvent(extractResult.unstructEvent).foreach(e => enriched.unstruct_event = e)
ME.formatContexts(extractResult.contexts).foreach(c => enriched.contexts = c)
}
_ <- EitherT.liftF(Logger[F].debug(s"Running the enrichments for ${enriched.event_id}"))
enrichmentsContexts <- runEnrichments(
registry,
processor,
Expand All @@ -83,9 +92,12 @@ object EnrichmentManager {
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
_ <- EitherT.liftF(Logger[F].debug(s"Enrichments run for ${enriched.event_id}"))
_ = ME.formatContexts(enrichmentsContexts ::: extractResult.validationInfoContexts).foreach(c => enriched.derived_contexts = c)
_ <- EitherT.liftF(Logger[F].debug(s"Validating the contexts added by enrichments for ${enriched.event_id}"))
_ <- IgluUtils
.validateEnrichmentsContexts[F](client, enrichmentsContexts, raw, processor, enriched)
_ <- EitherT.liftF(Logger[F].debug(s"Contexts added by enrichments validated for ${enriched.event_id}"))
_ <- EitherT.rightT[F, BadRow](
anonIp(enriched, registry.anonIp).foreach(enriched.user_ipaddress = _)
)
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ object Dependencies {
iabClient,
yauaa,
log4jToSlf4j,
log4cats,
guava,
circeOptics,
circeJackson,
Expand Down

0 comments on commit 236d25a

Please sign in to comment.