Skip to content

Commit

Permalink
[1.x] Introduce buffer size (#250)
Browse files Browse the repository at this point in the history
* Introduce buffer size

* Fix code styling

* Static analysis

* Collapse values

* Fix code styling

* postpone filtering until ingesting

* Types

* Make buffer env controlled

* Update DatabaseStorage.php

* Rename variable

* Fix code styling

---------

Co-authored-by: timacdonald <[email protected]>
Co-authored-by: Taylor Otwell <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2023
1 parent b0f368b commit a24b5ae
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 3 deletions.
2 changes: 2 additions & 0 deletions config/pulse.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
'ingest' => [
'driver' => env('PULSE_INGEST_DRIVER', 'storage'),

'buffer' => env('PULSE_INGEST_BUFFER', 5_000),

'trim' => [
'lottery' => [1, 1_000],
'keep' => '7 days',
Expand Down
53 changes: 52 additions & 1 deletion src/Pulse.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class Pulse
*/
protected $css = [__DIR__.'/../dist/pulse.css'];

/**
* Indicates that Pulse is currently evaluating the buffer.
*/
protected bool $evaluatingBuffer = false;

/**
* Create a new Pulse instance.
*/
Expand Down Expand Up @@ -153,6 +158,8 @@ public function record(

if ($this->shouldRecord) {
$this->entries[] = $entry;

$this->ingestWhenOverBufferSize();
}

return $entry;
Expand All @@ -178,6 +185,8 @@ public function set(

if ($this->shouldRecord) {
$this->entries[] = $value;

$this->ingestWhenOverBufferSize();
}

return $value;
Expand All @@ -190,6 +199,8 @@ public function lazy(callable $closure): self
{
if ($this->shouldRecord) {
$this->lazy[] = $closure;

$this->ingestWhenOverBufferSize();
}

return $this;
Expand Down Expand Up @@ -277,7 +288,7 @@ public function filter(callable $filter): self
*/
public function ingest(): int
{
$this->rescue(fn () => $this->lazy->each(fn ($lazy) => $lazy()));
$this->resolveLazyEntries();

return $this->ignore(function () {
$entries = $this->rescue(fn () => $this->entries->filter($this->shouldRecord(...))) ?? collect([]);
Expand Down Expand Up @@ -327,6 +338,46 @@ public function wantsIngesting(): bool
return $this->lazy->isNotEmpty() || $this->entries->isNotEmpty();
}

/**
* Start ingesting entires if over buffer size.
*/
protected function ingestWhenOverBufferSize(): void
{
// To prevent recursion, we track when we are already evaluating the
// buffer and resolving entries. When we are we may simply return
// and the continue execution. We set the value to false later.
if ($this->evaluatingBuffer) {
return;
}

// TODO remove fallback when tagging v1
$buffer = $this->app->make('config')->get('pulse.ingest.buffer') ?? 5_000;

if (($this->entries->count() + $this->lazy->count()) > $buffer) {
$this->evaluatingBuffer = true;

$this->resolveLazyEntries();
}

if ($this->entries->count() > $buffer) {
$this->evaluatingBuffer = true;

$this->ingest();
}

$this->evaluatingBuffer = false;
}

/**
* Resolve lazy entries.
*/
protected function resolveLazyEntries(): void
{
$this->rescue(fn () => $this->lazy->each(fn ($lazy) => $lazy()));

$this->lazy = collect([]);
}

/**
* Determine if the given entry should be recorded.
*/
Expand Down
16 changes: 14 additions & 2 deletions src/Storage/DatabaseStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public function store(Collection $items): void
->chunk($this->config->get('pulse.storage.database.chunk'))
->each(fn ($chunk) => $this->upsertAvg($chunk->all()));

$values
$this
->collapseValues($values)
->chunk($this->config->get('pulse.storage.database.chunk'))
->each(fn ($chunk) => $this->connection()
->table('pulse_values')
Expand All @@ -108,7 +109,7 @@ public function store(Collection $items): void
...($attributes = $entry->attributes()),
'key_hash' => md5($attributes['key']),
])->all()
: $chunk->map->attributes()->all(),
: $chunk->map->attributes()->all(), // @phpstan-ignore method.notFound
['type', 'key_hash'],
['timestamp', 'value']
)
Expand Down Expand Up @@ -351,6 +352,17 @@ protected function preaggregateAverages(Collection $entries): Collection
]);
}

/**
* Collapse the given values.
*
* @param \Illuminate\Support\Collection<int, \Laravel\Pulse\Value> $values
* @return \Illuminate\Support\Collection<int, \Laravel\Pulse\Value>
*/
protected function collapseValues(Collection $values): Collection
{
return $values->reverse()->unique(fn (Value $value) => [$value->key, $value->type]);
}

/**
* Pre-aggregate entries with a callback.
*
Expand Down
57 changes: 57 additions & 0 deletions tests/Feature/PulseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\App;
use Illuminate\Support\Facades\Auth;
use Illuminate\Support\Facades\Config;
use Laravel\Pulse\Contracts\ResolvesUsers;
use Laravel\Pulse\Contracts\Storage;
use Laravel\Pulse\Entry;
Expand Down Expand Up @@ -164,3 +165,59 @@ public function fields(int|string|null $key): array
'extra' => '["123","456"]',
]);
});

it('can limit the buffer size of entries', function () {
Config::set('pulse.ingest.buffer', 4);

Pulse::record('type', 'key');
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::record('type', 'key');
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::record('type', 'key');
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::record('type', 'key');
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::record('type', 'key');
expect(Pulse::wantsIngesting())->toBeFalse();

Pulse::set('type', 'key', 'value');
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::set('type', 'key', 'value');
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::set('type', 'key', 'value');
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::set('type', 'key', 'value');
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::set('type', 'key', 'value');
expect(Pulse::wantsIngesting())->toBeFalse();
});

it('resolves lazy entries when considering the buffer', function () {
Config::set('pulse.ingest.buffer', 4);

Pulse::lazy(fn () => Pulse::record('type', 'key'));
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::lazy(fn () => Pulse::set('type', 'key', 'value'));
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::lazy(fn () => Pulse::record('type', 'key'));
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::lazy(fn () => Pulse::set('type', 'key', 'value'));
expect(Pulse::wantsIngesting())->toBeTrue();
Pulse::lazy(fn () => Pulse::record('type', 'key'));
expect(Pulse::wantsIngesting())->toBeFalse();
});

it('rescues exceptions that occur while filtering', function () {
$handled = false;
Pulse::handleExceptionsUsing(function () use (&$handled) {
$handled = true;
});

Pulse::filter(function ($entry) {
throw new RuntimeException('Whoops!');
});
Pulse::record('type', 'key');
Pulse::ingest();

expect($handled)->toBe(true);
});
20 changes: 20 additions & 0 deletions tests/Feature/Storage/DatabaseStorageTest.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php

use Carbon\CarbonInterval;
use Illuminate\Database\Events\QueryExecuted;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\Config;
use Illuminate\Support\Facades\DB;
Expand Down Expand Up @@ -424,3 +425,22 @@
'cache_miss' => 6,
]);
});

it('collapses values with the same key into a single upsert', function () {
$bindings = [];
DB::listen(function (QueryExecuted $event) use (&$bindings) {
$bindings = $event->bindings;
});

Pulse::set('read_counter', 'post:321', 123);
Pulse::set('read_counter', 'post:321', 234);
Pulse::set('read_counter', 'post:321', 345);
Pulse::ingest();

expect($bindings)->not->toContain(123);
expect($bindings)->not->toContain(234);
expect($bindings)->toContain('345');
$values = Pulse::ignore(fn () => DB::table('pulse_values')->get());
expect($values)->toHaveCount(1);
expect($values[0]->value)->toBe('345');
});

0 comments on commit a24b5ae

Please sign in to comment.