Skip to content

Commit

Permalink
Add tracking scenario ID in observed_event if defined
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Jul 19, 2023
1 parent f882a7d commit 30c5cdc
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ object Metadata {
)
}

/** Describes how to recognise a tracking-scenario entity: the schema vendor and name to match, and the data field to read. */
case class TrackingScenarioInfo(schemaVendor: String, schemaName: String, field: String)
// The single instance used in this object: matches `com.snowplowanalytics.snowplow/tracking_scenario` entities and reads their "id" field.
private val trackingScenarioInfo = TrackingScenarioInfo("com.snowplowanalytics.snowplow", "tracking_scenario", "id")

/** Implicit `Logger[F]` obtained from `Slf4jLogger.getLogger`, available for any `F[_]` that has a `Sync` instance. */
private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

Expand Down Expand Up @@ -171,15 +174,17 @@ object Metadata {
* @param source - `app_id` for given event
* @param tracker - `v_tracker` for given event
* @param platform - The platform the app runs on for given event (`platform` field)
* @param scenarioId - Identifier for the tracking scenario the event is being tracked for
*/
case class MetadataEvent(
schema: SchemaKey,
source: Option[String],
tracker: Option[String],
platform: Option[String]
platform: Option[String],
scenarioId: Option[String]
)
object MetadataEvent {
def apply(event: EnrichedEvent): MetadataEvent =
def apply(event: EnrichedEvent, scenarioId: Option[String]): MetadataEvent =
MetadataEvent(
SchemaKey(
Option(event.event_vendor).getOrElse("unknown-vendor"),
Expand All @@ -189,7 +194,8 @@ object Metadata {
),
Option(event.app_id),
Option(event.v_tracker),
Option(event.platform)
Option(event.platform),
scenarioId
)
}

Expand Down Expand Up @@ -231,7 +237,7 @@ object Metadata {
} yield MetadataSnapshot(aggregates, periodStart, periodEnd)
}

def unwrapEntities(event: EnrichedEvent): Set[SchemaKey] = {
def unwrapEntities(event: EnrichedEvent): (Set[SchemaKey], Option[String]) = {
def unwrap(str: String) =
decode[SelfDescribingData[Json]](str)
.traverse(
Expand All @@ -243,7 +249,29 @@ object Metadata {
.flatMap(_.toList)
.toSet

unwrap(event.contexts) ++ unwrap(event.derived_contexts)
/**
 * Extracts the tracking scenario ID from a self-describing contexts JSON string.
 *
 * Only the first entity whose schema vendor and name match `trackingScenarioInfo` is looked at.
 * The result is `None` when the string does not decode, when its data is not a list of
 * self-describing entities, when no entity matches, or when the matched entity's field
 * cannot be read as a string.
 */
def getScenarioId(str: String): Option[String] =
  for {
    wrapper <- decode[SelfDescribingData[Json]](str).toOption
    entities <- wrapper.data.as[List[SelfDescribingData[Json]]].toOption
    // `find` stops at the first matching entity, so later matches are never consulted
    scenario <- entities.find { sdj =>
                  sdj.schema.vendor == trackingScenarioInfo.schemaVendor &&
                  sdj.schema.name == trackingScenarioInfo.schemaName
                }
    scenarioId <- scenario.data.hcursor.downField(trackingScenarioInfo.field).as[String].toOption
  } yield scenarioId

val scenarioId = getScenarioId(event.contexts)
val entities = (unwrap(event.contexts) ++ unwrap(event.derived_contexts))
.filterNot(schema => schema.vendor == trackingScenarioInfo.schemaVendor && schema.name == trackingScenarioInfo.schemaName)

(entities, scenarioId)
}

def schema(event: EnrichedEvent): SchemaKey =
Expand All @@ -255,7 +283,11 @@ object Metadata {
)

def recalculate(previous: Aggregates, events: List[EnrichedEvent]): Aggregates =
previous |+| events.map(e => Map(MetadataEvent(e) -> EntitiesAndCount(unwrapEntities(e), 1))).combineAll
previous |+| events.map { e =>
val (entities, scenarioId) = unwrapEntities(e)
Map(MetadataEvent(e, scenarioId) -> EntitiesAndCount(entities, 1))
}
.combineAll

def mkWebhookEvent(
organizationId: UUID,
Expand All @@ -266,7 +298,7 @@ object Metadata {
count: Int
): SelfDescribingData[Json] =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(6, 0, 0)),
SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(6, 0, 1)),
Json.obj(
"organizationId" -> organizationId.asJson,
"pipelineId" -> pipelineId.asJson,
Expand All @@ -276,6 +308,7 @@ object Metadata {
"source" -> event.source.getOrElse("unknown-source").asJson,
"tracker" -> event.tracker.getOrElse("unknown-tracker").asJson,
"platform" -> event.platform.getOrElse("unknown-platform").asJson,
"scenario_id" -> event.scenarioId.asJson,
"eventVolume" -> Json.fromInt(count),
"periodStart" -> periodStart.asJson,
"periodEnd" -> periodEnd.asJson
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,30 @@ class MetadataSpec extends Specification with CatsIO {
"parse schemas for event's entities" in {
val event = new EnrichedEvent()
event.contexts =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}},{"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0","data":{"navigationStart":1581931694397,"unloadEventStart":1581931696046,"unloadEventEnd":1581931694764,"redirectStart":0,"redirectEnd":0,"fetchStart":1581931694397,"domainLookupStart":1581931694440,"domainLookupEnd":1581931694513,"connectStart":1581931694513,"connectEnd":1581931694665,"secureConnectionStart":1581931694572,"requestStart":1581931694665,"responseStart":1581931694750,"responseEnd":1581931694750,"domLoading":1581931694762,"domInteractive":1581931695963,"domContentLoadedEventStart":1581931696039,"domContentLoadedEventEnd":1581931696039,"domComplete":0,"loadEventStart":0,"loadEventEnd":0}}]}"""
val expected =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[
{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}},
{"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0","data":{"navigationStart":1581931694397,"unloadEventStart":1581931696046,"unloadEventEnd":1581931694764,"redirectStart":0,"redirectEnd":0,"fetchStart":1581931694397,"domainLookupStart":1581931694440,"domainLookupEnd":1581931694513,"connectStart":1581931694513,"connectEnd":1581931694665,"secureConnectionStart":1581931694572,"requestStart":1581931694665,"responseStart":1581931694750,"responseEnd":1581931694750,"domLoading":1581931694762,"domInteractive":1581931695963,"domContentLoadedEventStart":1581931696039,"domContentLoadedEventEnd":1581931696039,"domComplete":0,"loadEventStart":0,"loadEventEnd":0}},
{"schema": "iglu:com.snowplowanalytics.snowplow/tracking_scenario/jsonschema/1-0-0", "data": {"id": "tracking_scenario"}}
]}"""
val expectedEntitites =
Seq(
SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 0)),
SchemaKey("org.w3", "PerformanceTiming", "jsonschema", SchemaVer.Full(1, 0, 0))
)
val expectedScenarioId = Some("tracking_scenario")

val (actualEntities, actualScenarioId) = Metadata.unwrapEntities(event)

Metadata.unwrapEntities(event) should containTheSameElementsAs(expected)
actualEntities should containTheSameElementsAs(expectedEntitites)
actualScenarioId should beEqualTo(expectedScenarioId)
}

