はじめてのWritableStream
引き続きStreamのお勉強をしております。今日はWritableStreamを。
wstream.js
var stream = require('stream'), util = require('util'); function WStream() { var that = this; stream.Stream.call(this); this.writable = true; this.buffer = ''; process.nextTick(function() { that.emit('drain'); }); } util.inherits(WStream, stream.Stream); WStream.prototype.write = function(data, encoding) { var str; if (!this.writable) { this.emit('error', new Error('WStream not writable')); this.writable = false; return; } str = (Buffer.isBuffer(data)) ? (new Buffer(data)).toString(encoding || this.encoding) : data; this.buffer = str; process.stdout.write(this.buffer); var that = this; setTimeout(function() { that.emit('drain'); }, 500); return false; }; WStream.prototype.end = function(data, encoding) { if (data) { this.write(data, encoding); } this.destroy(); }; WStream.prototype.destroy = function() { this.writable = false; this.emit('close'); }; WStream.prototype.destroySoon = function() { this.destroy(); }; module.exports = { create: function() { return new WStream; } };
index.js
var fs = require('fs'), wstream = require('./wstream'), w = wstream.create(); var count = 0; w.on('drain', function() { if (++count < 10) { w.write(String(count), 'utf8'); } else { w.end(); } }); w.on('error', function(err) { console.error(err); }); w.on('pipe', function(src) { console.log('pipe'); }); w.on('close', function() { console.log('close'); w.write('asdf'); // error });
これを実行すると1..9までカウントした後にcloseと出力してから終了します。
これ、pipeされたときのこと考えるととっても面倒な気がするんだけど……
一筋縄じゃいかないStreamなのでした……
続・WritableStream
pipeしたときのpipeイベントをごにょごにょしなくても、いい感じにやってくれるみたい。
以下はソースコード。
wstream.js
var stream = require('stream'), util = require('util'); function WStream() { var that = this; stream.Stream.call(this); this.writable = true; this.source = null; this.once('pipe', function(src) { that.source = src; }); } util.inherits(WStream, stream.Stream); WStream.prototype.write = function(data, encoding) { var that = this; if (!this.writable) { this.emit('error', new Error('WStream not writable')); this.writable = false; return; } process.stdout.write(data, encoding); setTimeout(function() { that.emit('drain'); }, 500); return false; }; WStream.prototype.end = function(data, encoding) { if (data) { this.write(data, encoding); } this.destroy(); }; WStream.prototype.destroy = function() { this.writable = false; this.emit('close'); }; WStream.prototype.destroySoon = function() { this.destroy(); }; module.exports = { create: function() { return new WStream; } };
index.js
var fs = require('fs'), wstream = require('./wstream'), w = wstream.create(); fs.createReadStream('./wstream.js', { bufferSize: 16 }).pipe(w);
これを実行すると、wstream.jsの中身がちょこちょこ出力されるようになります。
一週間やってみて、まだまだ使いこなせてないけどそれなりにStreamがわかってきた気がする!