210 lines
6.0 KiB
JavaScript
210 lines
6.0 KiB
JavaScript
|
'use strict';
|
||
|
|
||
|
var STREAM = require('stream'),
|
||
|
UTIL = require('util'),
|
||
|
StringDecoder = require('string_decoder').StringDecoder;
|
||
|
|
||
|
function MemoryReadableStream(data, options) {
|
||
|
if (!(this instanceof MemoryReadableStream))
|
||
|
return new MemoryReadableStream(data, options);
|
||
|
MemoryReadableStream.super_.call(this, options);
|
||
|
this.init(data, options);
|
||
|
}
|
||
|
UTIL.inherits(MemoryReadableStream, STREAM.Readable);
|
||
|
|
||
|
|
||
|
function MemoryWritableStream(data, options) {
|
||
|
if (!(this instanceof MemoryWritableStream))
|
||
|
return new MemoryWritableStream(data, options);
|
||
|
MemoryWritableStream.super_.call(this, options);
|
||
|
this.init(data, options);
|
||
|
}
|
||
|
UTIL.inherits(MemoryWritableStream, STREAM.Writable);
|
||
|
|
||
|
|
||
|
function MemoryDuplexStream(data, options) {
|
||
|
if (!(this instanceof MemoryDuplexStream))
|
||
|
return new MemoryDuplexStream(data, options);
|
||
|
MemoryDuplexStream.super_.call(this, options);
|
||
|
this.init(data, options);
|
||
|
}
|
||
|
UTIL.inherits(MemoryDuplexStream, STREAM.Duplex);
|
||
|
|
||
|
|
||
|
MemoryReadableStream.prototype.init =
|
||
|
MemoryWritableStream.prototype.init =
|
||
|
MemoryDuplexStream.prototype.init = function init (data, options) {
|
||
|
var self = this;
|
||
|
this.queue = [];
|
||
|
|
||
|
if (data) {
|
||
|
if (!Array.isArray(data)) {
|
||
|
data = [ data ];
|
||
|
}
|
||
|
|
||
|
data.forEach(function (chunk) {
|
||
|
if (!(chunk instanceof Buffer)) {
|
||
|
chunk = new Buffer(chunk);
|
||
|
}
|
||
|
self.queue.push(chunk);
|
||
|
});
|
||
|
|
||
|
}
|
||
|
|
||
|
options = options || {};
|
||
|
|
||
|
this.maxbufsize = options.hasOwnProperty('maxbufsize') ? options.maxbufsize
|
||
|
: null;
|
||
|
this.bufoverflow = options.hasOwnProperty('bufoverflow') ? options.bufoverflow
|
||
|
: null;
|
||
|
this.frequence = options.hasOwnProperty('frequence') ? options.frequence
|
||
|
: null;
|
||
|
};
|
||
|
|
||
|
function MemoryStream (data, options) {
|
||
|
if (!(this instanceof MemoryStream))
|
||
|
return new MemoryStream(data, options);
|
||
|
|
||
|
options = options || {};
|
||
|
|
||
|
var readable = options.hasOwnProperty('readable') ? options.readable : true,
|
||
|
writable = options.hasOwnProperty('writable') ? options.writable : true;
|
||
|
|
||
|
if (readable && writable) {
|
||
|
return new MemoryDuplexStream(data, options);
|
||
|
} else if (readable) {
|
||
|
return new MemoryReadableStream(data, options);
|
||
|
} else if (writable) {
|
||
|
return new MemoryWritableStream(data, options);
|
||
|
} else {
|
||
|
throw new Error("Unknown stream type Readable, Writable or Duplex ");
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
MemoryStream.createReadStream = function (data, options) {
|
||
|
options = options || {};
|
||
|
options.readable = true;
|
||
|
options.writable = false;
|
||
|
|
||
|
return new MemoryStream(data, options);
|
||
|
};
|
||
|
|
||
|
|
||
|
MemoryStream.createWriteStream = function (data, options) {
|
||
|
options = options || {};
|
||
|
options.readable = false;
|
||
|
options.writable = true;
|
||
|
|
||
|
return new MemoryStream(data, options);
|
||
|
};
|
||
|
|
||
|
|
||
|
MemoryReadableStream.prototype._read =
|
||
|
MemoryDuplexStream.prototype._read = function _read (n) {
|
||
|
var self = this,
|
||
|
frequence = self.frequence || 0,
|
||
|
wait_data = this instanceof STREAM.Duplex && ! this._writableState.finished ? true : false;
|
||
|
if ( ! this.queue.length && ! wait_data) {
|
||
|
this.push(null);// finish stream
|
||
|
} else if (this.queue.length) {
|
||
|
setTimeout(function () {
|
||
|
if (self.queue.length) {
|
||
|
var chunk = self.queue.shift();
|
||
|
if (chunk && ! self._readableState.ended) {
|
||
|
if ( ! self.push(chunk) ) {
|
||
|
self.queue.unshift(chunk);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}, frequence);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
|
||
|
MemoryWritableStream.prototype._write =
|
||
|
MemoryDuplexStream.prototype._write = function _write (chunk, encoding, cb) {
|
||
|
var decoder = null;
|
||
|
try {
|
||
|
decoder = this.decodeStrings && encoding ? new StringDecoder(encoding) : null;
|
||
|
} catch (err){
|
||
|
return cb(err);
|
||
|
}
|
||
|
|
||
|
var decoded_chunk = decoder ? decoder.write(chunk) : chunk,
|
||
|
queue_size = this._getQueueSize(),
|
||
|
chunk_size = decoded_chunk.length;
|
||
|
|
||
|
if (this.maxbufsize && (queue_size + chunk_size) > this.maxbufsize ) {
|
||
|
if (this.bufoverflow) {
|
||
|
return cb("Buffer overflowed (" + this.bufoverflow + "/" + queue_size + ")");
|
||
|
} else {
|
||
|
return cb();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (this instanceof STREAM.Duplex) {
|
||
|
while (this.queue.length) {
|
||
|
this.push(this.queue.shift());
|
||
|
}
|
||
|
this.push(decoded_chunk);
|
||
|
} else {
|
||
|
this.queue.push(decoded_chunk);
|
||
|
}
|
||
|
cb();
|
||
|
};
|
||
|
|
||
|
|
||
|
MemoryDuplexStream.prototype.end = function (chunk, encoding, cb) {
|
||
|
var self = this;
|
||
|
return MemoryDuplexStream.super_.prototype.end.call(this, chunk, encoding, function () {
|
||
|
self.push(null);//finish readble stream too
|
||
|
if (cb) cb();
|
||
|
});
|
||
|
};
|
||
|
|
||
|
|
||
|
MemoryReadableStream.prototype._getQueueSize =
|
||
|
MemoryWritableStream.prototype._getQueueSize =
|
||
|
MemoryDuplexStream.prototype._getQueueSize = function () {
|
||
|
var queuesize = 0, i;
|
||
|
for (i = 0; i < this.queue.length; i++) {
|
||
|
queuesize += Array.isArray(this.queue[i]) ? this.queue[i][0].length
|
||
|
: this.queue[i].length;
|
||
|
}
|
||
|
return queuesize;
|
||
|
};
|
||
|
|
||
|
|
||
|
MemoryWritableStream.prototype.toString =
|
||
|
MemoryDuplexStream.prototype.toString =
|
||
|
MemoryReadableStream.prototype.toString =
|
||
|
MemoryWritableStream.prototype.getAll =
|
||
|
MemoryDuplexStream.prototype.getAll =
|
||
|
MemoryReadableStream.prototype.getAll = function () {
|
||
|
var self = this,
|
||
|
ret = '';
|
||
|
this.queue.forEach(function (data) {
|
||
|
ret += data;
|
||
|
});
|
||
|
return ret;
|
||
|
};
|
||
|
|
||
|
|
||
|
MemoryWritableStream.prototype.toBuffer =
|
||
|
MemoryDuplexStream.prototype.toBuffer =
|
||
|
MemoryReadableStream.prototype.toBuffer = function () {
|
||
|
var buffer = new Buffer(this._getQueueSize()),
|
||
|
currentOffset = 0;
|
||
|
|
||
|
this.queue.forEach(function (data) {
|
||
|
var data_buffer = data instanceof Buffer ? data : new Buffer(data);
|
||
|
data_buffer.copy(buffer, currentOffset);
|
||
|
currentOffset += data.length;
|
||
|
});
|
||
|
return buffer;
|
||
|
};
|
||
|
|
||
|
|
||
|
module.exports = MemoryStream;
|