By

ES6で Promise, co, ramda, FRP(bacon.js)を試す

先日、Javascriptでちょっとしたツールを書いたのですが、いい機会なので、これを題材に ES6 について勉強してみました。

最初にES5で書いたロジックを、次のようにいろいろなスタイルで書き直してみました。

ソースは、ここ です。

まだ、私自身勉強中なのですが、これをもとに ES6 の機能や新しいパラダイムについて簡単に紹介してみたいと思います。

例題

書いたツールは、誰かがAWSのWebConsoleにログインした時にそれをSlackに通知するツールです。現在は、cronで動かしていますが、AWS Lambdaが Tokyo region でサービスされるようになったら、Lambda上で動かすつもりで、JavaScriptで書いてみました。

この記事のサンプルプログラムはそれを若干簡略化したもので、Slack通知の代わりにコンソールに表示します。


    $ export CLOUDTRAIL_BUCKET=(your S3 bucket for AWS CloudTrail)
    $ export CLOUDTRAIL_DIR=(your directory for AWS CloudTrail)
    $ npm install
    $ npm build
    $ node run.js 2015/06/18
    ConsoleLogin by essa from 122.29.xxx.yyy

ES5で書いたバージョンは次のようになります。

function getConsoleLogin(s3, date, callback) {
  var params = {
    Bucket: process.env.CLOUDTRAIL_BUCKET,
    Prefix: process.env.CLOUDTRAIL_DIR + date
  };

  s3.listObjects(params, function(error, data) {
    if (error) {
      console.log(error);
      process.exit(1);
    }
    var files = data.Contents;
    var i, j;

    for (i=0; i < files.length; i++) {
      var file = files[i].Key;
      var params = {
        Bucket: process.env.CLOUDTRAIL_BUCKET,
        Key: file
      };
      s3.getObject(params, function(error, data) {
        if (error) {
          console.log(error);
          process.exit(1);
        }
        zlib.unzip(data.Body, function(error, data) {
          var logs = JSON.parse(data.toString());
          var records = logs.Records;

          for (j=0; j < records.length; j++) {
            var record = records[j];
            if (record.eventName !== "ConsoleLogin") {
              continue;
            }
            var msg = record.eventName +
              " by " + record.userIdentity.userName +
              " from " + record.sourceIPAddress;
            callback(msg);
          }
        });
      });
    }
  });
}

この関数は、AWS-SDKの s3.listObjects という API で、S3上の指定したディレクトリのファイル一覧を取って、そのファイルに保存されている JSON 形式のログを s3.getObjectというAPIで読み、その中から、指定した条件に合うログのデータをコールバックに渡します。

AWS-SDKのAPIは、多くの Javascript のAPIと同じように、コールバックで結果を渡します。このケースでは、それが二重になっていることと、それぞれのデータの中の配列を処理することでネストが深くなってしまいます。

Cloudtrailのログファイルは圧縮されているので、それを解凍するための処理(zlib.unzip)で、さらに一段コールバックが必要になります。

単純に書くと、このように非同期処理に伴なうコールバックによってネストが深くなってしまうのは、Javascriptにつきものの悩みで、書きかえにあたっては、これをどのように見やすくできるかということを中心に考えました。

ES6直訳バージョン

まず最初に、JavaScript - ぼくのかんがえたさいきょうのES6プロジェクトテンプレート - Qiita という記事を参考に環境を作り、それのテストを兼ねて、そのまま ES6 に直してみました。

ES6のソースは、babel というコンパイラ(トランスパイラ)で ES5 にコンパイルして、nodeで動かしています。テストは、mochaとsinon.jsを使っています。これらの環境回りは上記記事などを参照してください。

ES6バージョンのソースです。

function getConsoleLogin(s3, date, callback) {
  let params = {
    Bucket: process.env.CLOUDTRAIL_BUCKET,
    Prefix: process.env.CLOUDTRAIL_DIR + date
  };

  s3.listObjects(params, (error, data) => {
    if (error) {
      console.log(error);
      process.exit(1);
    }
    let files = data.Contents;

    for (let file of files) {
      let params = {
        Bucket: process.env.CLOUDTRAIL_BUCKET,
        Key: file.Key
      };
      // console.log('getting ' + file.Key)
      s3.getObject(params, (error, data) => {
        if (error) {
          console.log(error);
          process.exit(1);
        }
        // console.log(file.Key)
        zlib.unzip(data.Body, (error, data) => {
          let logs = JSON.parse(data.toString());
          let records = logs.Records;

          for (let record of records) {
            if (record.eventName !== "ConsoleLogin") {
              continue;
            }
            let msg = record.eventName +
              " by " + record.userIdentity.userName +
              " from " + record.sourceIPAddress;
            callback(msg);
          }
        });
      });
    }
  });
}

