Add incomplete events (close #)
Before this change, any error in the enrichment workflow would short-circuit
it and a bad row would be emitted. After this change, if incomplete events
are enabled, enrichment runs to the end with whatever is possible,
accumulating errors as it goes. The errors get attached in derived_contexts.
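
The failure entity itself is not shown in this commit message, so purely as an
illustration (the schema URI and helper below are made up, not the commit's
actual types), attaching an error to derived_contexts can be pictured as
appending one more self-describing JSON entity to the event:

import io.circe.Json

object FailureContextSketch {

  // Hypothetical failure entity: one accumulated error becomes one more
  // self-describing context appended to the event's derived_contexts.
  // The iglu schema URI below is illustrative only.
  def failureContext(step: String, message: String): Json =
    Json.obj(
      "schema" -> Json.fromString("iglu:com.example/enrichment_failure/jsonschema/1-0-0"),
      "data" -> Json.obj(
        "step"    -> Json.fromString(step),
        "message" -> Json.fromString(message)
      )
    )
}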

There are now 3 main steps (see the sketch after this list):

- Mapping and validating the input. This includes mapping fields of
  payload_data to the atomic event (e.g. tr_tt to tr_total while converting
  from string to number) and validating the contexts and unstruct event.
  Everything that goes wrong gets wrapped up in a SchemaViolations bad row.

- Running the enrichments. Everything that goes wrong gets wrapped up in an
  EnrichmentFailures bad row.

- Validating the output. This includes validating the enrichment contexts
  and the atomic field lengths. Everything that goes wrong gets wrapped up
  in a SchemaViolations or EnrichmentFailures bad row.
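
A minimal, hypothetical sketch of the accumulation pattern follows. Failure
and Event are illustrative stand-ins, not the real BadRow and EnrichedEvent;
the actual change threads cats.data.Ior through the pipeline, as the diff
below shows (e.g. Ior.left(badRow) and the Ior.Right/Left/Both fold when
sinking):

import cats.Semigroup
import cats.data.Ior

import scala.util.Try

object IncompleteEnrichmentSketch {

  // Stand-in for a bad row: just a list of error messages here.
  final case class Failure(messages: List[String])
  object Failure {
    // Failures combine by concatenating messages, so each step can add its own.
    implicit val semigroup: Semigroup[Failure] =
      Semigroup.instance((a, b) => Failure(a.messages ++ b.messages))
  }

  // Stand-in for the enriched event.
  final case class Event(fields: Map[String, String])

  // Step 1: map and validate the input (e.g. tr_tt must be numeric).
  def mapAndValidate(e: Event): Ior[Failure, Event] =
    e.fields.get("tr_tt") match {
      case Some(v) if Try(v.toDouble).isFailure =>
        Ior.both(Failure(List("tr_tt is not a number")), e)
      case _ => Ior.right(e)
    }

  // Step 2: run the enrichments (here a dummy geo lookup).
  def runEnrichments(e: Event): Ior[Failure, Event] =
    Ior.right(e.copy(fields = e.fields + ("geo_country" -> "FR")))

  // Step 3: validate the output (e.g. atomic field lengths).
  def validateOutput(e: Event): Ior[Failure, Event] =
    if (e.fields.get("v_tracker").exists(_.length > 100))
      Ior.both(Failure(List("v_tracker exceeds the allowed length")), e)
    else Ior.right(e)

  // Ior.flatMap requires a Semigroup on the left side: a Both keeps the event
  // flowing while its failures are merged ("enrich to the end, accumulate
  // errors"), whereas a Left still short-circuits (nothing salvageable).
  def enrich(e: Event): Ior[Failure, Event] =
    mapAndValidate(e).flatMap(runEnrichments).flatMap(validateOutput)
}

An Ior.Both result is what later becomes both a bad row and an incomplete
event in the sinks, as the Ior.Right/Left/Both fold in the diff below shows.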
benjben committed Mar 6, 2024
1 parent 378fe6c commit b2f8067
Showing 46 changed files with 1,439 additions and 565 deletions.
8 changes: 8 additions & 0 deletions config/config.file.extended.hocon
@@ -31,6 +31,14 @@
"file": "/var/bad"
"maxBytes": 1000000
}

# Incomplete events output
"incomplete": {
# Local FS supported for testing purposes
"type": "FileSystem"
"file": "/var/incomplete"
"maxBytes": 1000000
}
}

# Optional. Concurrency of the app
19 changes: 19 additions & 0 deletions config/config.kafka.extended.hocon
@@ -89,6 +89,25 @@
"acks": "all"
}
}

# Optional. Incomplete events output.
# If set, an incomplete enriched event holding the errors in derived_contexts will be emitted in addition to a bad row
"incomplete": {
"type": "Kafka"

# Name of the Kafka topic to write to
"topicName": "incomplete"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka producer configuration
# See https://kafka.apache.org/documentation/#producerconfigs for all properties
"producerConf": {
"acks": "all"
}
}
}

# Optional. Concurrency of the app
49 changes: 46 additions & 3 deletions config/config.kinesis.extended.hocon
@@ -94,7 +94,7 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

# Optional. Policy to retry if writing to kinesis fails with unexepected errors
# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
@@ -144,7 +144,7 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

# Optional. Policy to retry if writing to kinesis fails with unexepcted errors
# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
@@ -186,7 +186,50 @@
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

# Optional. Policy to retry if writing to kinesis fails with unexepcted errors
# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
"throttledBackoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 1 second
}

# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
# Maximum allowed: 500
"recordLimit": 500

# Optional. Limits the number of bytes in a single PutRecords request,
# including records and partition keys.
# Several requests are made in parallel
# Maximum allowed: 5 MB
"byteLimit": 5242880

# Optional. Use a custom Kinesis endpoint.
# Can be used for instance to work locally with localstack
# "customEndpoint": "https://localhost:4566"
}

# Optional. Incomplete events output.
# If set, an incomplete enriched event holding the errors in derived_contexts will be emitted in addition to a bad row
"incomplete": {
"type": "Kinesis"

# Name of the Kinesis stream to write to
"streamName": "incomplete"

# Optional. Region where the Kinesis stream is located
# This field is optional if it can be resolved with AWS region provider chain.
# It checks places like env variables, system properties, AWS profile file.
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
21 changes: 21 additions & 0 deletions config/config.nsq.extended.hocon
@@ -95,6 +95,27 @@
"maxRetries": 10
}
}

# Incomplete events output
"incomplete": {
"type": "Nsq"

# Name of the NSQ topic that will receive the incomplete events
"topic": "incomplete"

# The host name of nsqd application
"nsqdHost": "127.0.0.1"

# The port number of nsqd application
"nsqdPort": 4150

# Optional. Policy to retry if writing to NSQ fails
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
}
}

# Optional. Concurrency of the app
25 changes: 25 additions & 0 deletions config/config.pubsub.extended.hocon
@@ -114,6 +114,31 @@
# Note the PubSub maximum is 10MB
"maxBatchBytes": 8000000
}

# Optional. Incomplete events output.
# If set, an incomplete enriched event holding the errors in derived_contexts will be emitted in addition to a bad row
"incomplete": {
"type": "PubSub"

# Name of the PubSub topic that will receive the incomplete events
"topic": "projects/test-project/topics/incomplete"

# Optional. Delay threshold to use for batching.
# After this amount of time has elapsed,
# before maxBatchSize and maxBatchBytes have been reached,
# messages from the buffer will be sent.
"delayThreshold": 200 milliseconds

# Optional. Maximum number of messages sent within a batch.
# When the buffer reaches this number of messages they are sent.
# PubSub maximum : 1000
"maxBatchSize": 1000

# Optional. Maximum number of bytes sent within a batch.
# When the buffer reaches this size messages are sent.
# Note the PubSub maximum is 10MB
"maxBatchBytes": 8000000
}
}

# Optional. Concurrency of the app
@@ -16,7 +16,7 @@ import java.util.Base64

import org.joda.time.DateTime

import cats.data.{NonEmptyList, ValidatedNel}
import cats.data.{Ior, NonEmptyList, ValidatedNel}
import cats.{Monad, Parallel}
import cats.implicits._

@@ -72,7 +72,8 @@ object Enrich {
env.featureFlags,
env.metrics.invalidCount,
env.registryLookup,
env.atomicFields
env.atomicFields,
env.sinkIncomplete.isDefined
)

val enriched =
@@ -119,7 +120,8 @@
featureFlags: FeatureFlags,
invalidCount: F[Unit],
registryLookup: RegistryLookup[F],
atomicFields: AtomicFields
atomicFields: AtomicFields,
emitIncomplete: Boolean
)(
row: Array[Byte]
): F[Result] = {
@@ -140,7 +142,8 @@
FeatureFlags.toCommon(featureFlags),
invalidCount,
registryLookup,
atomicFields
atomicFields,
emitIncomplete
)
} yield (enriched, collectorTstamp)

@@ -170,7 +173,7 @@
case None =>
Sync[F].unit
}
} yield (List(badRow.invalid), collectorTstamp)
} yield (List(Ior.left(badRow)), collectorTstamp)

/** Build a `generic_error` bad row for unhandled runtime errors */
def genericBadRow(
@@ -189,17 +192,29 @@
chunk: List[Result],
env: Environment[F, A]
): F[Unit] = {
val (bad, enriched) =
val (bad, enriched, incomplete) =
chunk
.flatMap(_._1)
.map(_.toEither)
.separate
.foldLeft((List.empty[BadRow], List.empty[EnrichedEvent], List.empty[EnrichedEvent])) {
case (previous, item) =>
val (bad, enriched, incomplete) = previous
item match {
case Ior.Right(e) => (bad, e :: enriched, incomplete)
case Ior.Left(br) => (br :: bad, enriched, incomplete)
case Ior.Both(br, i) => (br :: bad, enriched, i :: incomplete)
}
}

val (moreBad, good) = enriched.map { e =>
serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize)
.map(bytes => (e, AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e))))
}.separate

val (incompleteTooBig, incompleteBytes) = incomplete.map { e =>
serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize)
.map(bytes => AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e)))
}.separate

val allBad = (bad ++ moreBad).map(badRowResize(env, _))

List(
@@ -214,7 +229,10 @@
env.processor,
env.streamsSettings.maxRecordSize
) *> env.metrics.enrichLatency(chunk.headOption.flatMap(_._2)),
sinkBad(allBad, env.sinkBad, env.metrics.badCount)
sinkBad(allBad, env.sinkBad, env.metrics.badCount),
if (incompleteTooBig.nonEmpty) Logger[F].warn(s"${incompleteTooBig.size} incomplete events discarded because they are too big")
else Sync[F].unit,
sinkIncomplete(incompleteBytes, env.sinkIncomplete)
).parSequence_
}

@@ -272,6 +290,15 @@
Sync[F].unit
}

def sinkIncomplete[F[_]: Sync](
incomplete: List[AttributedData[Array[Byte]]],
maybeSink: Option[AttributedByteSink[F]]
): F[Unit] =
maybeSink match {
case Some(sink) => sink(incomplete)
case None => Sync[F].unit
}

def serializeEnriched(
enriched: EnrichedEvent,
processor: Processor,
@@ -78,6 +78,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata
* @param sinkGood function that sinks enriched event
* @param sinkPii function that sinks pii event
* @param sinkBad function that sinks an event that failed validation or enrichment
* @param sinkIncomplete function that sinks incomplete events
* @param checkpoint function that checkpoints input stream records
* @param getPayload function that extracts the collector payload bytes from a record
* @param sentry optional sentry client
@@ -111,6 +112,7 @@
sinkGood: AttributedByteSink[F],
sinkPii: Option[AttributedByteSink[F]],
sinkBad: ByteSink[F],
sinkIncomplete: Option[AttributedByteSink[F]],
checkpoint: List[A] => F[Unit],
getPayload: A => Array[Byte],
sentry: Option[SentryClient],
@@ -187,6 +189,7 @@
sinkGood: Resource[F, AttributedByteSink[F]],
sinkPii: Option[Resource[F, AttributedByteSink[F]]],
sinkBad: Resource[F, ByteSink[F]],
sinkIncomplete: Option[Resource[F, AttributedByteSink[F]]],
clients: Resource[F, List[Client[F]]],
checkpoint: List[A] => F[Unit],
getPayload: A => Array[Byte],
@@ -204,6 +207,7 @@
good <- sinkGood
bad <- sinkBad
pii <- sinkPii.sequence
incomplete <- sinkIncomplete.sequence
http4s <- Clients.mkHttp()
clts <- clients.map(Clients.init(http4s, _))
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
@@ -231,6 +235,7 @@
good,
pii,
bad,
incomplete,
checkpoint,
getPayload,
sentry,
@@ -57,6 +57,7 @@ object Run {
mkSinkGood: Output => Resource[F, AttributedByteSink[F]],
mkSinkPii: Output => Resource[F, AttributedByteSink[F]],
mkSinkBad: Output => Resource[F, ByteSink[F]],
mkSinkIncomplete: Output => Resource[F, AttributedByteSink[F]],
checkpoint: List[A] => F[Unit],
mkClients: BlobStorageClients => List[Resource[F, Client[F]]],
getPayload: A => Array[Byte],
@@ -89,6 +90,7 @@
case _ =>
mkSinkBad(file.output.bad)
}
sinkIncomplete = file.output.incomplete.map(out => initAttributedSink(out, mkSinkIncomplete))
clients = mkClients(file.blobStorage).sequence
exit <- file.input match {
case p: Input.FileSystem =>
@@ -100,6 +102,7 @@
sinkGood,
sinkPii,
sinkBad,
sinkIncomplete,
clients,
_ => Sync[F].unit,
identity,
@@ -130,6 +133,7 @@
sinkGood,
sinkPii,
sinkBad,
sinkIncomplete,
clients,
checkpointing,
getPayload,
@@ -59,20 +59,23 @@ object ConfigFile {

implicit val configFileDecoder: Decoder[ConfigFile] =
deriveConfiguredDecoder[ConfigFile].emap {
case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _, _, _, _) if aup._1 <= 0L =>
case c: ConfigFile if c.assetsUpdatePeriod.exists(_.length <= 0L) =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Remove pii output if streamName and region empty
case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _, _, _, _)
if output.streamName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _, _) if t.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _, _, _)
if topicName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
case other => other.asRight
case c: ConfigFile =>
val Outputs(good, pii, bad, incomplete) = c.output
val piiCleaned = pii match {
case Some(ki: Output.Kinesis) if ki.streamName.isEmpty => None
case Some(p: Output.PubSub) if p.topic.isEmpty => None
case Some(ka: Output.Kafka) if ka.topicName.isEmpty => None
case _ => pii
}
val incompleteCleaned = incomplete match {
case Some(ki: Output.Kinesis) if ki.streamName.isEmpty => None
case Some(p: Output.PubSub) if p.topic.isEmpty => None
case Some(ka: Output.Kafka) if ka.topicName.isEmpty => None
case _ => incomplete
}
c.copy(output = Outputs(good, piiCleaned, bad, incompleteCleaned)).asRight
}

/* Defines where to look for default values if they are not in the provided file
@@ -209,7 +209,8 @@ object io {
case class Outputs(
good: Output,
pii: Option[Output],
bad: Output
bad: Output,
incomplete: Option[Output]
)
object Outputs {
implicit val outputsDecoder: Decoder[Outputs] = deriveConfiguredDecoder[Outputs]