diff --git a/config/pulse.php b/config/pulse.php index 28543ed9..c9fa055f 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -79,6 +79,8 @@ 'ingest' => [ 'driver' => env('PULSE_INGEST_DRIVER', 'storage'), + 'buffer' => env('PULSE_INGEST_BUFFER', 5_000), + 'trim' => [ 'lottery' => [1, 1_000], 'keep' => '7 days', diff --git a/src/Pulse.php b/src/Pulse.php index f3e979f8..ade8fd76 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -86,6 +86,11 @@ class Pulse */ protected $css = [__DIR__.'/../dist/pulse.css']; + /** + * Indicates that Pulse is currently evaluating the buffer. + */ + protected bool $evaluatingBuffer = false; + /** * Create a new Pulse instance. */ @@ -153,6 +158,8 @@ public function record( if ($this->shouldRecord) { $this->entries[] = $entry; + + $this->ingestWhenOverBufferSize(); } return $entry; @@ -178,6 +185,8 @@ public function set( if ($this->shouldRecord) { $this->entries[] = $value; + + $this->ingestWhenOverBufferSize(); } return $value; @@ -190,6 +199,8 @@ public function lazy(callable $closure): self { if ($this->shouldRecord) { $this->lazy[] = $closure; + + $this->ingestWhenOverBufferSize(); } return $this; @@ -277,7 +288,7 @@ public function filter(callable $filter): self */ public function ingest(): int { - $this->rescue(fn () => $this->lazy->each(fn ($lazy) => $lazy())); + $this->resolveLazyEntries(); return $this->ignore(function () { $entries = $this->rescue(fn () => $this->entries->filter($this->shouldRecord(...))) ?? collect([]); @@ -327,6 +338,46 @@ public function wantsIngesting(): bool return $this->lazy->isNotEmpty() || $this->entries->isNotEmpty(); } + /** + * Start ingesting entires if over buffer size. + */ + protected function ingestWhenOverBufferSize(): void + { + // To prevent recursion, we track when we are already evaluating the + // buffer and resolving entries. When we are we may simply return + // and the continue execution. We set the value to false later. + if ($this->evaluatingBuffer) { + return; + } + + // TODO remove fallback when tagging v1 + $buffer = $this->app->make('config')->get('pulse.ingest.buffer') ?? 5_000; + + if (($this->entries->count() + $this->lazy->count()) > $buffer) { + $this->evaluatingBuffer = true; + + $this->resolveLazyEntries(); + } + + if ($this->entries->count() > $buffer) { + $this->evaluatingBuffer = true; + + $this->ingest(); + } + + $this->evaluatingBuffer = false; + } + + /** + * Resolve lazy entries. + */ + protected function resolveLazyEntries(): void + { + $this->rescue(fn () => $this->lazy->each(fn ($lazy) => $lazy())); + + $this->lazy = collect([]); + } + /** * Determine if the given entry should be recorded. */ diff --git a/src/Storage/DatabaseStorage.php b/src/Storage/DatabaseStorage.php index 4456f082..258c7485 100644 --- a/src/Storage/DatabaseStorage.php +++ b/src/Storage/DatabaseStorage.php @@ -98,7 +98,8 @@ public function store(Collection $items): void ->chunk($this->config->get('pulse.storage.database.chunk')) ->each(fn ($chunk) => $this->upsertAvg($chunk->all())); - $values + $this + ->collapseValues($values) ->chunk($this->config->get('pulse.storage.database.chunk')) ->each(fn ($chunk) => $this->connection() ->table('pulse_values') @@ -108,7 +109,7 @@ public function store(Collection $items): void ...($attributes = $entry->attributes()), 'key_hash' => md5($attributes['key']), ])->all() - : $chunk->map->attributes()->all(), + : $chunk->map->attributes()->all(), // @phpstan-ignore method.notFound ['type', 'key_hash'], ['timestamp', 'value'] ) @@ -351,6 +352,17 @@ protected function preaggregateAverages(Collection $entries): Collection ]); } + /** + * Collapse the given values. + * + * @param \Illuminate\Support\Collection $values + * @return \Illuminate\Support\Collection + */ + protected function collapseValues(Collection $values): Collection + { + return $values->reverse()->unique(fn (Value $value) => [$value->key, $value->type]); + } + /** * Pre-aggregate entries with a callback. * diff --git a/tests/Feature/PulseTest.php b/tests/Feature/PulseTest.php index 425f8625..1fd9f8e1 100644 --- a/tests/Feature/PulseTest.php +++ b/tests/Feature/PulseTest.php @@ -4,6 +4,7 @@ use Illuminate\Support\Collection; use Illuminate\Support\Facades\App; use Illuminate\Support\Facades\Auth; +use Illuminate\Support\Facades\Config; use Laravel\Pulse\Contracts\ResolvesUsers; use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Entry; @@ -164,3 +165,59 @@ public function fields(int|string|null $key): array 'extra' => '["123","456"]', ]); }); + +it('can limit the buffer size of entries', function () { + Config::set('pulse.ingest.buffer', 4); + + Pulse::record('type', 'key'); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::record('type', 'key'); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::record('type', 'key'); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::record('type', 'key'); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::record('type', 'key'); + expect(Pulse::wantsIngesting())->toBeFalse(); + + Pulse::set('type', 'key', 'value'); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::set('type', 'key', 'value'); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::set('type', 'key', 'value'); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::set('type', 'key', 'value'); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::set('type', 'key', 'value'); + expect(Pulse::wantsIngesting())->toBeFalse(); +}); + +it('resolves lazy entries when considering the buffer', function () { + Config::set('pulse.ingest.buffer', 4); + + Pulse::lazy(fn () => Pulse::record('type', 'key')); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::lazy(fn () => Pulse::set('type', 'key', 'value')); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::lazy(fn () => Pulse::record('type', 'key')); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::lazy(fn () => Pulse::set('type', 'key', 'value')); + expect(Pulse::wantsIngesting())->toBeTrue(); + Pulse::lazy(fn () => Pulse::record('type', 'key')); + expect(Pulse::wantsIngesting())->toBeFalse(); +}); + +it('rescues exceptions that occur while filtering', function () { + $handled = false; + Pulse::handleExceptionsUsing(function () use (&$handled) { + $handled = true; + }); + + Pulse::filter(function ($entry) { + throw new RuntimeException('Whoops!'); + }); + Pulse::record('type', 'key'); + Pulse::ingest(); + + expect($handled)->toBe(true); +}); diff --git a/tests/Feature/Storage/DatabaseStorageTest.php b/tests/Feature/Storage/DatabaseStorageTest.php index 0a9b1d48..184cb320 100644 --- a/tests/Feature/Storage/DatabaseStorageTest.php +++ b/tests/Feature/Storage/DatabaseStorageTest.php @@ -1,6 +1,7 @@ 6, ]); }); + +it('collapses values with the same key into a single upsert', function () { + $bindings = []; + DB::listen(function (QueryExecuted $event) use (&$bindings) { + $bindings = $event->bindings; + }); + + Pulse::set('read_counter', 'post:321', 123); + Pulse::set('read_counter', 'post:321', 234); + Pulse::set('read_counter', 'post:321', 345); + Pulse::ingest(); + + expect($bindings)->not->toContain(123); + expect($bindings)->not->toContain(234); + expect($bindings)->toContain('345'); + $values = Pulse::ignore(fn () => DB::table('pulse_values')->get()); + expect($values)->toHaveCount(1); + expect($values[0]->value)->toBe('345'); +});