ES6は、まずは、JavaScript: The Good Parts の方向に自然に拡張していった言語だと思います。ここでは、そういう方向の以下の機能だけを使いました。

  • varでなく、letで変数を宣言
  • “arrow function”記法を使用
  • 配列の処理を for( .. of ...) で書く

ES6 with Promise

次に、ES6で標準ライブラリになった、Promiseという機能を使ってみました。

Promiseを使うと、callbackになっていた処理を、.then というメソッドで逐次処理的に書くことができます。

まず、使用するAPIを Promiseでラップします。

function AsPromise(func) {
  return function(arg) {
    return new Promise((resolve, reject)=>{
      func(arg, (error, data) => {
        if (error) {
          reject(error);
        } else {
          resolve(data);
        }
      });
    });
  };
}

function getConsoleLogin(s3, date, callback) {
  const listObjectsAsPromise = AsPromise(s3.listObjects.bind(s3));
  const getObjectAsPromise = AsPromise(s3.getObject.bind(s3));
  const unzipAsPromise = AsPromise(zlib.unzip);
  ...
}

今回は使わなかったのですが、aws-sdk-promisenode-promise のようなライブラリを使えば、これを自分で書く必要はありません。また、今後は多くのAPIやライブラリが最初からPromise対応になると思います。

これによって、以下のように、.thenの連鎖でコールバックを使わずにロジックを書けるようになります。(.thenのコールバックが返したものがPromiseでなければ、その値をそのまま次の.thenに渡す)

function getProp(key) {
  return function(obj) {
    return obj[key];
  };
}

function getConsoleLogin(s3, date, callback) {
  const listObjectsAsPromise = AsPromise(s3.listObjects.bind(s3));
  const getObjectAsPromise = AsPromise(s3.getObject.bind(s3));
  const unzipAsPromise = AsPromise(zlib.unzip);

  const params = {
    Bucket: process.env.CLOUDTRAIL_BUCKET,
    Prefix: process.env.CLOUDTRAIL_DIR + date
  };

  return listObjectsAsPromise(params)
    .then(getProp("Contents"))
    .then((files)=>{
      return Promise.all(files.map((file)=>{
        const params = {
          Bucket: process.env.CLOUDTRAIL_BUCKET,
          Key: file.Key
        };
        return getObjectAsPromise(params)
          .then(getProp("Body"))
          .then(unzipAsPromise)
          .then((data)=>{
            const logs = JSON.parse(data.toString());
            const records = logs.Records;

            return Promise.all(records.filter((record)=>{
              return record.eventName === "ConsoleLogin" ;
            }).map((record)=>{
              const msg = `${record.eventName} by ${record.userIdentity.userName} from ${record.sourceIPAddress}`;
              return callback(msg);
            }));
          });
      }));
    }).catch((error)=>console.log(error));
}

配列を処理する時には、Promsise.allというAPIを使っていますが、これは、複数のPromise内の非同期処理を並行処理して結果を待ち合わせるものです。

もし、API呼出しのうち一つでも失敗すると、最後の }).catch((error)=>console.log(error)) にエラーが通知されます。最低限のエラー処理をまとめて書くには便利だと思います。

また、.catch.then.の直後に書けば、APIごとに別のエラー処理を行うこともできます。

逐次処理 (処理Aの完了後に処理Bが、処理Bの完了後に処理Cが実行される)

listObjectsAsPromise(params)
  .then(/*処理A*/)
  .then(/*処理B*/)
  .then(/*処理C*/)
  .catch(/*エラー処理*/)

並列処理 (3つのgetObjectAsPromiseが同時に実行される)

Promise.all(
  [
    getObjectAsPromise(...),
    getObjectAsPromise(...),
    getObjectAsPromise(...),
  ]
).then(/*3つ全部正常終了した時、完了後に呼ばれる*/)
 .error(/*どれかでエラーが発生した時呼ばれる*/)

データ構造を作るように、並列処理と逐次処理を自由に組合せて、必要なタイミングでコールバックやエラー処理を呼出すことができます。

ES6 with ‘co’ and generator

‘co’というライブラリを使うと、このような非同期のAPI呼出しを含む処理を同期処理のように(rubyのように)書くことができます。

