Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
silesky committed Jan 29, 2025
1 parent d0bebb8 commit c07edbc
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions packages/signals/signals/src/core/signals/signals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,23 @@ export class Signals implements ISignals {
/**
* Flush/process any signals that were emitted before the start method was called.
*/
private flushPreStartBuffer = (processor: SignalEventProcessor) => {
private processSignals = (processor: SignalEventProcessor) => {
logger.debug(
`Flushing ${this.preStartBuffer.length} events in pre-start buffer`,
this.preStartBuffer
)
this.signalEmitter.unsubscribe(this.addToPreStartBuffer)
// process any signals that were emitted before start was called -- since our signalEmitter has the ability to enqueu signals until init, maybe can change?
this.preStartBuffer.forEach(async (signal) => {
void processor.process(signal, await this.buffer.getAll())
})
this.preStartBuffer = []

// listen + process new signals - meaning, executing them in the analytics runtime
// This could be implemented as a plugin?
this.signalEmitter.subscribe(async (signal) => {
void processor.process(signal, await this.buffer.getAll())
})
}

/**
Expand All @@ -110,34 +117,26 @@ export class Signals implements ISignals {
.autoInstrumentationSettings?.sampleRate ?? 0,
})

// promise will resolve once all the middleware has been initialized.
void this.signalEmitter.initialize({
settings: this.globalSettings,
writeKey: analyticsService.instance.settings.writeKey,
})

const sandbox = new Sandbox(
new SandboxSettings(this.globalSettings.sandbox)
)

const processor = new SignalEventProcessor(
analyticsService.instance,
sandbox
new Sandbox(new SandboxSettings(this.globalSettings.sandbox))
)

// flush pre start buffer and then actually process signals
void this.flushPreStartBuffer(processor)
this.signalEmitter.subscribe(async (signal) => {
void processor.process(signal, await this.buffer.getAll())
})
void this.processSignals(processor)

await this.registerGenerator([
analyticsService.createSegmentInstrumentationEventGenerator(),
])

// flush pre start buffer and send any signals
await this.signalsClient.init({
writeKey: analyticsService.instance.settings.writeKey,
})

// load emitter and flush any queued signals to all subscribers
void this.signalEmitter.initialize({
settings: this.globalSettings,
writeKey: analyticsService.instance.settings.writeKey,
})
}

stop() {
Expand Down

0 comments on commit c07edbc

Please sign in to comment.