Node.js 用の非同期処理を簡単にしてくれるライブラリ async.js

Node.js といえば非同期処理です.そして非同期処理と言えばコールバック.
そんなわけで (どんなわけで?),すぐにこんなコードになったりしがちですよね.

fs.writeFile(path, "hello", function (error) {
    fs.open(path, "a", 0666, function (error, file) {
        fs.write(file, "\nworld", null, "utf-8", function () {
            fs.close(file, function (error) {
                fs.readFile(path, "utf-8", function (error, data) {
                    var lines = data.split("\n");
                    sys.puts(lines[1]);
                });
            });
        });
    });
});

奥に向かって突き進むのが気持ちいい,そんな風に思っていた時期が僕にもありました...
でも,そのうちこんな冗談も言いたくなりますよね.

The last few lines of our #nodejs application - });});});});});});});});});}); our #clojure app looks like this - )))))))))))))))))))) #joke

こんな小さなサンプルでこれだと,現実的なアプリではどうなることやら,と心配で夜も眠れないので調べてみたら,いろいろなライブラリが当然のようにあったりするわけですね.

JSDeferred

これは元々ブラウザの中で動く普通の JavaScript 向けに作られたようですが,Node.js でも使えます.
Deferred は先週書いたテスティングフレームワーク Vows に出てきた Promise と似てるというか,その応用という感じです.
Promise/Future は将来確定する値を取得できるようにするためのオブジェクトですが,Deferred はさらに,値が確定した時に実行する処理を登録しておくことができます.処理の実行をその時まで先送りするから Deferred なんですね.
んで,JSDeferred を使うと先のサンプルは次のように書くことができます.

var fs = require('fs'), sys = require('sys');
var Deferred = require('../lib/jsdeferred/jsdeferred').Deferred;
var path = "fileio.txt";
Deferred.define();

next(function() {
    var deferred = new Deferred();
    fs.writeFile(path, "hello", function(error) {
        deferred.call();
    });
    return deferred;
}).next(function() {
    var deferred = new Deferred();
    fs.open(path, "a", 0666, function(error, file) {
        deferred.call(file);
    });
    return deferred;
}).next(function(file) {
    var deferred = new Deferred();
    fs.write(file, "\nworld", null, "utf-8", function() {
        deferred.call(file);
    });
    return deferred;
}).next(function(file) {
    var deferred = new Deferred();
    fs.close(file, function(error) {
        deferred.call();
    });
    return deferred;
}).next(function() {
    fs.readFile(path, "utf-8", function(error, data) {
        var lines = data.split("\n");
        sys.puts(lines[1]);
    });
});

うーみゅ... 奥に向かって突き進むコードが普通に縦に落ちていくコードにはなりました.しかし,少し残念な感じがしないでもない...
これは JSDeferred が悪いわけではなく,ビルディングブロックである Deferred を直接使ってるからこうなっちゃってるんですね.この決まり切った処理をラップした関数を用意すれば,スッキリしたコードになるはずです.しかし,現時点ではそんなありがたいものはなさそう...
ないなら作ればいいじゃない,と王妃様に言われそうな気がしますが,JSDeferred の作者さんも Node.js を使っているようなので,待ってればいずれ出てくるんじゃないのかなぁと期待しておきます.

promised-io

こちらは promised-based なテスティングフレームワーク patr と同じ作者による promised-based な IO フレームワーク
Node.js の fs モジュールや http.Client のラッパーを用意してくれているようです.
しかし... ドキュメントが...
一応こちらにも情報があるのですが,

なかなか辛いものがあります.
その中に出てくる例.

var myFile = fs.open("my-file.txt", "a");
myFile.write("some data").then(myFile.close);

これだけ見ると,結構いい感じに見えます.
一見同期的なコードに見えますが,実際にはファイルの open() が完了してから write() が,それが終わってから close() が行われます.カッコいい.
のですが,もうちょっと何かやろうとすると行き詰まってしまいました.
例えば最初の例を promised-io でやろうとすると,

var sys  = require("sys");
var fs = require("../lib/promised-io/fs");
var path = "fileio.txt";

fs.writeFile(path, "hello").then(function() {
    var myFile = fs.open(path, "a", 0666);
    myFile.write("\nworld").then(myFile.close);
})

この時点でかっこよくない気がする上に,この先の書き方が分かりません.


