Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an optional flush period #11

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"lerna": "^2.9.1",
"mocha": "^5.0.5",
"most": "^1.7.3",
"@most/create": "^2.0.1",
"nyc": "^11.6.0",
"typescript": "^2.8.1"
},
Expand Down
15 changes: 14 additions & 1 deletion packages/most-buffer/src/buffer-sink.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
class BufferSink {
constructor(count, sink) {
constructor(count, sink, flushAfter) {
this.active = true;
this.sink = sink;
this.flushAfter = flushAfter;
this.count = count;
this.buffer = count === undefined ? [] : new Array(count);
this.length = 0;
Expand All @@ -15,6 +16,18 @@ class BufferSink {
// Buffering the new value
this.buffer[this.length++] = value;

if (this.flushAfter) {
// make sure we flush this if some time has passed and count is not reached
this.flushTimeout && clearTimeout(this.flushTimeout);

setTimeout(() => {
if (this.length > 0) {
this.sink.event(time, value);
this.length = 0;
}
}, this.flushAfter);
}

// If the buffer has a limit and is full, let's emit it
if (this.length === this.count) {
const value = this.buffer;
Expand Down
4 changes: 2 additions & 2 deletions packages/most-buffer/src/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const BufferSink = require('./buffer-sink.js');

function buffer(count = undefined) {
function buffer(count = undefined, flushAfter) {
return (stream) => new stream.constructor({
run: (sink, scheduler) => stream.source.run(new BufferSink(count, sink), scheduler)
run: (sink, scheduler) => stream.source.run(new BufferSink(count, sink, flushAfter), scheduler)
});
}

Expand Down
22 changes: 22 additions & 0 deletions packages/most-buffer/test/test-buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const { expect } = require('chai');
const buffer = require('../src');
const BufferSink = require('../src/buffer-sink');
const most = require('most');
const { create } = require('@most/create');

describe('buffer', function() {
it('can group stream events 10 by 10', function() {
Expand Down Expand Up @@ -54,4 +55,25 @@ describe('buffer', function() {
sink.end(time + 1);
sink.event(time + 2);
});
it('should flush after the given flush period even if the buffer is not full', function(done) {
const stream$ = create((add, end) => {
setTimeout(() => add('data'), 10);
setTimeout(() => add('data'), 20);
setTimeout(() => add('data'), 50);
setTimeout(() => {
add('data');
end();
}, 100);
});

return stream$
.take(10)
.thru(buffer(5, 700))
.subscribe({
next: (data) => {
expect(data.length < 5).to.be.true;
},
complete: () => done()
});
});
});