続・CountStream

以前作成したCountStreamを書き直してみた。
今回は、process.nextTickを使って1行ずつdataイベントを発行するようにしている。

counter_stream.js
var stream = require('stream'),
    util = require('util');

/**
 * Line-numbering stream built on the legacy `stream.Stream` base class.
 * It is writable (text comes in through `write()`) and readable (numbered
 * lines go out as 'data' events).
 *
 * Instance state:
 *   - readable / writable: both start true; `write()` checks `writable` and
 *     `destroy()` clears both.
 *   - buffer:   queue of lines waiting to be emitted.
 *   - encoding: encoding `write()` uses to decode Buffer input.
 *   - paused:   while true, `addLineCount()` emits nothing.
 *   - count:    number of lines emitted so far.
 *   - ended:    becomes true once the first stream piped into this one
 *               emits 'end'.
 */
function CounterStream() {
  var self = this;

  stream.Stream.call(self);

  self.readable = true;
  self.writable = true;

  self.buffer = [];
  self.encoding = '';
  self.paused = false;

  self.count = 0;

  self.ended = false;

  function markSourceEnded() {
    self.ended = true;
  }

  // Only the first 'pipe' event is observed; remember when that source ends.
  self.once('pipe', function(source) {
    source.once('end', markSourceEnded);
  });
}

util.inherits(CounterStream, stream.Stream);

/***/

/**
 * Tears the stream down immediately: marks it as neither readable nor
 * writable, then emits 'end' followed by 'close'.
 *
 * The call is idempotent. A second (or re-entrant) call is ignored so that
 * listeners never see 'end' / 'close' more than once.
 */
CounterStream.prototype.destroy = function() {
  if (this.destroyed) {
    return;
  }
  // Set the flag before emitting so a listener that calls destroy() again
  // cannot trigger a second round of events.
  this.destroyed = true;

  this.readable = false;
  this.writable = false;

  this.emit('end');
  this.emit('close');
};

/***/

// data
// end
// error
// close

/**
 * Stores the encoding that `write()` falls back to when it has to decode a
 * Buffer and was not given an encoding of its own.
 *
 * @param {string} encoding Encoding name; stored as given, not validated.
 */
CounterStream.prototype.setEncoding = function(encoding) {
  var self = this;

  self.encoding = encoding;
};

/**
 * Suspends output: while the `paused` flag is set, `addLineCount()` returns
 * without emitting anything. Queued lines stay in the buffer.
 */
CounterStream.prototype.pause = function() {
  var self = this;

  self.paused = true;
};

/**
 * Clears the `paused` flag so that `addLineCount()` emits queued lines
 * again. It only flips the flag; it does not emit or schedule anything
 * itself.
 */
CounterStream.prototype.resume = function() {
  var self = this;

  self.paused = false;
};

/***/

// drain
// error err
// close
// pipe src

/**
 * Accepts a chunk of text, splits it into lines and queues them for
 * emission. One `addLineCount()` run is scheduled on the next tick.
 *
 * If the stream is no longer writable, an 'error' event is emitted and
 * nothing is queued.
 *
 * @param {string|Buffer} data Chunk to queue. Buffers are decoded first.
 * @param {string} [encoding] Encoding for Buffer input. Falls back to the
 *     value given to `setEncoding()`, then to 'utf8'.
 * @return {boolean} Always false, signalling the caller (e.g. `pipe()`) to
 *     wait for 'drain' before writing more.
 */
CounterStream.prototype.write = function(data, encoding) {
  var that = this,
      str, lines;

  if (!this.writable) {
    this.emit('error', new Error('CounterStream not writable'));
    return false;
  }

  // Decode the Buffer directly instead of copying it through the deprecated
  // `new Buffer(data)` constructor. The explicit 'utf8' fallback matters
  // because the default `this.encoding` is '' and current Node versions
  // reject an empty encoding name in Buffer#toString.
  str = Buffer.isBuffer(data) ?
      data.toString(encoding || this.encoding || 'utf8') : data;

  lines = str.split(/\r?\n/);
  // The last queued entry may be an unterminated line from the previous
  // chunk, so the first piece of this chunk is appended to it.
  this.buffer.push(
      (this.buffer.pop() || '') + lines.shift());
  this.buffer = this.buffer.concat(lines);

  process.nextTick(function() {
    that.addLineCount();
  });

  return false;
};

