続・CountStream
以前作成したCountStreamを再度書き直してみた。
今度のやつはprocess.nextTickで一行ずつdataを発行するようになってます。
counter_stream.js
var stream = require('stream'), util = require('util'); function CounterStream() { stream.Stream.call(this); this.readable = true; this.writable = true; this.buffer = []; this.encoding = ''; this.paused = false; this.count = 0; this.ended = false; var that = this; this.once('pipe', function(src) { src.once('end', function() { that.ended = true; }); }); } util.inherits(CounterStream, stream.Stream); /***/ CounterStream.prototype.destroy = function() { var that = this; this.readable = false; this.writable = false; this.once('end', function() { that.emit('close'); }); this.emit('end'); }; /***/ // data // end // error // close CounterStream.prototype.setEncoding = function(encoding) { this.encoding = encoding; }; CounterStream.prototype.pause = function() { this.paused = true; }; CounterStream.prototype.resume = function() { this.paused = false; }; /***/ // drain // error err // close // pipe src CounterStream.prototype.write = function(data, encoding) { var that = this, str, lines; if (!this.writable) { this.emit('error', new Error('CounterStream not writable')); return false; } str = (Buffer.isBuffer(data)) ? (new Buffer(data)).toString(encoding || this.encoding) : data; lines = str.split(/\r?\n/); this.buffer.push( (this.buffer.pop() || '') + lines.shift()); this.buffer = this.buffer.concat(lines); process.nextTick(function() { that.addLineCount(); }); return false; }; CounterStream.prototype.end = function(data, encoding) { if (data) { this.write(data, encoding); } this.destroy(); }; CounterStream.prototype.destroySoon = function() { this.destroy(); }; /***/ CounterStream.prototype.addLineCount = function() { var that = this; if (!this.ended) { process.nextTick(function() { that.addLineCount(); }); } if (this.paused) { return; } if (this.buffer.length > 0) { ++this.count; this.emit('data', this.count + ': ' + this.buffer.shift() + '\n'); } else { this.emit('drain'); } }; module.exports = { create: function() { return new CounterStream; } };
index.js
// Demo: number every line of counter_stream.js and print the result to stdout.
var fs = require('fs');
var counterStream = require('./counter_stream');

var c = counterStream.create();
var source = fs.createReadStream('./counter_stream.js');

// pipe() returns its destination, so the chain can be written step by step.
var numbered = source.pipe(c);
numbered.pipe(process.stdout);
for文でとりあえず受け取ったやつは全部処理しちゃうのがいいのか、process.nextTickで分散させた方がいいのか……
あとdataイベントを発行してるけど、pipeでsrcを受け取ってwriteするのとは違うのかなあ?どういう風に使い分けるんだろう?