Skip to content

Commit

Permalink
add scope to the payload before send
Browse files Browse the repository at this point in the history
  • Loading branch information
vahidlazio committed Mar 1, 2024
1 parent 3d48deb commit 37050f1
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 76 deletions.
89 changes: 13 additions & 76 deletions Provider/src/main/java/com/spotify/confidence/EventSender.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
package com.spotify.confidence

import android.content.Context
import com.spotify.confidence.client.Clock
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

interface EventSender {
fun emit(definition: String, payload: Map<String, String>)
Expand All @@ -26,83 +20,20 @@ data class EventsScope(
)

class EventSenderImpl private constructor(
private val context: Context,
private val clientSecret: String,
private val scope: EventsScope = EventsScope(),
private val clock: Clock = Clock.CalendarBacked.systemUTC(),
private val flushPolicies: List<FlushPolicy> = listOf(),
private val eventStorage: EventStorage = EventStorageImpl(context),
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
private val eventSenderEngine: EventSenderEngine,
private val scope: EventsScope = EventsScope()
) : EventSender {

private lateinit var uploader: EventSenderUploader

private val writeReqChannel: Channel<Event> = Channel()
private val sendChannel: Channel<String> = Channel()
private val coroutineScope by lazy {
CoroutineScope(SupervisorJob() + dispatcher)
}
private val exceptionHandler by lazy {
CoroutineExceptionHandler { _, _ ->
// do nothing
}
}

init {
coroutineScope.launch {
for (event in writeReqChannel) {
eventStorage.writeEvent(event)
flushPolicies.forEach { it.hit(event) }
val shouldFlush = flushPolicies.any { it.shouldFlush() }
if (shouldFlush) {
flushPolicies.forEach { it.reset() }
sendChannel.send(SEND_SIG)
}
}
}

// upload might throw exceptions
coroutineScope.launch(exceptionHandler) {
for (flush in sendChannel) {
eventStorage.rollover()
val readyFiles = eventStorage.batchReadyFiles()
for (readyFile in readyFiles) {
val batch = EventBatch(
clientSecret = clientSecret,
events = eventStorage.eventsFor(readyFile),
sendTime = clock.currentTime()
)
val shouldCleanup = uploader.upload(batch)
if (shouldCleanup) {
readyFile.delete()
}
}
}
}
}
override fun emit(definition: String, payload: Map<String, String>) {
coroutineScope.launch {
val event = Event(
eventDefinition = definition,
eventTime = clock.currentTime(),
payload = payload
)
writeReqChannel.send(event)
}
eventSenderEngine.emit(definition, payload + scope.fields())
}

override fun withScope(scope: EventsScope): EventSender {
val combinedFields = {
scope.fields() + this.scope.fields()
}
return EventSenderImpl(
context,
clientSecret,
EventsScope(fields = combinedFields),
clock,
flushPolicies,
eventStorage,
dispatcher
eventSenderEngine,
EventsScope(fields = combinedFields)
)
}

Expand All @@ -112,12 +43,18 @@ class EventSenderImpl private constructor(
context: Context,
clientSecret: String,
scope: EventsScope,
flushPolicies: List<FlushPolicy> = listOf(),
dispatcher: CoroutineDispatcher = Dispatchers.IO
): EventSender = instance ?: run {
EventSenderImpl(context, clientSecret, scope, dispatcher = dispatcher).also {
val engine = EventSenderEngine(
EventStorageImpl(context),
clientSecret,
flushPolicies,
dispatcher = dispatcher
)
EventSenderImpl(engine, scope).also {
instance = it
}
}
private const val SEND_SIG = "FLUSH"
}
}
82 changes: 82 additions & 0 deletions Provider/src/main/java/com/spotify/confidence/EventSenderEngine.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.spotify.confidence

import com.spotify.confidence.client.Clock
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

internal class EventSenderEngine(
private val eventStorage: EventStorage,
private val clientSecret: String,
private val flushPolicies: List<FlushPolicy> = listOf(),
private val clock: Clock = Clock.CalendarBacked.systemUTC(),
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
private val writeReqChannel: Channel<Event> = Channel()
private val sendChannel: Channel<String> = Channel()
private val coroutineScope by lazy {
CoroutineScope(SupervisorJob() + dispatcher)
}
private val exceptionHandler by lazy {
CoroutineExceptionHandler { _, _ ->
// do nothing
}
}
private lateinit var uploader: EventSenderUploader

init {
coroutineScope.launch {
for (event in writeReqChannel) {
eventStorage.writeEvent(event)
flushPolicies.forEach { it.hit(event) }
val shouldFlush = flushPolicies.any { it.shouldFlush() }
if (shouldFlush) {
flushPolicies.forEach { it.reset() }
sendChannel.send(SEND_SIG)
}
}
}

// upload might throw exceptions
coroutineScope.launch(exceptionHandler) {
for (flush in sendChannel) {
eventStorage.rollover()
val readyFiles = eventStorage.batchReadyFiles()
for (readyFile in readyFiles) {
val batch = EventBatch(
clientSecret = clientSecret,
events = eventStorage.eventsFor(readyFile),
sendTime = clock.currentTime()
)
val shouldCleanup = uploader.upload(batch)
if (shouldCleanup) {
readyFile.delete()
}
}
}
}
}
fun emit(definition: String, payload: Map<String, String>) {
coroutineScope.launch {
val event = Event(
eventDefinition = definition,
eventTime = clock.currentTime(),
payload = payload
)
writeReqChannel.send(event)
}
}

fun stop() {
coroutineScope.cancel()
}

companion object {
private const val SEND_SIG = "FLUSH"
}
}

0 comments on commit 37050f1

Please sign in to comment.