promised-io はいくつかのクラスを提供するのですが,ここで出てくるのはそのうちの 3 つ,PromiseDeferred,そして File です.これらは,
Promise <|-- Deferred <|-- File
という継承関係にあります (たぶん,でも自分の JavaScript 力では怪しいのだ).
で,問題なのは Filewrite() が返すのが File ではなく Deferred っぽいこと.なので,

    myFile.write(...).read(...).close().then(...);

なんてできそうに見えないのですね.チェーンができない.
さらに,Deferredthen() が返すのは Deferred ではなく Promise で,こいつの then()

Promise.prototype.then = function(resolvedCallback, errorCallback, progressCallback){
	throw new TypeError("The Promise base class is abstract, this function must be implemented by the Promise implementation");
};

ってなってたりして,使い物にならなかったり.
要するに,JSDeferred でできているようなチェーンができない気がするわけです.
自分の勘違いという可能性も高いのですが (なんせ JavaScript では最下級戦士),ドキュメントもアレなので,これ以上関わるのは難しいと判断して次へ行きます.

async.js

そんなわけで (どんなわけで?),ようやく今日の主題,async.js です.前置き長すぎ.
async.js は先の二つとは随分異なった印象のライブラリで,元々は関数型言語に見られる無限リストや map などの関数を提供するものみたいです.
こいつを使うと例えば

async.range(1, null, 2)
  .print()
  .end();

これでコンソールに 1, 3, 5... と奇数が延々と出力されます.
最初の range(start, stop, step) はジェネレータと呼ばれ,これが後続の関数にデータを渡します.この例では print() です.後続の関数は自分の処理が終わったら次の関数に処理を渡します.つまりパイプラインを構成しているわけです.それが延々と続いて (この例では print() しかありませんが),最後に end() にたどり着くと,ジェネレータに戻って次の値が生成されて... を繰り返します.
おっと,うちの環境だと 4000 を越えたところで例外が.スタックオーバーフローみたいな?
Node.js ってエラーがあった時のメッセージが不親切で状況がまるで分からないことが多い気がします.でもまぁ,これはきっとスタックオーバーフローでしょう.


でも大丈夫です.
async.js には delay() という関数があって,これを挟むと後続の処理を setTimeout() に渡してくれます.これで一度 Node.js に制御を戻してあげればスタックオーバーフローにはならないはず.

async.range(1, null, 2)
  .print()
  .delay(0)
  .end();

うん,大丈夫っぽい.
さらに

// 偶数
async.range(0, null, 2)
  .print()
  .delay(0)
  .end();

// 奇数
async.range(1, null, 2)
.print()
.delay(0)
.end();

なんてやると,0, 1, 2, 3, 4, 5... という具合に両方の数列が交互に出力されます.同期的なパイプラインに見えるコードが実は非同期に実行されている... なんかカッコいい!!


で,今の delay() の挙動が結構重要で,これってコールバックされるまで後続の処理を先送りしてくれるわけです.要するに Deferred なんですね.つまり delay() は,JSDefferd の next() や promised-io の then() みたいなことをやってくれているということ.
ってことは,このパイプラインに I/O を含めることだってできるはず.


そんなわけで (どんなわけで?),async.js は Node.js 標準の fs モジュールライクなプラグインを用意してくれてます.
ただ,ちょっとだけ残念なことに,open() の引数の並びが Node.js のとは違っていたり,readFile()/writeFile() はあるのに read()/write() はなかったり.
そして凄く残念なことに,ネットワーク関係のプラグインはなかったり.うーみゅ...


とりあえず気を取り直して,async.js の fs-node.js というプラグインをちょっと修正してみました.

@@ -53,7 +53,7 @@ async.plugin({
         })
     },
 
