-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
93 lines (81 loc) · 2.07 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
'use strict'
const setImmediate = require('setimmediate2').setImmediate
const linkedList = require('./lib/linked-list.js')
function noop () {}
function once (fn) {
let called = false
if (fn.length < 2) {
return function (err) {
if (called === true) return
called = true
fn(err)
}
}
return function (err, data) {
if (called === true) return
called = true
fn(err, data)
}
}
function _mapEach (items, process, concurrency, next) {
next = once(next)
const iter = items[Symbol.iterator]()
const result = next.length > 1 ? linkedList() : undefined
let total = 0
let done = 0
let finish = concurrency > 0 ? setImmediate.bind(null, step) : noop
function step () {
if (concurrency > 0 && done + concurrency <= total) {
return
}
const item = iter.next()
if (item.done) {
finish = function () {
if (done === total) {
next(null, result)
}
}
finish()
return
}
total += 1
const node = (result !== undefined) ? result.add() : undefined
process(item.value, function (err, value) {
if (err) {
next(err)
}
if (node !== undefined) {
node.value = value
}
done += 1
finish()
})
step()
}
setImmediate(step)
}
function mapEach (items, process, concurrency, next) {
if (typeof next === 'number') {
return mapEach(items, process, next, concurrency)
}
if (typeof concurrency !== 'number') {
return mapEach(items, process, 0, concurrency)
}
if (typeof process !== 'function') {
throw new Error('Process needs to be a function!')
}
if (!items || typeof items[Symbol.iterator] !== 'function') {
throw new Error('Items needs to be a collection')
}
concurrency = concurrency | 0
if (typeof next !== 'function') {
return new Promise(function (resolve, reject) {
_mapEach(items, process, concurrency, function (err, data) {
if (err) return reject(err)
resolve(data)
})
})
}
_mapEach(items, process, concurrency, next)
}
exports.mapEach = mapEach