function getConsoleLogin(s3, date, callback) {
  const listObjectsAsPromise = AsPromise(s3.listObjects.bind(s3));
  const getObjectAsPromise = AsPromise(s3.getObject.bind(s3));
  const unzipAsPromise = AsPromise(zlib.unzip);

  const params = {
    Bucket: process.env.CLOUDTRAIL_BUCKET,
    Prefix: process.env.CLOUDTRAIL_DIR + date
  };

  return co(function*(){
    const files = yield listObjectsAsPromise(params);
    for(let file of files.Contents) {
      const params = {
        Bucket: process.env.CLOUDTRAIL_BUCKET,
        Key: file.Key
      };
      const ret = yield getObjectAsPromise(params);
      const data = yield unzipAsPromise(ret.Body);
      const logs = JSON.parse(data.toString());
      for (let record of logs.Records) {
        if (record.eventName !== "ConsoleLogin") {
          continue;
        }
        const msg = record.eventName +
          " by " + record.userIdentity.userName +
          " from " + record.sourceIPAddress;
        callback(msg);
      }
    }
  });
}

yieldという記述を無いものとして見れば、Java や Ruby と同じような、逐次処理(待ち合わせるAPI)を使って書かれているように見えます。’co’ は、 generator という ES6の新しい機能を巧みに使うことで、このような書き方を可能にしています。

generatorは、データを生成する側と消費する側が、コルーチンとして交互に動く仕組みです。消費側が要求するまでデータの生成が遅延されます。’co’では、処理をイベントを生成する generator として書くことによって、擬似的な逐次処理を可能にしています。

‘co’のラッパー関数は、イベントの消費者となり、そのイベントが完了する時に次のイベントを要求することで、ユーザ関数をコントロールします。

const ret = yield getObjectAsPromise(params);

たとえば、ここでは、getObjectAsPromiseは、APIを発行したらその結果が返る前にPromiseを即時リターンします。yieldによって、この関数の実行は一時中断し、このPromiseが、外側の coという関数に渡されます。coが、Promise完了後に、次のデータを要求することで、yieldによって中断されていた所から実行が継続し、retに値が代入されてから、次の行に進みます。

この書き方だと、コールバックが無い分だけネストは浅くなります。処理の流れも見えやすくなります。

なお、このバージョンは、前のバージョンと少しだけ違う所があります。

最初の3つのバージョンは、複数のログファイルを全部一度に要求しますが、このバージョンは、ログファイルの処理は一つづつ順番に行います。最初のファイルの応答があって、その処理を終えてから、次のファイルを要求します。

配列の各要素を同時に並行するのではなく、一つづつ順番に処理することが必要な場合には、この書き方がよいと思います。

それと、yield の後に Promise の配列を書くと、並列処理もできるそうです。つまり、ちょっと複雑ですが、’co’ のラッパー関数とのプロトコルを理解すれば、yieldによって、処理の流れをコントロールできます。

ES6 with functional programing style with ramda.js

次に Ramda というライブラリを使って、関数型プログラミングのスタイルで書いてみました。

function getConsoleLogin(s3, date, callback) {
  const nodeCallback = (dataFunc) => {
    return function(error, data) {
      if (error) {
        console.log(error);
        process.exit(1);
      } else {
        return dataFunc(data);
      }
    };
  };

  const format = (record) => `${record.eventName} by ${record.userIdentity.userName} from ${record.sourceIPAddress}`;

  const processLogRecords = R.pipe(
    R.invoke("toString", []),
    JSON.parse,
    R.prop("Records"),
    R.filter(
      R.where({
        eventName: R.equals("ConsoleLogin")
      })
    ),
    R.forEach(R.pipe(format, callback))
  );


  const processData = (data) => {
    zlib.unzip(data.Body, nodeCallback(processLogRecords));
  };

  const processFile = (file) => {
    var params = {
      Bucket: process.env.CLOUDTRAIL_BUCKET,
      Key: file.Key
    };
    s3.getObject(params, nodeCallback(processData));
  };

  const processList = R.pipe(
    R.prop("Contents"),
    R.forEach(processFile)
  );

  var params = {
    Bucket: process.env.CLOUDTRAIL_BUCKET,
    Prefix: process.env.CLOUDTRAIL_DIR + date
  };
  s3.listObjects(params, nodeCallback(processList));
}

プレーンな JS を使っていても、コールバック関数を処理の単位で分割すれば、最初のプログラムは見通しがよくなります。ここでは、それをやっているだけなのですが、ramda.js を使うと、「関数の合成」という方法で、部品の組合せで関数を作り出すことができます。

たとえば、次の箇所は、

  const processLogRecords = R.pipe(
    R.invoke("toString", []),
    JSON.parse,
    R.prop("Records"),
    R.filter(
      R.where({
        eventName: R.equals("ConsoleLogin")
      })
    ),
    R.forEach(R.pipe(format, callback))
  );

もとのバージョンのこの部分に相当する関数を合成します。

        zlib.unzip(data.Body, (error, data) => {
          let logs = JSON.parse(data.toString());
          let records = logs.Records;

          for (let record of records) {
            if (record.eventName !== "ConsoleLogin") {
              continue;
            }
            let msg = record.eventName +
              " by " + record.userIdentity.userName +
              " from " + record.sourceIPAddress;
            callback(msg);
          }
        });