-    open: function(flags, mode) {
+    open: function(mode, flags) {
         return this.$fileOp(function(file, next) {
             fs.open(file.path, mode || file.mode || "r", flags || file.flags || 0666, next)
         }, "fd")
@@ -124,17 +124,12 @@ async.plugin({
             if (!file.path)
                 return next("not a file sequence!")
             
-            if (encoding)
-                fs.readFile(file.path, encoding, readCallback)
-            else
-                fs.readFile(file.path, readCallback)
-            
-            function readCallback(err, data) {
+            fs.readFile(file.path, encoding, function readCallback(err, data) {
                 if (err) 
                     return next(err)
                 file.data = data
                 next()
-            }            
+            })
         })
     },
     
@@ -142,6 +137,20 @@ async.plugin({
         return this.$fileOp(function(file, next) {
             fs.writeFile(file.path, data || file.data, next)
         })
+    },
+
+    write: function(buffer, offset, length, position) {
+        return this.$fileOp(function(file, next) {
+            if (!Buffer.isBuffer(buffer)) {
+                buffer = new Buffer(buffer, length);
+                length = buffer.length;
+                position = offset;
+                offset = 0;
+            }
+            fs.write(file.fd, buffer, offset, length, position, function(err, result){
+                next(null, file);
+            });
+        });
     }
 }, {
     files: function(files, root) {
@@ -154,6 +163,10 @@ async.plugin({
             }
         }))
     },
+
+    file: function(file, root) {
+        return this.files([file], root);
+    },
     
     glob: function(pattern) {
         function fileSort(file1, file2) {

これを使うと,最初の例はこうなります.ジャジャーン.

var sys = require('sys'), fs = require('fs');
var async = require('../lib/async');
var path = 'fileio.txt';

async.file(path)
  .writeFile('hello')
  .open('a')
  .write('\nworld')
  .close()
  .readFile('utf-8')
  .get('data')
  .each(function(data) {
      var lines = data.split("\n");
      sys.puts(lines[1]);
  })
  .end();

おおぉぉぉ...
これです.まさに,これが望んでいたものそのものですよ!!
async.js△!!!!


この動作を簡単に説明すると,
最初の file() 関数はジェネレータで,ファイルのパスを作って後続に渡します.
次の writeFile() はそのファイルに文字列を書き込みます.
その次の open() はファイルをオープンし,ファイル記述子を後続へのデータに追加します.
write() はそのファイル記述子を使って文字列を書き込みます.
close() はファイルをクローズしてファイル記述子を削除します.
readFile() はジェネレータが設定したパスを使ってファイルを読み込み,結果をパイプラインを流れるオブジェクトの data プロパティに設定します.
get() はその data プロパティを取り出して後続に渡します.
each() はそれ (data) をコールバックで受け取って処理します.
という感じ.


ポイントは,open() とか write() という名前の関数は,実際には open()write() を行う関数を呼び出す関数を準備している (パイプラインを構築する) だけということ.そして,実際の関数は前段の処理が終わった後に非同期に呼び出されるということ.


途中に print() を入れると,パイプラインを流れるデータを表示してくれて便利です.
対応するソースと混ぜるとこんな感じになります.

//async.file(path)
{ path: 'fileio.txt', name: 'fileio.txt' }
//  .writeFile('hello')
{ path: 'fileio.txt', name: 'fileio.txt' }
//  .open('a')
{ path: 'fileio.txt', name: 'fileio.txt', fd: 5 }
//  .write('\nworld')
{ path: 'fileio.txt', name: 'fileio.txt', fd: 5 }
//  .close()
{ path: 'fileio.txt'
, name: 'fileio.txt'
, fd: undefined
}
//  .readFile('utf-8')
{ path: 'fileio.txt'
, name: 'fileio.txt'
, fd: undefined
, data: 'hello\nworld'
}
//  .get('data')
hello
world

ジェネレータの file() が作ったオブジェクトに open()fd を追加して,close がそれを消して,readFiledata を追加して,get() がその data だけを後続に渡している様子が分かります.


ちなみに,ジェネレータに複数のファイルのパスを受け取る async.js 本来の files() を使うと,

async.files(['file1.txt', 'file2.txt', 'file3.txt'])
  .writeFile('hello')
  .open('a')
  .write('\nworld')
  .close()
  .readFile('utf-8')
  .get('data')
  .each(function(data) {
      var lines = data.split("\n");
      sys.puts(lines[1]);
  })
  .end();

なんて風に複数のファイルに対する繰り返しも簡単.
さらに,glob() とか walkfiles() なんて便利そうなジェネレータも用意されているので,ファイルに対する操作に関してはそこそこ便利そうです.


ネットワーク関係のプラグインがないとか,ReadableStream にも対応してなさそうとか,エラーハンドリングはどうするとか,課題も多そうな async.js ですが,それでも Node.js でのプログラミングスタイルについて考えさせてくれるいい材料という感じがします.
http.Client のラッパーでも作りながらもう少し遊んでみようと思ってます.