Skip to content

Commit

Permalink
refactor(android): implement batch insertions of events and spans in db
Browse files Browse the repository at this point in the history
  • Loading branch information
abhaysood committed Dec 24, 2024
1 parent f5c3eed commit 4739514
Show file tree
Hide file tree
Showing 16 changed files with 527 additions and 116 deletions.
5 changes: 2 additions & 3 deletions android/docs/internal-documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ SQLite database is configured with the following settings:
* [journal_mode](https://sqlite.org/pragma.html#pragma_journal_mode): WAL
* [foreign_keys](https://sqlite.org/pragma.html#pragma_foreign_keys): ON

Events are written to the database & file storage (if needed) as soon as they are received. This can be
improved in future by adding a queue which batches the inserts. However, as WAL mode enabled, this optimization
has been ignored for now.
Batches of events and spans are inserted in database either every 3 seconds or if either the spans or events buffer reaches 30.
At time of a crash, events and spans are immediately persisted to the db.

# Batching & export

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,6 @@ class FakeConfigProvider : ConfigProvider {
override val maxSpanNameLength: Int = 64
override val maxCheckpointNameLength: Int = 64
override val maxCheckpointsPerSpan: Int = 100
override val maxInMemorySignalsQueueSize: Int = 30
override val inMemorySignalsQueueFlushRateMs: Long = 3000
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import sh.measure.android.storage.Database
import sh.measure.android.storage.DatabaseImpl
import sh.measure.android.storage.FileStorage
import sh.measure.android.storage.FileStorageImpl
import sh.measure.android.storage.PeriodicSignalStoreScheduler
import sh.measure.android.storage.PrefsStorage
import sh.measure.android.storage.PrefsStorageImpl
import sh.measure.android.storage.SignalStore
Expand Down Expand Up @@ -228,6 +229,14 @@ internal class TestMeasureInitializer(
database = database,
fileStorage = fileStorage,
idProvider = idProvider,
configProvider = configProvider,
),
override val periodicSignalStoreScheduler: PeriodicSignalStoreScheduler = PeriodicSignalStoreScheduler(
logger = logger,
defaultExecutor = executorServiceRegistry.defaultExecutor(),
ioExecutor = executorServiceRegistry.ioExecutor(),
signalStore = signalStore,
configProvider = configProvider,
),
override val resumedActivityProvider: ResumedActivityProvider = ResumedActivityProviderImpl(
application,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import sh.measure.android.storage.Database
import sh.measure.android.storage.DatabaseImpl
import sh.measure.android.storage.FileStorage
import sh.measure.android.storage.FileStorageImpl
import sh.measure.android.storage.PeriodicSignalStoreScheduler
import sh.measure.android.storage.PrefsStorage
import sh.measure.android.storage.PrefsStorageImpl
import sh.measure.android.storage.SignalStore
Expand Down Expand Up @@ -228,6 +229,14 @@ internal class MeasureInitializerImpl(
database = database,
fileStorage = fileStorage,
idProvider = idProvider,
configProvider = configProvider,
),
override val periodicSignalStoreScheduler: PeriodicSignalStoreScheduler = PeriodicSignalStoreScheduler(
logger = logger,
defaultExecutor = executorServiceRegistry.defaultExecutor(),
ioExecutor = executorServiceRegistry.ioExecutor(),
signalStore = signalStore,
configProvider = configProvider,
),
override val resumedActivityProvider: ResumedActivityProvider = ResumedActivityProviderImpl(
application,
Expand Down Expand Up @@ -450,4 +459,5 @@ internal interface MeasureInitializer {
val powerStateProvider: PowerStateProvider
val spanCollector: SpanCollector
val customEventCollector: CustomEventCollector
val periodicSignalStoreScheduler: PeriodicSignalStoreScheduler
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ internal class MeasureInternal(measureInitializer: MeasureInitializer) : AppLife
private val configProvider by lazy { measureInitializer.configProvider }
private val dataCleanupService by lazy { measureInitializer.dataCleanupService }
private val powerStateProvider by lazy { measureInitializer.powerStateProvider }
private val periodicSignalStoreScheduler by lazy { measureInitializer.periodicSignalStoreScheduler }
private var isStarted: Boolean = false
private var startLock = Any()

Expand Down Expand Up @@ -122,6 +123,7 @@ internal class MeasureInternal(measureInitializer: MeasureInitializer) : AppLife
periodicExporter.resume()
spanCollector.register()
customEventCollector.register()
periodicSignalStoreScheduler.register()
}

override fun onAppForeground() {
Expand All @@ -148,6 +150,7 @@ internal class MeasureInternal(measureInitializer: MeasureInitializer) : AppLife
periodicExporter.pause()
powerStateProvider.unregister()
networkChangesCollector.unregister()
periodicSignalStoreScheduler.onAppBackground()
dataCleanupService.clearStaleData()
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.R) {
appExitCollector.collect()
Expand Down Expand Up @@ -221,5 +224,6 @@ internal class MeasureInternal(measureInitializer: MeasureInitializer) : AppLife
periodicExporter.unregister()
spanCollector.unregister()
customEventCollector.unregister()
periodicSignalStoreScheduler.unregister()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@ internal data class Config(
override val maxSpanNameLength: Int = 64
override val maxCheckpointNameLength: Int = 64
override val maxCheckpointsPerSpan: Int = 100
override val maxInMemorySignalsQueueSize: Int = 30
override val inMemorySignalsQueueFlushRateMs: Long = 3000
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ internal class ConfigProviderImpl(
get() = getMergedConfig { customEventNameRegex }
override val maxUserDefinedAttributesPerEvent: Int
get() = getMergedConfig { maxUserDefinedAttributesPerEvent }
override val maxInMemorySignalsQueueSize: Int
get() = getMergedConfig { maxInMemorySignalsQueueSize }
override val inMemorySignalsQueueFlushRateMs: Long
get() = getMergedConfig { inMemorySignalsQueueFlushRateMs }

override fun shouldTrackHttpBody(url: String, contentType: String?): Boolean {
if (!trackHttpBody) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,15 @@ internal interface InternalConfig {
* Max checkpoints per span. Defaults to 100.
*/
val maxCheckpointsPerSpan: Int

/**
* Maximum number of signals (events and spans) in the in memory queue. Defaults to 30.
*/
val maxInMemorySignalsQueueSize: Int

/**
* The timeout after which signals are attempted to be flushed to disk in milliseconds.
* Defaults to 3000ms.
*/
val inMemorySignalsQueueFlushRateMs: Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ internal interface Database : Closeable {
* Returns the count of spans stored in spans table.
*/
fun getSpansCount(): Int

/**
* Inserts a batch of events and spans in a single transaction.
*/
fun insertSignals(eventEntities: List<EventEntity>, spanEntities: List<SpanEntity>): Boolean
}

/**
Expand Down Expand Up @@ -360,8 +365,7 @@ internal class DatabaseImpl(
put(SpansBatchTable.COL_BATCH_ID, batchEntity.batchId)
put(SpansBatchTable.COL_CREATED_AT, batchEntity.createdAt)
}
val result =
writableDatabase.insert(SpansBatchTable.TABLE_NAME, null, spanBatches)
val result = writableDatabase.insert(SpansBatchTable.TABLE_NAME, null, spanBatches)
if (result == -1L) {
logger.log(LogLevel.Error, "Failed to insert batched span = $spanId")
return false
Expand Down Expand Up @@ -854,6 +858,90 @@ internal class DatabaseImpl(
return count
}

override fun insertSignals(
eventEntities: List<EventEntity>,
spanEntities: List<SpanEntity>,
): Boolean {
writableDatabase.beginTransaction()
try {
// Batch insert events
eventEntities.forEach { event ->
val values = ContentValues(11).apply {
put(EventTable.COL_ID, event.id)
put(EventTable.COL_TYPE, event.type)
put(EventTable.COL_TIMESTAMP, event.timestamp)
put(EventTable.COL_SESSION_ID, event.sessionId)
put(EventTable.COL_USER_TRIGGERED, event.userTriggered)
if (event.filePath != null) {
put(EventTable.COL_DATA_FILE_PATH, event.filePath)
} else if (event.serializedData != null) {
put(EventTable.COL_DATA_SERIALIZED, event.serializedData)
}
put(EventTable.COL_ATTRIBUTES, event.serializedAttributes)
put(EventTable.COL_USER_DEFINED_ATTRIBUTES, event.serializedUserDefAttributes)
put(EventTable.COL_ATTACHMENT_SIZE, event.attachmentsSize)
put(EventTable.COL_ATTACHMENTS, event.serializedAttachments)
}

if (writableDatabase.insert(EventTable.TABLE_NAME, null, values) == -1L) {
return false
}

// Batch insert attachments for this event
event.attachmentEntities?.forEach { attachment ->
val attachmentValues = ContentValues(7).apply {
put(AttachmentTable.COL_ID, attachment.id)
put(AttachmentTable.COL_EVENT_ID, event.id)
put(AttachmentTable.COL_TYPE, attachment.type)
put(AttachmentTable.COL_TIMESTAMP, event.timestamp)
put(AttachmentTable.COL_SESSION_ID, event.sessionId)
put(AttachmentTable.COL_FILE_PATH, attachment.path)
put(AttachmentTable.COL_NAME, attachment.name)
}

if (writableDatabase.insert(
AttachmentTable.TABLE_NAME,
null,
attachmentValues,
) == -1L
) {
return false
}
}
}

// Batch insert spans
spanEntities.forEach { span ->
val values = ContentValues(12).apply {
put(SpansTable.COL_NAME, span.name)
put(SpansTable.COL_SESSION_ID, span.sessionId)
put(SpansTable.COL_SPAN_ID, span.spanId)
put(SpansTable.COL_TRACE_ID, span.traceId)
put(SpansTable.COL_PARENT_ID, span.parentId)
put(SpansTable.COL_START_TIME, span.startTime)
put(SpansTable.COL_END_TIME, span.endTime)
put(SpansTable.COL_DURATION, span.duration)
put(SpansTable.COL_SERIALIZED_ATTRS, span.serializedAttributes)
put(SpansTable.COL_SERIALIZED_SPAN_EVENTS, span.serializedCheckpoints)
put(SpansTable.COL_SAMPLED, span.sampled)
put(SpansTable.COL_STATUS, span.status.value)
}

if (writableDatabase.insert(SpansTable.TABLE_NAME, null, values) == -1L) {
return false
}
}

writableDatabase.setTransactionSuccessful()
return true
} catch (e: SQLiteException) {
logger.log(LogLevel.Error, "Failed to insert signals", e)
return false
} finally {
writableDatabase.endTransaction()
}
}

override fun getSessionForAppExit(pid: Int): AppExitCollector.Session? {
readableDatabase.rawQuery(Sql.getSessionForAppExit(pid), null).use {
if (it.count == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package sh.measure.android.storage

import sh.measure.android.config.ConfigProvider
import sh.measure.android.executors.MeasureExecutorService
import sh.measure.android.logger.LogLevel
import sh.measure.android.logger.Logger
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

internal class PeriodicSignalStoreScheduler(
private val logger: Logger,
private val defaultExecutor: MeasureExecutorService,
private val ioExecutor: MeasureExecutorService,
private val signalStore: SignalStore,
private val configProvider: ConfigProvider,
) {

@Volatile
private var future: Future<*>? = null

fun register() {
if (future != null) {
return
}
try {
future = defaultExecutor.scheduleAtFixedRate(
{
ioExecutor.submit {
signalStore.flush()
}
},
initialDelay = configProvider.inMemorySignalsQueueFlushRateMs,
delayMillis = configProvider.inMemorySignalsQueueFlushRateMs,
TimeUnit.MILLISECONDS,
)
} catch (e: RejectedExecutionException) {
logger.log(LogLevel.Error, "Failed to start periodic signal store scheduler", e)
return
}
}

fun unregister() {
future?.cancel(false)
future = null
ioExecutor.submit {
signalStore.flush()
}
}

fun onAppBackground() {
ioExecutor.submit {
signalStore.flush()
}
}
}
Loading

0 comments on commit 4739514

Please sign in to comment.