-
Notifications
You must be signed in to change notification settings - Fork 7
/
index.js
132 lines (117 loc) · 4.09 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
var stream = require('stream')
, util = require('util')
;
var BufferedStreamToBase64 = function (limit) {
if (typeof limit === 'undefined') {
limit = Infinity;
}
this.limit = limit;
// Since oddBytes saved can be up to 2 bytes, assume worst case and assume when there are no chunks there are still two bytes saved in oddBytes
this.size = 2;
this.oddBytes;
this.chunks = [];
this.writable = true;
this.readable = true;
this.paused = false;
this.sentOdd = false;
}
util.inherits(BufferedStreamToBase64, stream.Stream);
BufferedStreamToBase64.prototype.pipe = function (dest, options) {
var self = this
if (self.resume) self.resume();
stream.Stream.prototype.pipe.call(self, dest, options)
//just incase you are piping to two streams, do not emit data twice.
//note: you can pipe twice, but you need to pipe both streams in the same tick.
//(this is normal for streams)
if(this.piped)
return dest
process.nextTick(function () {
self.emitAllBufferedChunks();
})
this.piped = true
return dest
}
BufferedStreamToBase64.prototype.write = function (chunk) {
if (!this.chunks.length && !this.paused) {
this.emit('data', this.getBase64SizedChunk(chunk).toString('base64'));
return;
}
this.chunks.push(chunk);
this.size += chunk.length;
if (this.limit < this.size) {
this.pause();
}
}
BufferedStreamToBase64.prototype.end = function () {
if(!this.chunks.length && !this.paused) {
// If there are any remaining data not encoded, we need to encode it and send it now
if(Buffer.isBuffer(this.oddBytes) && !this.sentOdd) {
this.emit('data', this.oddBytes.toString('base64'));
this.sentOdd = true;
}
// In case a pause is triggered after oddBytes been emitted, we can't end just yet
if(this.paused) {
this.ended = true;
} else {
this.emit('end');
}
} else {
this.ended = true;
}
}
if (!stream.Stream.prototype.pause) {
BufferedStreamToBase64.prototype.pause = function() {
this.paused = true;
this.emit('pause');
};
}
if (!stream.Stream.prototype.resume) {
BufferedStreamToBase64.prototype.resume = function() {
this.paused = false;
this.emitAllBufferedChunks();
this.emit('resume');
};
}
BufferedStreamToBase64.prototype.getBase64SizedChunk = function (chunk) {
// Ensure we are dealing with a buffer
if(!Buffer.isBuffer(chunk)) {
chunk = new Buffer(chunk);
}
// Add any oddBytes left from last write
if(Buffer.isBuffer(this.oddBytes)) {
// Calculate total length to optimize for performance (avoid unnecessary loop or array)
chunk = Buffer.concat([this.oddBytes, chunk], this.oddBytes.length + chunk.length);
this.oddBytes = undefined;
}
// Base64 encode three bytes at a time, thus, we want to calculate how many bytes to push to next write
var remaining = chunk.length % 3;
// Split data into one part which is possible to Base64 encode (without any empty encoding) and one part we wait to encode until next write
if(remaining) {
this.oddBytes = chunk.slice(chunk.length - remaining);
chunk = chunk.slice(0, chunk.length - remaining);
}
return chunk;
};
BufferedStreamToBase64.prototype.emitAllBufferedChunks = function () {
var self = this
// Loop through all chunks and try to emit the data - return true if all chunks have been processed, otherwise false (if paused somewhere along the processing)
, emitChunk = function () {
if(self.paused) return false;
var chunk = self.chunks.shift();
if(!chunk) return true;
self.size -= chunk.length;
self.emit('data', self.getBase64SizedChunk(chunk).toString('base64'));
return emitChunk();
};
if(emitChunk() && self.ended) {
// If there are any remaining data not encoded, we need to encode it and send it now
if(Buffer.isBuffer(this.oddBytes)&& !this.sentOdd) {
this.emit('data', this.oddBytes.toString('base64'));
this.sentOdd =true;
// In case a pause is triggered after oddBytes been emitted, end here
if(!self.paused) return;
}
self.emit('end');
}
};
exports.BufferedStreamToBase64 = BufferedStreamToBase64;