関数型プログラミングは、このように、単純な処理を合成して、求める関数を作り出します。ramdaのようなライブラリには合成する関数のパーツが多種多様に用意されているので、うまく選択すると、見通しがよくバグを入れこむ余地がないコードが書けそうな気がします。

ES6 with functional reactive programing style with bacon.js

最後に、今一番ホットなFRP(Functional reactive programing)のスタイルにも挑戦してみました。

Javascript用の FRP のライブラリは、RxJS と bacon.js が有名ですが、後者の方が pragmaticな感じで使いやすそうに見えたので、これを使いました。

function getConsoleLogin(s3, date, callback) {
  var params = {
    Bucket: process.env.CLOUDTRAIL_BUCKET,
    Prefix: process.env.CLOUDTRAIL_DIR + date
  };

  const files = Bacon.fromNodeCallback(s3.listObjects.bind(s3), params)
    .map(".Contents")
    .flatMap(Bacon.fromArray)
    .map(".Key")
    ;

  const records = files.flatMap((fname)=>{
    var params = {
      Bucket: process.env.CLOUDTRAIL_BUCKET,
      Key: fname
    };
    return Bacon.fromNodeCallback(s3.getObject.bind(s3), params);
  })
    .map(".Body")
    .flatMap((data)=>{
      return Bacon.fromNodeCallback(zlib.unzip, data);
    })
    .map((x)=>x.toString())
    .map(JSON.parse)
    .map(".Records")
    .flatMap(Bacon.fromArray)
    ;

  const format = (record) => `${record.eventName} by ${record.userIdentity.userName} from ${record.sourceIPAddress}`;
  return records.filter((x)=>x.eventName === "ConsoleLogin")
    .map(format)
    .doAction(callback)
    .toPromise()
    ;
}

mapfilterなどの標準パーツを組み合わせることは同じですが、FRPの場合は、組合せてできあがるものが、関数ではなく、イベントストリームのパイプラインになります。

イベントがパイプを通って、少しづつ処理されて、最後に出口から望むものが出てくる、というようなイメージです。

  Bacon.fromNodeCallback(s3.listObjects.bind(s3), params)

が起点で、listObjectの結果が最初のイベントとしてパイプラインに流しこまれ、後続のステップでそのイベントを少しづつ変形していくというイメージです。

パイプラインには、時間やイベントという概念があるので、それを扱う部品も用意されています。たとえば、.bufferingThrottleというメソッドを次のように使うと、callbackの呼出し間隔が一定時間より短かくならないことが保証できます。

  .map(format)
  .bufferingThrottle(500)
  .doAction(callback)

これは、最終的に結果を報告する Slack などの API に、一定時間内の回数制限がある場合には、非常に便利です。

それと、デバッグや動作確認のためには、.log() という API があって、上記のチェーンの(ほぼ)任意の箇所に、この一行を指しこむと、そこを流れているデータがコンソールに出力されます。

最初は、意味がわからなかったり勘違いしていたメソッドがいくつかありましたが、この.log() で入出力を見ることで、すぐ機能を理解できました。

まとめ

ES6は、攻守ともにかなり強化されていると感じました。

守り(good parts的機能)は、地味ですが、普通に予想通りに動いて、すぐに使えました。また、arrow functionは、関数型 や FRP を楽に書くのに欠かせません。

複数行の arrow function は、return が必要なので、CoffeScript の方がいいような気がしますが、1行の関数を最小限の記述で書けるのは、やはり便利です。

攻めの機能としては、generatorが目を引きます。これは、使い所がかなり難しいとは思いますが、single thread で co-routine の処理系を前提とした言語としては、自然な書き方であるような気もします。

しかし、一番印象に残ったのは、FRP(bacon.js) で、これは、ブラウザ側でもサーバ側でも、イベントの発生元が複数ある場合の処理が、画期的に簡単に書けるライブラリというかパラダイムのように感じました。

特に画期的なのは、bufferingThrottleのような機能を独立性の高い部品として作れることです。

従来のパラダイムでは、こういうイベントやタイミングの複雑な処理は、部品化しても、使う側がいろいろ気を使って、中の動きをよく理解しないと使えない「部品」になってしまったのですが、FRPにおいては、作る側は難しくても使う側が割と気軽に使える部品になるような気がします。

なお、ES6には、この他に、オブジェクト指向の機能やモジュール分割の機能がありますが、今回は使いませんでした。

一緒にユニークな決済サービスを作ってくれる Rails エンジニアを募集中です!
多国籍なメンバーと一緒に仕事をしてみませんか?詳細はこちらのリンクです:D