Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding disk buffering, part 4 #221

Merged
merged 15 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
opentelemetry = "1.35.0"
opentelemetry-alpha = "1.32.1-alpha"
opentelemetry-semconv = "1.21.0-alpha"
opentelemetry-contrib = "1.31.0-alpha"
opentelemetry-contrib = "1.33.0-alpha"
mockito = "5.10.0"
junit = "5.10.2"
byteBuddy = "1.14.12"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import android.util.Log;
import io.opentelemetry.android.config.OtelRumConfig;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
import io.opentelemetry.android.features.diskbuffering.SignalFromDiskExporter;
import io.opentelemetry.android.features.diskbuffering.scheduler.ExportScheduleHandler;
import io.opentelemetry.android.instrumentation.InstrumentedApplication;
import io.opentelemetry.android.instrumentation.activity.VisibleScreenTracker;
import io.opentelemetry.android.instrumentation.anr.AnrDetector;
Expand All @@ -31,8 +33,9 @@
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.contrib.disk.buffering.SpanDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.SpanFromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.SpanToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.StorageConfiguration;
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
Expand Down Expand Up @@ -289,24 +292,80 @@ public OpenTelemetryRum build() {

applyConfiguration();

DiskBufferingConfiguration diskBufferingConfiguration =
config.getDiskBufferingConfiguration();
SpanExporter spanExporter = buildSpanExporter();
SignalFromDiskExporter signalFromDiskExporter = null;
if (diskBufferingConfiguration.isEnabled()) {
try {
StorageConfiguration storageConfiguration = createStorageConfiguration();
final SpanExporter originalSpanExporter = spanExporter;
spanExporter =
SpanToDiskExporter.create(originalSpanExporter, storageConfiguration);

signalFromDiskExporter =
new SignalFromDiskExporter(
SpanFromDiskExporter.create(
originalSpanExporter, storageConfiguration),
null,
null);
} catch (IOException e) {
Log.e(RumConstants.OTEL_RUM_LOG_TAG, "Could not initialize disk exporters.", e);
}
}

OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder()
.setTracerProvider(buildTracerProvider(sessionId, application))
.setTracerProvider(
buildTracerProvider(sessionId, application, spanExporter))
.setMeterProvider(buildMeterProvider(application))
.setLoggerProvider(buildLoggerProvider(application))
.setPropagators(buildFinalPropagators())
.build();

scheduleDiskTelemetryReader(signalFromDiskExporter, diskBufferingConfiguration);

SdkPreconfiguredRumBuilder delegate =
new SdkPreconfiguredRumBuilder(application, sdk, sessionId);
instrumentationInstallers.forEach(delegate::addInstrumentation);
return delegate.build();
}

private StorageConfiguration createStorageConfiguration() throws IOException {
DiskManager diskManager = DiskManager.create(config.getDiskBufferingConfiguration());
return StorageConfiguration.builder()
.setMaxFileSize(diskManager.getMaxCacheFileSize())
.setMaxFolderSize(diskManager.getMaxFolderSize())
.setRootDir(diskManager.getSignalsBufferDir())
.setTemporaryFileProvider(
new SimpleTemporaryFileProvider(diskManager.getTemporaryDir()))
.build();
}

private void scheduleDiskTelemetryReader(
@Nullable SignalFromDiskExporter signalExporter,
DiskBufferingConfiguration diskBufferingConfiguration) {
ExportScheduleHandler exportScheduleHandler =
diskBufferingConfiguration.getExportScheduleHandler();
if (signalExporter == null) {
// Disabling here allows to cancel previously scheduled exports using tools that
// can run even after the app has been terminated (such as WorkManager).
// But for in-memory only schedulers, nothing should need to be disabled.
exportScheduleHandler.disable();
} else {
// Not null means that disk buffering is enabled and disk exporters are successfully
// initialized.
SignalFromDiskExporter.set(signalExporter);
exportScheduleHandler.enable();
}
}

