-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
40 lines (32 loc) · 802 Bytes
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
const assert = require('assert')
/**
 * Do-nothing callback; accepts any arguments and yields `undefined`.
 */
function noop () {
  return undefined
}
/**
 * Report whether a value is callable.
 *
 * @param {*} value - anything
 * @returns {boolean} true when `typeof value` is 'function'
 */
function isFunction (value) {
  const kind = typeof value
  return kind === 'function'
}
/**
 * Duck-type check for a writable-stream-like object: something that
 * exposes both a `write` and an `end` method.
 *
 * @param {*} value - candidate to inspect
 * @returns {boolean} true when `value.write` and `value.end` are functions;
 *   false for anything else, including null and undefined
 */
function isStream (value) {
  // null/undefined have no properties; answer "no" instead of throwing
  // a TypeError from the property lookup below.
  if (value == null) {
    return false
  }
  return isFunction(value.write) && isFunction(value.end)
}
/**
 * Resolve once the stream emits 'drain' or 'close', whichever comes first.
 * Listening for 'close' as well keeps the caller from waiting forever on a
 * stream that is torn down before it drains.
 *
 * @param {object} stream - emitter-like object with `once` and `removeListener`
 * @returns {Promise<void>}
 */
function waitForDrain (stream) {
  return new Promise(resolve => {
    const settle = () => {
      // Remove both listeners so repeated waits do not accumulate handlers.
      stream.removeListener('drain', settle)
      stream.removeListener('close', settle)
      resolve()
    }
    stream.once('drain', settle)
    stream.once('close', settle)
  })
}

/**
 * Write every chunk produced by an async iterable into a stream-like
 * target, then end the target.
 *
 * When `stream.write()` returns exactly `false` and the stream exposes
 * `once`/`removeListener`, the loop pauses until 'drain' (or 'close')
 * before pulling the next chunk. Targets that only provide `write` and
 * `end` are written to without pausing.
 *
 * If iterating or writing throws, the returned promise rejects and
 * `stream.end()` is not called.
 *
 * @param {AsyncIterable<*>} iterator - source of chunks
 * @param {object} stream - object with `write(chunk)` and `end()` methods
 * @returns {Promise<void>} settles after `stream.end()` has been called
 */
async function pipeStream (iterator, stream) {
  const canWait = isFunction(stream.once) && isFunction(stream.removeListener)
  for await (const chunk of iterator) {
    const accepted = stream.write(chunk)
    if (accepted === false && canWait) {
      await waitForDrain(stream)
    }
  }
  stream.end()
}
/**
 * Connect an async iterable to a single target.
 *
 * - Stream-like target (has `write` and `end`): the source is pumped into
 *   it in the background and the target itself is returned.
 * - Function target: called with the source; its return value is returned.
 *
 * @param {AsyncIterable<*>} source - must expose `Symbol.asyncIterator`
 * @param {object|Function} target - stream-like object or function
 * @returns {*} the target stream, or whatever the target function returns
 * @throws {AssertionError} when `source` is not an async iterable
 * @throws {Error} 'Unrecognized target type' for any other target
 */
function pipe (source, target) {
  // Check for null/undefined first so bad input fails the assertion with
  // its message rather than a TypeError from the property lookup.
  assert(
    source != null && isFunction(source[Symbol.asyncIterator]),
    'source should be an async iterable'
  )
  // Same guard for the target: null/undefined must reach the final throw.
  if (target != null && isStream(target)) {
    // Fire-and-forget: the pump runs in the background and a rejection is
    // deliberately discarded here, so it never becomes an unhandled rejection.
    pipeStream(source, target).catch(noop)
    return target
  }
  if (isFunction(target)) {
    return target(source)
  }
  throw new Error('Unrecognized target type')
}
/**
 * Chain a source through any number of targets, left to right.
 * Each target receives the result of piping into the previous one;
 * with no targets the source is returned unchanged.
 *
 * @param {*} source - initial value handed to `pipe`
 * @param {...*} rest - targets applied in order via `pipe`
 * @returns {*} result of the final `pipe` call, or `source` when `rest` is empty
 */
function pipeThrough (source, ...rest) {
  let current = source
  for (const target of rest) {
    current = pipe(current, target)
  }
  return current
}
// The module's only export is the pipeThrough function itself.
module.exports = pipeThrough