From 30c5cdc2fde8288bb7703b9bd942be892540a516 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Wed, 19 Jul 2023 19:28:19 +0200 Subject: [PATCH] Add tracking scenario ID in observed_event if defined --- .../common/fs2/io/experimental/Metadata.scala | 47 ++++++++++++-- .../enrich/common/fs2/io/MetadataSpec.scala | 64 ++++++++++++++++--- 2 files changed, 94 insertions(+), 17 deletions(-) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala index 73093a947..7f9e8822a 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala @@ -62,6 +62,9 @@ object Metadata { ) } + case class TrackingScenarioInfo(schemaVendor: String, schemaName: String, field: String) + private val trackingScenarioInfo = TrackingScenarioInfo("com.snowplowanalytics.snowplow", "tracking_scenario", "id") + private implicit def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] @@ -171,15 +174,17 @@ object Metadata { * @param source - `app_id` for given event * @param tracker - `v_tracker` for given event * @param platform - The platform the app runs on for given event (`platform` field) + * @param scenarioId - Identifier for the tracking scenario the event is being tracked for */ case class MetadataEvent( schema: SchemaKey, source: Option[String], tracker: Option[String], - platform: Option[String] + platform: Option[String], + scenarioId: Option[String] ) object MetadataEvent { - def apply(event: EnrichedEvent): MetadataEvent = + def apply(event: EnrichedEvent, scenarioId: Option[String]): MetadataEvent = MetadataEvent( SchemaKey( Option(event.event_vendor).getOrElse("unknown-vendor"), @@ -189,7 +194,8 @@ object Metadata { ), Option(event.app_id), Option(event.v_tracker), - Option(event.platform) + Option(event.platform), + scenarioId ) } @@ -231,7 +237,7 @@ object Metadata { } yield MetadataSnapshot(aggregates, periodStart, periodEnd) } - def unwrapEntities(event: EnrichedEvent): Set[SchemaKey] = { + def unwrapEntities(event: EnrichedEvent): (Set[SchemaKey], Option[String]) = { def unwrap(str: String) = decode[SelfDescribingData[Json]](str) .traverse( @@ -243,7 +249,29 @@ object Metadata { .flatMap(_.toList) .toSet - unwrap(event.contexts) ++ unwrap(event.derived_contexts) + def getScenarioId(str: String): Option[String] = + (decode[SelfDescribingData[Json]](str) match { + case Right(contexts) => + contexts.data.as[List[SelfDescribingData[Json]]] match { + case Right(entities) => + entities.collectFirst { case sdj if sdj.schema.vendor == trackingScenarioInfo.schemaVendor && sdj.schema.name == trackingScenarioInfo.schemaName => + sdj.data.hcursor.downField(trackingScenarioInfo.field).as[String] match { + case Right(scenarioId) => + Some(scenarioId) + case _ => + None + } + } + case _ => None + } + case _ => None + }).flatten + + val scenarioId = getScenarioId(event.contexts) + val entities = (unwrap(event.contexts) ++ unwrap(event.derived_contexts)) + .filterNot(schema => schema.vendor == trackingScenarioInfo.schemaVendor && schema.name == trackingScenarioInfo.schemaName) + + (entities, scenarioId) } def schema(event: EnrichedEvent): SchemaKey = @@ -255,7 +283,11 @@ object Metadata { ) def recalculate(previous: Aggregates, events: List[EnrichedEvent]): Aggregates = - previous |+| events.map(e => Map(MetadataEvent(e) -> EntitiesAndCount(unwrapEntities(e), 1))).combineAll + previous |+| events.map { e => + val (entities, scenarioId) = unwrapEntities(e) + Map(MetadataEvent(e, scenarioId) -> EntitiesAndCount(entities, 1)) + } + .combineAll def mkWebhookEvent( organizationId: UUID, @@ -266,7 +298,7 @@ object Metadata { count: Int ): SelfDescribingData[Json] = SelfDescribingData( - SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(6, 0, 0)), + SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(6, 0, 1)), Json.obj( "organizationId" -> organizationId.asJson, "pipelineId" -> pipelineId.asJson, @@ -276,6 +308,7 @@ object Metadata { "source" -> event.source.getOrElse("unknown-source").asJson, "tracker" -> event.tracker.getOrElse("unknown-tracker").asJson, "platform" -> event.platform.getOrElse("unknown-platform").asJson, + "scenario_id" -> event.scenarioId.asJson, "eventVolume" -> Json.fromInt(count), "periodStart" -> periodStart.asJson, "periodEnd" -> periodEnd.asJson diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala index c9a71f57c..48c6fe0ee 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala @@ -84,14 +84,22 @@ class MetadataSpec extends Specification with CatsIO { "parse schemas for event's entities" in { val event = new EnrichedEvent() event.contexts = - """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}},{"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0","data":{"navigationStart":1581931694397,"unloadEventStart":1581931696046,"unloadEventEnd":1581931694764,"redirectStart":0,"redirectEnd":0,"fetchStart":1581931694397,"domainLookupStart":1581931694440,"domainLookupEnd":1581931694513,"connectStart":1581931694513,"connectEnd":1581931694665,"secureConnectionStart":1581931694572,"requestStart":1581931694665,"responseStart":1581931694750,"responseEnd":1581931694750,"domLoading":1581931694762,"domInteractive":1581931695963,"domContentLoadedEventStart":1581931696039,"domContentLoadedEventEnd":1581931696039,"domComplete":0,"loadEventStart":0,"loadEventEnd":0}}]}""" - val expected = + """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[ + {"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}, + {"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0","data":{"navigationStart":1581931694397,"unloadEventStart":1581931696046,"unloadEventEnd":1581931694764,"redirectStart":0,"redirectEnd":0,"fetchStart":1581931694397,"domainLookupStart":1581931694440,"domainLookupEnd":1581931694513,"connectStart":1581931694513,"connectEnd":1581931694665,"secureConnectionStart":1581931694572,"requestStart":1581931694665,"responseStart":1581931694750,"responseEnd":1581931694750,"domLoading":1581931694762,"domInteractive":1581931695963,"domContentLoadedEventStart":1581931696039,"domContentLoadedEventEnd":1581931696039,"domComplete":0,"loadEventStart":0,"loadEventEnd":0}}, + {"schema": "iglu:com.snowplowanalytics.snowplow/tracking_scenario/jsonschema/1-0-0", "data": {"id": "tracking_scenario"}} + ]}""" + val expectedEntitites = Seq( SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 0)), SchemaKey("org.w3", "PerformanceTiming", "jsonschema", SchemaVer.Full(1, 0, 0)) ) + val expectedScenarioId = Some("tracking_scenario") + + val (actualEntities, actualScenarioId) = Metadata.unwrapEntities(event) - Metadata.unwrapEntities(event) should containTheSameElementsAs(expected) + actualEntities should containTheSameElementsAs(expectedEntitites) + actualScenarioId should beEqualTo(expectedScenarioId) } "recalculate event aggregates" should { @@ -99,7 +107,7 @@ class MetadataSpec extends Specification with CatsIO { "add metadata event to empty state" in { val enriched = MetadataSpec.enriched Metadata.recalculate(Map.empty, List(enriched)) should containTheSameElementsAs( - Seq(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty, 1)) + Seq(MetadataEvent(enriched, None) -> EntitiesAndCount(Set.empty, 1)) ) } @@ -108,9 +116,9 @@ class MetadataSpec extends Specification with CatsIO { val other = MetadataSpec.enriched val v1_0_1 = SchemaVer.Full(1, 0, 1) other.event_version = v1_0_1.asString - val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty[SchemaKey], 1)) + val previous = Map(MetadataEvent(enriched, None) -> EntitiesAndCount(Set.empty[SchemaKey], 1)) Metadata.recalculate(previous, List(other)) should containTheSameElementsAs( - previous.toSeq ++ Seq(MetadataEvent(other) -> EntitiesAndCount(Set.empty[SchemaKey], 1)) + previous.toSeq ++ Seq(MetadataEvent(other, None) -> EntitiesAndCount(Set.empty[SchemaKey], 1)) ) } @@ -126,9 +134,9 @@ class MetadataSpec extends Specification with CatsIO { enrichedBis.contexts = """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-1","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}""" val entityBis = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 1)) - val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1)) + val previous = Map(MetadataEvent(enriched, None) -> EntitiesAndCount(entities, 1)) Metadata.recalculate(previous, List(enrichedBis)) should containTheSameElementsAs( - Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis, 2)) + Seq(MetadataEvent(enriched, None) -> EntitiesAndCount(entities + entityBis, 2)) ) } @@ -148,12 +156,48 @@ class MetadataSpec extends Specification with CatsIO { enrichedTer.contexts = """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-2","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}""" val entityTer = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 2)) - val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1)) + val previous = Map(MetadataEvent(enriched, None) -> EntitiesAndCount(entities, 1)) Metadata.recalculate(previous, List(enrichedBis, enrichedTer)) should containTheSameElementsAs( - Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis + entityTer, 3)) + Seq(MetadataEvent(enriched, None) -> EntitiesAndCount(entities + entityBis + entityTer, 3)) ) } } + + "put scenario_id in the JSON if defined" in { + val json = Metadata.mkWebhookEvent( + UUID.randomUUID(), + UUID.randomUUID(), + Instant.now(), + Instant.now(), + MetadataEvent( + SchemaKey("com.snowplowanalytics.snowplow", "whatever", "jsonschema", SchemaVer.Full(1, 0, 2)), + None, + None, + None, + Some("hello") + ), + 42 + ).toString + json.contains("\"scenario_id\" : \"hello\",") must beTrue + } + + "put null as scenario_id in the JSON if not defined" in { + val json = Metadata.mkWebhookEvent( + UUID.randomUUID(), + UUID.randomUUID(), + Instant.now(), + Instant.now(), + MetadataEvent( + SchemaKey("com.snowplowanalytics.snowplow", "whatever", "jsonschema", SchemaVer.Full(1, 0, 2)), + None, + None, + None, + None + ), + 42 + ).toString + json.contains("\"scenario_id\" : null,") must beTrue + } } }