/** Leverage the configuration to wire up various instrumentation components. */
private void applyConfiguration() {
if (config.shouldGenerateSdkInitializationEvents()) {
initializationEvents = new SdkInitializationEvents();
if (initializationEvents == InitializationEvents.NO_OP) {
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
initializationEvents = new SdkInitializationEvents();
}
initializationEvents.recordConfiguration(config);
}
initializationEvents.sdkInitializationStarted();
Expand Down Expand Up @@ -406,13 +465,14 @@ private CurrentNetworkProvider getOrCreateCurrentNetworkProvider() {
return currentNetworkProvider;
}

private SdkTracerProvider buildTracerProvider(SessionId sessionId, Application application) {
private SdkTracerProvider buildTracerProvider(
SessionId sessionId, Application application, SpanExporter spanExporter) {
SdkTracerProviderBuilder tracerProviderBuilder =
SdkTracerProvider.builder()
.setResource(resource)
.addSpanProcessor(new SessionIdSpanAppender(sessionId));

SpanExporter spanExporter = buildSpanExporter();
initializationEvents.spanExporterInitialized(spanExporter);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was having issues to verify that the right type of exporter (the disk exporter) was properly set in the unit tests, so one of the least ugly ways I could manage to do so was by adding this new function to the InitializationEvents interface and also a package-private setter for it below in this class (to set a mock impl from the tests). I'm open to better alternatives I might have missed 🙂

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe initializationEvents takes spanExporter in the ctor? (L331) so if you can provide/mock the spanExporter, you don't need a setter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if, instead of creating an init event for the span exporter, there was an init event for the disk buffering being initialized? That's really the feature here, and then you can assert that it was called I guess?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, also cool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if, instead of creating an init event for the span exporter, there was an init event for the disk buffering being initialized? That's really the feature here, and then you can assert that it was called I guess?

I see what you mean, although the initialization of the disk buffering tool doesn't say anything about the feature actually being used in the OpenTelemetry instance that has been built here, which I believe should be what we verify to guarantee that the RUM config is honored, and all those pieces are out of the scope of the disk buffering feature.

Copy link
Contributor Author

@LikeTheSalad LikeTheSalad Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe initializationEvents takes spanExporter in the ctor? (L331) so if you can provide/mock the spanExporter, you don't need a setter.

Maybe a setter could work, the thing about the constructor is that it needs to be called way before the exporter is created. But also it would mean that we need to cast the impl to SdkInitializationEvents which is essentially just dead code rn so I'm not sure we should rely on that impl for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I see the issue, since GitHub collapsed the code diff, I thought that these changes and the changes on L331 were a single method, so you could have used a ctor, my bad.

BatchSpanProcessor batchSpanProcessor = BatchSpanProcessor.builder(spanExporter).build();
tracerProviderBuilder.addSpanProcessor(batchSpanProcessor);

Expand All @@ -426,32 +486,7 @@ private SdkTracerProvider buildTracerProvider(SessionId sessionId, Application a
private SpanExporter buildSpanExporter() {
// TODO: Default to otlp...but how can we make endpoint and auth mandatory?
SpanExporter defaultExporter = LoggingSpanExporter.create();
SpanExporter spanExporter = defaultExporter;
DiskBufferingConfiguration diskBufferingConfiguration =
config.getDiskBufferingConfiguration();
if (diskBufferingConfiguration.isEnabled()) {
try {
spanExporter = createDiskExporter(defaultExporter, diskBufferingConfiguration);
} catch (IOException e) {
Log.w(RumConstants.OTEL_RUM_LOG_TAG, "Could not create span disk exporter.", e);
}
}
return spanExporterCustomizer.apply(spanExporter);
}

private static SpanExporter createDiskExporter(
SpanExporter defaultExporter, DiskBufferingConfiguration diskBufferingConfiguration)
throws IOException {
DiskManager diskManager = DiskManager.create(diskBufferingConfiguration);
StorageConfiguration storageConfiguration =
StorageConfiguration.builder()
.setMaxFileSize(diskManager.getMaxCacheFileSize())
.setMaxFolderSize(diskManager.getMaxFolderSize())
.setTemporaryFileProvider(
new SimpleTemporaryFileProvider(diskManager.getTemporaryDir()))
.build();
return SpanDiskExporter.create(
defaultExporter, diskManager.getSignalsBufferDir(), storageConfiguration);
return spanExporterCustomizer.apply(defaultExporter);
}

private SdkMeterProvider buildMeterProvider(Application application) {
Expand All @@ -478,4 +513,9 @@ private ContextPropagators buildFinalPropagators() {
TextMapPropagator defaultPropagator = buildDefaultPropagator();
return ContextPropagators.create(propagatorCustomizer.apply(defaultPropagator));
}

OpenTelemetryRumBuilder setInitializationEvents(InitializationEvents initializationEvents) {
this.initializationEvents = initializationEvents;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@

package io.opentelemetry.android.features.diskbuffering;

import io.opentelemetry.android.features.diskbuffering.scheduler.DefaultExportScheduleHandler;
import io.opentelemetry.android.features.diskbuffering.scheduler.DefaultExportScheduler;
import io.opentelemetry.android.features.diskbuffering.scheduler.ExportScheduleHandler;

/** Configuration for disk buffering. */
public final class DiskBufferingConfiguration {
private final boolean enabled;
private final int maxCacheSize;
private final ExportScheduleHandler exportScheduleHandler;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is problematic because it introduces an action class as a field of a configuration data class. It mixes up dumb/simple config with things that do work....and I think that's a troublesome path.

What are the use cases that you can think of for allowing a user to pass in their own ExportScheduleHandler instance to the DiskBufferingConfiguration? I thought that maybe it was to specify their own schedule/period, but the way the code is structured now I'm not even sure that's possible.

If that was the main intent, I'd rather that the period be the configurable thing, and the builders for the action classes can do the building by using the config, not by having the config do the building itself. Make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the use cases that you can think of for allowing a user to pass in their own ExportScheduleHandler instance to the DiskBufferingConfiguration?

The default "scheduler" only works while the app is running and I'm guessing some users would like to attempt exporting data even if their app isn't running by using WorkManager for example.

Though I agree that the config should generally be made of data instead of functions or the like, maybe since the disk buffering feature is needed to initialize RUM, we could categorize it as an "initialization" feature, which will allow us to get rid of DiskBufferingConfiguration altogether as we would only use OtelRumConfig to set all initialization params, and then we could move the ExportScheduleHandler setter over to OpenTelemetryRumBuilder if we decided to go with the proposed structure in here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with moving this around after this PR, possibly as part of dealing with the larger config structure questions.

private static final int DEFAULT_MAX_CACHE_SIZE = 60 * 1024 * 1024;
private static final int MAX_FILE_SIZE = 1024 * 1024;

private DiskBufferingConfiguration(Builder builder) {
this.enabled = builder.enabled;
this.maxCacheSize = builder.maxCacheSize;
this.exportScheduleHandler = builder.exportScheduleHandler;
}

public static Builder builder() {
Expand All @@ -33,9 +39,15 @@ public int getMaxCacheFileSize() {
return MAX_FILE_SIZE;
}

public ExportScheduleHandler getExportScheduleHandler() {
return exportScheduleHandler;
}

public static final class Builder {
private boolean enabled;
private int maxCacheSize;
private ExportScheduleHandler exportScheduleHandler =
new DefaultExportScheduleHandler(new DefaultExportScheduler());

private Builder(boolean enabled, int maxCacheSize) {
this.enabled = enabled;
Expand All @@ -58,6 +70,15 @@ public Builder setMaxCacheSize(int maxCacheSize) {
return this;
}

/**
* Sets a scheduler that will take care of periodically read data stored in disk and export
* it.
*/
public Builder setExportScheduleHandler(ExportScheduleHandler exportScheduleHandler) {
this.exportScheduleHandler = exportScheduleHandler;
return this;
}

public DiskBufferingConfiguration build() {
return new DiskBufferingConfiguration(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.features.diskbuffering

import androidx.annotation.WorkerThread
import io.opentelemetry.contrib.disk.buffering.LogRecordFromDiskExporter
import io.opentelemetry.contrib.disk.buffering.MetricFromDiskExporter
import io.opentelemetry.contrib.disk.buffering.SpanFromDiskExporter
import java.io.IOException
import java.util.concurrent.TimeUnit

/**
* Entrypoint to read and export previously cached signals.
*/
class SignalFromDiskExporter
@JvmOverloads
internal constructor(
private val spanFromDiskExporter: SpanFromDiskExporter?,
private val metricFromDiskExporter: MetricFromDiskExporter?,
private val logRecordFromDiskExporter: LogRecordFromDiskExporter?,
private val exportTimeoutInMillis: Long = TimeUnit.SECONDS.toMillis(5),
) {
/**
* A batch contains all the signals that arrived in one call to [SpanDiskExporter.export]. So if
* that function is called 5 times, then there will be 5 batches in disk. This function reads
* and exports ONE batch every time is called.
*
* @return TRUE if it found data in disk and the exporter succeeded. FALSE if any of those conditions were
* not met.
*/
@WorkerThread
@Throws(IOException::class)
fun exportBatchOfSpans(): Boolean {
return spanFromDiskExporter?.exportStoredBatch(
exportTimeoutInMillis,
TimeUnit.MILLISECONDS,
) ?: false
}

/**
* A batch contains all the signals that arrived in one call to [MetricDiskExporter.export]. So if
* that function is called 5 times, then there will be 5 batches in disk. This function reads
* and exports ONE batch every time is called.
*
* @return TRUE if it found data in disk and the exporter succeeded. FALSE if any of those conditions were
* not met.
*/
@WorkerThread
@Throws(IOException::class)
fun exportBatchOfMetrics(): Boolean {
return metricFromDiskExporter?.exportStoredBatch(
exportTimeoutInMillis,
TimeUnit.MILLISECONDS,
) ?: false
}

/**
* A batch contains all the signals that arrived in one call to [LogRecordDiskExporter.export]. So if
* that function is called 5 times, then there will be 5 batches in disk. This function reads
* and exports ONE batch every time is called.
*
* @return TRUE if it found data in disk and the exporter succeeded. FALSE if any of those conditions were
* not met.
*/
@WorkerThread
@Throws(IOException::class)
fun exportBatchOfLogs(): Boolean {
return logRecordFromDiskExporter?.exportStoredBatch(
exportTimeoutInMillis,
TimeUnit.MILLISECONDS,
) ?: false
}

/**
* Convenience method that attempts to export all kinds of signals from disk.
*
* @return TRUE if at least one of the signals were successfully exported, FALSE if no signal
* of any kind was exported.
*/
@WorkerThread
@Throws(IOException::class)
fun exportBatchOfEach(): Boolean {
var atLeastOneWorked = exportBatchOfSpans()
if (exportBatchOfMetrics()) {
atLeastOneWorked = true
}
if (exportBatchOfLogs()) {
atLeastOneWorked = true
}
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
return atLeastOneWorked
}

companion object {
private var instance: SignalFromDiskExporter? = null

@JvmStatic
fun get(): SignalFromDiskExporter? {
return instance
}

@JvmStatic
fun set(signalFromDiskExporter: SignalFromDiskExporter) {
check(instance == null) { "An instance is already set. You can only set it once." }
instance = signalFromDiskExporter
}

@JvmStatic
fun resetForTesting() {
instance = null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

package io.opentelemetry.android.features.diskbuffering.scheduler

import android.util.Log
import io.opentelemetry.android.RumConstants
import io.opentelemetry.android.features.diskbuffering.SignalFromDiskExporter
import io.opentelemetry.android.internal.services.periodicwork.PeriodicRunnable
import java.io.IOException
import java.util.concurrent.TimeUnit

class DefaultExportScheduler : PeriodicRunnable() {
Expand All @@ -14,11 +18,19 @@ class DefaultExportScheduler : PeriodicRunnable() {
}

override fun onRun() {
// TODO for next PR.
val exporter = SignalFromDiskExporter.get() ?: return

try {
do {
val didExport = exporter.exportBatchOfEach()
} while (didExport)
} catch (e: IOException) {
Log.e(RumConstants.OTEL_RUM_LOG_TAG, "Error while exporting signals from disk.", e)
}
}

override fun shouldStopRunning(): Boolean {
return false
return SignalFromDiskExporter.get() == null
}

override fun minimumDelayUntilNextRunInMillis(): Long {
Expand Down
Loading