/**
 * Finishes the writable side: optionally queues a final chunk, emits every
 * queued line, and then destroys the stream.
 *
 * Queued lines are flushed synchronously before `destroy()` runs, so
 * consumers receive all 'data' events before 'end' / 'close'. Without the
 * flush the remaining lines would be emitted after 'end' or never reach
 * listeners that detach on it. If the stream is paused, the flush is
 * skipped and lines still queued are not emitted by this call.
 *
 * @param {string|Buffer} [data] Final chunk; skipped when falsy.
 * @param {string} [encoding] Encoding passed through to `write()`.
 */
CounterStream.prototype.end = function(data, encoding) {
  if (data) {
    this.write(data, encoding);
  }

  // No more input will arrive. Setting the flag here also covers streams
  // that were written to directly instead of being the target of pipe().
  this.ended = true;

  // addLineCount() emits one line per call; stop early if a listener pauses.
  while (!this.paused && this.buffer.length > 0) {
    this.addLineCount();
  }

  this.destroy();
};

/**
 * Legacy-stream entry point that simply delegates to `destroy()`; nothing
 * is waited for before the teardown.
 */
CounterStream.prototype.destroySoon = function() {
  var self = this;

  self.destroy();
};

/***/

/**
 * Pump step: emits at most one queued line per call as a 'data' event of
 * the form "<count>: <line>\n", or 'drain' when the queue is empty.
 *
 * Scheduling rules:
 *   - After emitting a line, a follow-up run is queued on the next tick if
 *     more lines are waiting, or if the source has not ended (so that
 *     'drain' is delivered once the queue is empty).
 *   - When a run finds the queue empty it emits 'drain' and stops. The pump
 *     does not spin while idle, and `write()` starts it again.
 *   - Lines still queued after the source ended keep being emitted instead
 *     of being stranded in the buffer.
 *   - While paused with lines waiting (and the source not ended), the pump
 *     polls for `resume()`. Polling uses `setImmediate` where available,
 *     because recursive `process.nextTick` callbacks keep the event loop
 *     from reaching I/O on current Node versions.
 */
CounterStream.prototype.addLineCount = function() {
  var that = this,
      line;

  // Queues one follow-up run. The flag keeps self-scheduled runs from
  // piling up when this method is called several times in one tick.
  function scheduleNext(isPoll) {
    if (that.pumpScheduled) {
      return;
    }
    that.pumpScheduled = true;

    function run() {
      that.pumpScheduled = false;
      that.addLineCount();
    }

    if (isPoll && typeof setImmediate === 'function') {
      setImmediate(run);
    } else {
      process.nextTick(run);
    }
  }

  if (this.paused) {
    if (!this.ended && this.buffer.length > 0) {
      scheduleNext(true);
    }
    return;
  }

  if (this.buffer.length === 0) {
    this.emit('drain');
    return;
  }

  ++this.count;
  line = this.buffer.shift();

  // Schedule before emitting so a throwing listener cannot stall the pump.
  if (this.buffer.length > 0 || !this.ended) {
    scheduleNext(false);
  }

  this.emit('data', this.count + ': ' + line + '\n');
};

module.exports = {
  create: function() {
    return new CounterStream;
  }
};
index.js
// Demo: pipe the source of counter_stream.js through a CounterStream and
// print the numbered lines on stdout.
var fs = require('fs'),
    path = require('path'),
    counterStream = require('./counter_stream'),
    c = counterStream.create();

// Resolve the input file relative to this script, like the require() above,
// instead of the current working directory, so the demo also works when it
// is started from another directory.
fs.createReadStream(path.join(__dirname, 'counter_stream.js'))
    .pipe(c)
    .pipe(process.stdout);

受け取ったデータはfor文でその場ですべて処理してしまうのがいいのか、それともprocess.nextTickで処理を分散させた方がいいのか……
あと、自分でdataイベントを発行しているけど、pipeでsrcを受け取ってwriteするのとは違うのかなあ? どういう風に使い分けるんだろう?