Skip to content

Commit

Permalink
Simplify slice (#242)
Browse files Browse the repository at this point in the history
* Simplify slice fusion, add experimental map-slice commutation

* Counting down and comparing to zero turns out to be slightly faster
  • Loading branch information
briancavalier committed Apr 27, 2016
1 parent 19b29a2 commit 1f984fe
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 56 deletions.
28 changes: 18 additions & 10 deletions lib/combinator/slice.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var Stream = require('../Stream');
var Sink = require('../sink/Pipe');
var core = require('../source/core');
var dispose = require('../disposable/dispose');
var Map = require('../fusion/Map');

exports.take = take;
exports.skip = skip;
Expand Down Expand Up @@ -44,28 +45,35 @@ function slice(start, end, stream) {
}

function sliceSource(start, end, source) {
if(source instanceof Slice) {
var s = start + source.skip;
var e = Math.min(s + end, source.skip + source.take);
return new Slice(s, e, source.source);
}
return new Slice(start, end, source);
return source instanceof Map ? commuteMapSlice(start, end, source)
: source instanceof Slice ? fuseSlice(start, end, source)
: new Slice(start, end, source);
}

function commuteMapSlice(start, end, source) {
return Map.create(source.f, sliceSource(start, end, source.source))
}

function fuseSlice(start, end, source) {
start += source.min;
end = Math.min(end + source.min, source.max);
return new Slice(start, end, source.source);
}

function Slice(min, max, source) {
this.skip = min;
this.take = max - min;
this.source = source;
this.min = min;
this.max = max;
}

Slice.prototype.run = function(sink, scheduler) {
return new SliceSink(this.skip, this.take, this.source, sink, scheduler);
return new SliceSink(this.min, this.max - this.min, this.source, sink, scheduler);
};

function SliceSink(skip, take, source, sink, scheduler) {
this.sink = sink;
this.skip = skip;
this.take = take;
this.sink = sink;
this.disposable = dispose.once(source.run(this, scheduler));
}

Expand Down
7 changes: 7 additions & 0 deletions test/helper/stream-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@ var expect = require('buster').expect;
var reduce = require('../../lib/combinator/accumulate').reduce;

exports.assertSame = assertSame;
exports.expectArray = expectArray;

function assertSame(s1, s2) {
return Promise.all([toArray(s1), toArray(s2)]).then(arrayEquals);
}

function expectArray(array, s) {
return toArray(s).then(function(result) {
expect(result).toEqual(array);
});
}

function toArray(s) {
return reduce(function(a, x) {
a.push(x);
Expand Down
4 changes: 3 additions & 1 deletion test/perf/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
"flatMap": "node ./flatMap.js",
"merge": "node ./merge.js",
"merge-nested": "node ./merge-nested.js",
"sample": "node ./sample.js",
"scan": "node ./scan.js",
"slice": "node ./slice.js",
"skipRepeats": "node ./skipRepeats.js",
"zip": "node ./zip.js",
"start": "npm run filter-map-reduce && npm run flatMap && npm run concatMap && npm run merge && npm run zip && npm run scan && npm run skipRepeats"
"start": "npm run filter-map-reduce && npm run flatMap && npm run concatMap && npm run merge && npm run zip && npm run sample && npm run scan && npm run slice && npm run skipRepeats"
},
"dependencies": {
"@reactivex/rxjs": "5.0.0-beta.4",
Expand Down
71 changes: 71 additions & 0 deletions test/perf/slice.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
var Benchmark = require('benchmark');
var most = require('../../most');
var rx = require('rx');
var rxjs = require('@reactivex/rxjs')
var kefir = require('kefir');
var bacon = require('baconjs');
var lodash = require('lodash');
var highland = require('highland');

var runners = require('./runners');
var kefirFromArray = runners.kefirFromArray;

// Create a stream from an Array of n integers
// filter out odds, map remaining evens by adding 1, then reduce by summing
var n = runners.getIntArg(1000000);
var a = new Array(n);
for(var i = 0; i< a.length; ++i) {
a[i] = i;
}

var suite = Benchmark.Suite('skip(n/4) -> take(n/2) ' + n + ' integers');
var options = {
defer: true,
onError: function(e) {
e.currentTarget.failure = e.error;
}
};

var s = n * 0.25;
var t = n * 0.5;

suite
.add('most', function(deferred) {
runners.runMost(deferred, most.from(a).skip(s).take(t).reduce(sum, 0));
}, options)
.add('rx 4', function(deferred) {
runners.runRx(deferred, rx.Observable.fromArray(a).skip(s).take(t).reduce(sum, 0));
}, options)
.add('rx 5', function(deferred) {
runners.runRx5(deferred,
rxjs.Observable.from(a).skip(s).take(t).reduce(sum, 0));
}, options)
.add('kefir', function(deferred) {
runners.runKefir(deferred, kefirFromArray(a).skip(s).take(t).scan(sum, 0).last());
}, options)
.add('bacon', function(deferred) {
runners.runBacon(deferred, bacon.fromArray(a).skip(s).take(t).reduce(0, sum));
}, options)
.add('highland', function(deferred) {
runners.runHighland(deferred, highland(a).drop(s).take(t).reduce(0, sum));
}, options)
.add('lodash', function() {
return lodash(a).slice(s).slice(0, t).reduce(sum, 0);
})
.add('Array', function() {
return a.slice(s).slice(0, t).reduce(sum, 0);
});

runners.runSuite(suite);

function add1(x) {
return x + 1;
}

function even(x) {
return x % 2 === 0;
}

function sum(x, y) {
return x + y;
}
75 changes: 30 additions & 45 deletions test/slice-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,94 +2,79 @@ require('buster').spec.expose();
var expect = require('buster').expect;

var slice = require('../lib/combinator/slice');
var map = require('../lib/combinator/transform').map;
var Map = require('../lib/fusion/Map');
var fromArray = require('../lib/source/fromArray').fromArray;
var reduce = require('../lib/combinator/accumulate').reduce;

var expectArray = require('./helper/stream-helper').expectArray;

describe('slice', function() {
describe('should narrow', function() {
it('when second slice is smaller', function() {
describe('fusion', function() {
it('should narrow when second slice is smaller', function() {
var s = slice.slice(1, 5, slice.slice(1, 10, fromArray([1])));
expect(s.source.skip).toBe(2);
expect(s.source.take).toBe(5);
expect(s.source.min).toBe(2);
expect(s.source.max).toBe(6);
});

it('when second slice is larger', function() {
it('should narrow when second slice is larger', function() {
var s = slice.slice(1, 10, slice.slice(1, 5, fromArray([1])));
expect(s.source.skip).toBe(2);
expect(s.source.take).toBe(3);
expect(s.source.min).toBe(2);
expect(s.source.max).toBe(5);
});

it('should commute map', function() {
function id(x) {
return x;
}
var s = slice.slice(0, 3, map(id, fromArray([1, 2, 3, 4])));

expect(s.source instanceof Map).toBe(true);
expect(s.source.f).toBe(id);
return expectArray([1, 2, 3], s);
});
});

it('should retain only sliced range', function () {
var a = [1, 2, 3, 4, 5, 6, 7, 8, 9];
var s = slice.slice(2, a.length-2, fromArray(a));

return reduce(function (a, x) {
return a.concat(x);
}, [], s)
.then(function (result) {
expect(result).toEqual(a.slice(2, a.length-2))
});
return expectArray(a.slice(2, a.length-2), s);
});
});

describe('take', function() {
it('should take first n elements', function () {
var s = slice.take(2, fromArray([1,1,1,1,1,1,1]));

return reduce(function (count) {
return count + 1;
}, 0, s)
.then(function (count) {
expect(count).toBe(2);
});
var s = slice.take(2, fromArray([1 ,1, 1, 1, 1, 1, 1]));
return expectArray([1, 1], s);
});
});

describe('skip', function() {
it('should skip first n elements', function () {
var a = [1, 1, 1, 1, 1, 1, 1];
var s = slice.skip(2, fromArray(a));

return reduce(function (count) {
return count + 1;
}, 0, s)
.then(function (count) {
expect(count).toBe(a.length - 2);
});
return expectArray(a.slice(2), s)
});
});

describe('takeWhile', function() {
it('should take elements until condition becomes false', function() {
var a = [0,1,2,3,4,5,6,7,8,9];
var a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
var s = slice.takeWhile(function(x) {
return x < 5;
}, fromArray(a));

return reduce(function(count, x) {
expect(x).toBeLessThan(5);
return count + 1;
}, 0, s)
.then(function(count) {
expect(count).toBe(5);
});
return expectArray([0, 1, 2, 3, 4], s);
});
});

describe('skipWhile', function() {
it('should skip elements until condition becomes false', function() {
var a = [0,1,2,3,4,5,6,7,8,9];
var a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
var s = slice.skipWhile(function(x) {
return x < 5;
}, fromArray(a));

return reduce(function(count, x) {
expect(x >= 5).toBeTrue();
return count + 1;
}, 0, s)
.then(function(count) {
expect(count).toBe(5);
});
return expectArray([5, 6, 7, 8, 9], s)
});
});

0 comments on commit 1f984fe

Please sign in to comment.