はじめてのWritableStream

引き続きStreamのお勉強をしております。今日はWritableStreamを。

wstream.js
var stream = require('stream'),
    util = require('util');

function WStream() {
  var that = this;

  stream.Stream.call(this);

  this.writable = true;
  this.buffer = '';

  process.nextTick(function() {
    that.emit('drain');
  });
}

util.inherits(WStream, stream.Stream);

WStream.prototype.write = function(data, encoding) {
  var str;

  if (!this.writable) {
    this.emit('error', new Error('WStream not writable'));
    this.writable = false;
    return;
  }

  str = (Buffer.isBuffer(data)) ?
    (new Buffer(data)).toString(encoding || this.encoding) :
    data;

  this.buffer = str;
  process.stdout.write(this.buffer);

  var that = this;
  setTimeout(function() {
    that.emit('drain');
  }, 500);

  return false;
};

WStream.prototype.end = function(data, encoding) {
  if (data) {
    this.write(data, encoding);
  }
  this.destroy();
};

WStream.prototype.destroy = function() {
  this.writable = false;
  this.emit('close');
};

WStream.prototype.destroySoon = function() {
  this.destroy();
};

module.exports = {
  create: function() {
    return new WStream;
  }
};
index.js
var fs = require('fs'),
    wstream = require('./wstream'),
    w = wstream.create();

var count = 0;

w.on('drain', function() {
  if (++count < 10) {
    w.write(String(count), 'utf8');
  } else {
    w.end();
  }
});
w.on('error', function(err) {
  console.error(err);
});
w.on('pipe', function(src) {
  console.log('pipe');
});
w.on('close', function() {
  console.log('close');
  w.write('asdf'); // error
});

これを実行すると1..9までカウントした後にcloseと出力してから終了します。


これ、pipeされたときのこと考えるととっても面倒な気がするんだけど……
一筋縄じゃいかないStreamなのでした……

続・WritableStream

pipeしたときのpipeイベントをごにょごにょしなくても、いい感じにやってくれるみたい。
以下はソースコード

wstream.js
var stream = require('stream'),
    util = require('util');

function WStream() {
  var that = this;

  stream.Stream.call(this);

  this.writable = true;
  this.source = null;

  this.once('pipe', function(src) {
    that.source = src;
  });
}

util.inherits(WStream, stream.Stream);

WStream.prototype.write = function(data, encoding) {
  var that = this;

  if (!this.writable) {
    this.emit('error', new Error('WStream not writable'));
    this.writable = false;
    return;
  }

  process.stdout.write(data, encoding);

  setTimeout(function() {
    that.emit('drain');
  }, 500);

  return false;
};
WStream.prototype.end = function(data, encoding) {
  if (data) {
    this.write(data, encoding);
  }
  this.destroy();
};
WStream.prototype.destroy = function() {
  this.writable = false;
  this.emit('close');
};
WStream.prototype.destroySoon = function() {
  this.destroy();
};

module.exports = {
  create: function() {
    return new WStream;
  }
};
index.js
var fs = require('fs'),
    wstream = require('./wstream'),
    w = wstream.create();

fs.createReadStream('./wstream.js', {
  bufferSize: 16
}).pipe(w);

これを実行すると、wstream.jsの中身がちょこちょこ出力されるようになります。
一週間やってみて、まだまだ使いこなせてないけどそれなりにStreamがわかってきた気がする!