"recalculate event aggregates" should {

"add metadata event to empty state" in {
val enriched = MetadataSpec.enriched
Metadata.recalculate(Map.empty, List(enriched)) should containTheSameElementsAs(
Seq(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty, 1))
Seq(MetadataEvent(enriched, None) -> EntitiesAndCount(Set.empty, 1))
)
}

Expand All @@ -108,9 +116,9 @@ class MetadataSpec extends Specification with CatsIO {
val other = MetadataSpec.enriched
val v1_0_1 = SchemaVer.Full(1, 0, 1)
other.event_version = v1_0_1.asString
val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
val previous = Map(MetadataEvent(enriched, None) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
Metadata.recalculate(previous, List(other)) should containTheSameElementsAs(
previous.toSeq ++ Seq(MetadataEvent(other) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
previous.toSeq ++ Seq(MetadataEvent(other, None) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
)
}

Expand All @@ -126,9 +134,9 @@ class MetadataSpec extends Specification with CatsIO {
enrichedBis.contexts =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-1","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}"""
val entityBis = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 1))
val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1))
val previous = Map(MetadataEvent(enriched, None) -> EntitiesAndCount(entities, 1))
Metadata.recalculate(previous, List(enrichedBis)) should containTheSameElementsAs(
Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis, 2))
Seq(MetadataEvent(enriched, None) -> EntitiesAndCount(entities + entityBis, 2))
)
}

Expand All @@ -148,12 +156,48 @@ class MetadataSpec extends Specification with CatsIO {
enrichedTer.contexts =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-2","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}"""
val entityTer = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 2))
val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1))
val previous = Map(MetadataEvent(enriched, None) -> EntitiesAndCount(entities, 1))
Metadata.recalculate(previous, List(enrichedBis, enrichedTer)) should containTheSameElementsAs(
Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis + entityTer, 3))
Seq(MetadataEvent(enriched, None) -> EntitiesAndCount(entities + entityBis + entityTer, 3))
)
}
}

"put scenario_id in the JSON if defined" in {
  // Build an observed_event from a MetadataEvent that carries a scenario ID.
  val webhookEvent = Metadata.mkWebhookEvent(
    UUID.randomUUID(),
    UUID.randomUUID(),
    Instant.now(),
    Instant.now(),
    MetadataEvent(
      SchemaKey("com.snowplowanalytics.snowplow", "whatever", "jsonschema", SchemaVer.Full(1, 0, 2)),
      None,
      None,
      None,
      Some("hello")
    ),
    42
  )
  // Read the field through a cursor instead of matching the printed string, so the check
  // does not depend on printer whitespace or on which field follows `scenario_id`.
  webhookEvent.data.hcursor.downField("scenario_id").as[String] must beRight("hello")
}

"put null as scenario_id in the JSON if not defined" in {
  // Build an observed_event from a MetadataEvent that has no scenario ID.
  val webhookEvent = Metadata.mkWebhookEvent(
    UUID.randomUUID(),
    UUID.randomUUID(),
    Instant.now(),
    Instant.now(),
    MetadataEvent(
      SchemaKey("com.snowplowanalytics.snowplow", "whatever", "jsonschema", SchemaVer.Full(1, 0, 2)),
      None,
      None,
      None,
      None
    ),
    42
  )
  // The key must be present and hold JSON null; `focus` is empty if the key is missing,
  // so a dropped field fails the check just like a non-null value does.
  webhookEvent.data.hcursor.downField("scenario_id").focus.exists(_.isNull) must beTrue
}
}
}

Expand Down

0 comments on commit 30c5cdc

Please sign in to comment.