Node.js の非同期 I/O におけるデータ受信の別バリエーション

(Twitterで言及を読んで追記) 普通にアプリケーション側のコードをコールバックベースで記述できるなら、その方が自然です。そうした方がいいと思います。ここで扱う例は「アプリケーション側のコードを変更できない事情がある」「アプリケーション側のコードが巨大になることが予想されるためコールバックベースの記述を積み上げることを容認できない」などの状況の場合に、アプリケーション側のコードを非コールバックベースのものにしつつnode.jsでどうにかするための方法です。

Twitter での言及というのはたぶん自分のこれ.

「node.jsの非同期I/Oにおけるデータ受信のパターン」 http://ow.ly/4srsP 超微妙。素直に考えると read(bytes, callback) な API を用意すれば足りるはず。

こうつぶやいた時点では上で引用した注記はまだなかったので,「Node.js におけるパターン」としては超微妙だなと思ったのです.でまぁ,いろいろ事情があるらしいのですが,その制約が一般的なのかはやっぱり超微妙.Node を使うのに,Node らしい非同期プログラミングが容認されない? うーん...


よくわかりませんが,そういう制約がなかった場合のバリエーションということで read(bytes, callback) を実装してみました.というか,setEncoding() されていることを前提に文字列を扱う read(length, callback) ということにしました.

exports.createReader = function(stream) {
  return new Reader(stream);
};

function Reader(stream) {
  this.stream = stream;
  this.buf = '';
  this.loength = 0;
  this.callback = null;
  stream.on('data', Reader.prototype.onData.bind(this));
  stream.on('end', Reader.prototype.onEnd.bind(this));
};

Reader.prototype.read = function read(length, callback) {
  if (!this.stream) {
    process.nextTick(callback.bind(null, new Error('no data')));
    return;
  }
  this.length = length;
  this.callback = callback;
  if (length <= this.buf.length) {
    process.nextTick(this.notify.bind(this));
  }
};

Reader.prototype.onData = function onData(data) {
  this.buf += data;
  if (this.length <= this.buf.length) {
    this.notify();
  }
};

Reader.prototype.onEnd = function onEnd() {
  if (this.length) {
    this.callback(new Error('no data'), this.buf);
  }
  this.stream = null;
};

Reader.prototype.notify = function notify() {
  var frame = this.buf.slice(0, this.length);
  this.buf = this.buf.slice(this.length);
  var cb = this.callback;
  this.length = 0;
  this.callback = null;
  cb(null, frame);
};

いろいろ手抜きだけど気にしない.


こんな感じで使います.

var createReader = require('./reader').createReader;

var stream = ...;
stream.setEncoding('utf8');
var reader = createReader(stream);
...
reader.read(4, function(err, data) { // 4 文字読み込む
    if (err) return console.log(err);
    // ここで data を処理
    ...
});

現状だと際限なくストリームから読み出してしまうので,実用的なものにするにはバッファがある程度大きくなったらストリームの pause() を呼び出してあげるとかしないといけないけど,イメージとしてはこんな感じということで.