Documentation

You are viewing the documentation for the 2.1.5 release in the 2.1.x series of releases. The latest stable release series is 2.4.x.

§反応的なストリーム処理

現代の Web プログラミングにおいて、ストリーム処理は重要なタスクです。これには、チャンク単位のデータダウンロード/アップロードや、 Comet や WebSocket など様々な技術を利用したデータストリームのリアルタイム処理、作成、合成、提供などが含まれます。

Iteratee はこのようなストリーム処理を実現する考え方と API です。 Iteratee の主な特徴は次のとおりです。

§Iteratee

Iteratee はデータのコンシューマーつまり、消費者ーであるといえます。Iteratee は入力データの処理方法および結果の生成方法が記述されます。Iteratee は十分な入力データを受け取ると、それに対して何らかの計算を行い結果値を返します。

// an iteratee that consumes chunkes of String and produces an Int
Iteratee[String,Int] 

Iteratee のインタフェースは [Iteratee[E, A]] のように二つの型パラメータを取ります。E は入力データの型、A は結果値の型です。

Iteratee は 3 つの状態を持ちます。それぞれ、Cont はさらに入力データを受付可能であること、Error はエラーにより入力を停止したこと、Done は計算結果が出ていることを表します。これら 3 つのステータスは Iteratee[E,A] インタフェースの fold メソッドにより定義することができます。

def fold[B](folder: Step[E, A] => Future[B]): Future[B]

Step オブジェクトは 3 つの状態を持ちます。

object Step {
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
  case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}

fold メソッドは、 Iteratee をこれら3つの状態のいずれかに定義します。fold メソッドは 3 つのコールバック関数を引数にとり、状態に応じていずれか一つを呼び出し、最終的には結果値を返します。Iteratee に対する fold の呼び出しは、次のような意味になります。

Iteratee の状態に依存して、fold は引数に渡された関数のいずれかを呼び出して、適切な B 型の値を生成します。

まとめると、 Iteratee には 3 つの状態が存在して、 fold メソッドは Iteratee の状態に応じて何か実処理を行う方法を提供します。

§Iteratee の定義における重要な型

Iteratee の具体例を見るために、上記で説明した二つの重要な型について詳しく見ていきます。

§基本的な Iteratee

Iteratee や、より具体的にはその fold メソッドを定義することで、後々に再利用できる基本的な Iteratee を作成することができます。

val doneIteratee = new Iteratee[String,Int] {
  def fold[B](folder: Step[String,Int] => Future[B]): Future[B] = folder(Step.Done(1, Input.Empty))
}

上記のとおり、このような Iteratee は、この例における done のような適切なコールバック関数に必要な引数を渡すだけで実装することができます。

この Iteratee を利用するためには、約束された値を持つ Future を使います。

def folder(step: Step[String,Int]):Future[Option[Int]] = step match {
  case Step.Done(a, e) => future(Some(a))
  case Step.Cont(k) => future(None)
  case Step.Error(msg,e) => future(None)
} 

val eventuallyMaybeResult: Future[Option[Int]] = doneIteratee.fold(folder)

eventuallyMaybeResult.onComplete(i => println(i))

redeem されたときに Future の中身を取得するためには、 onComplete を使います。

// will eventually print 1
eventuallyMaybeResult.onComplete(i => println(i))

上記の実装をもっと汎用的にしてみましょう。Play には結果と入力値から Done 状態の Iteratee を作るヘルパーが用意されています。

val doneIteratee = Done[String,Int](1, Input.Empty)

Done Iteratee はこの通りとても簡単に作成でき、役に立つケースも無くはないのですが、ご覧のとおり入力データを一切消費してくれません。次は、入力データのチャンクをひとつ消費して、最終的にそのチャンクを結果値として返すような Iteratee を作ってみましょう。

val consumeOneInputAndEventuallyReturnIt = new Iteratee[String,Int] {
    
def fold[B](folder: Step[String,Int] => Future[B]): Future[B] = {
     folder(Step.Cont {
       case Input.EOF => Done(0, Input.EOF) //Assuming 0 for default value
       case Input.Empty => this
       case Input.El(e) => Done(e.toInt,Input.EOF) 
     })
  }
}

def folder(step: Step[String,Int]):Future[Int] = step match {
  case Step.Done(a, _) => future(a)
  case Step.Cont(k) => k(Input.EOF).fold({
    case Step.Done(a1, _) => Future.successful(a1)
    case _ => throw new Exception("Erroneous or diverging iteratee")
  })
  case _ => throw new Exception("Erroneous iteratee")
} 

Done の場合と同様に、Play には Cont 状態の Iteratee を作るためのヘルパーも用意されています。このヘルパーは、 Input[E] の値を引数にとって、 Iteratee[E,A] を返します。

val consumeOneInputAndEventuallyReturnIt = {
  Cont[String,Int](in => Done(100,Input.Empty))
}

さらに、Error 状態についても、Input[E] とエラーメッセージを渡すことで Error 状態の Iteratee を作成できるヘルパーが用意されています。

consumeONeINputAndEventuallyReturnIt の例に立ち戻ると、Cont と Done の 2 ステップのみの単純な Iteratee をベタに実装することは出来そうです。しかし、実際のアプリケーションで使うような、大量の入力データのチャンクを、場合によっては条件付きで消費して、最終的に結果を返すような複雑な Iteratee をするのはなかなかに厄介です。そこで、Play には典型的な Iterateee を作成するためのヘルパーが用意されています。

§入力データの畳込み

Iteratee でよくあるタスクとして、特定の状態を保持して、入力データを受け取るたびにその状態を更新していくような処理があります。この手の Iteratee は Iteratee.fold で作成することができます。

def fold[E, A](state: A)(f: (A, E) => A): Iteratee[E, A]

シグネチャの通り、この畳込みを行うヘルパーは初期状態 A、状態と入力データのチャンクを引数にとる (A,E) => A という関数、入力完了後に E を消費して A を返すような Iteratee[E, A] を引数にとります。作成された Iteratee は EOF が入力されたタイミングで、 A 型の結果値を含む Done 状態を返します。

試しに、入力データのバイト数を数えるような Iteratee を作ってみましょう。

val inputLength: Iteratee[Array[Byte],Int] = {
  Iteratee.fold[Array[Byte],Int](0) { (length, bytes) => length + bytes.size }
}

さらに別の例として、全ての入力データを結合して、最後にそれを返す、という Iteratee を作ってみましょう。

val consume: Iteratee[String,String] = {
  Iteratee.fold[String,String]("") { (result, chunk) => result ++ chunk }
}

Iteratee オブジェクトには、このような Iteratee を任意の TraversableLike オブジェクトから生成するための consume というヘルパーが用意されています。

val consume = Iteratee.consume[String]()

このヘルパーの利用例として、入力データのチャンクそれぞれについて、何らかの手続き的な処理を実行する Iteratee を作ってみましょう。

val printlnIteratee = Iteratee.foreach[String](s => println(s))

この他にも repeatignorefold1 などのヘルパーが用意されています。特に fold1 は、前述の fold と違い、入力データのチャンクを非同期で処理する機能があります。

さて、これまでの説明を読んだ方は、入力データのチャンクを受け取って、それを Iteratee に畳み込む、という手順を何度も何度も行うのはかなり面倒なのではないかと心配されているかもしれません。確かに、 Iteratee を使ってストリーム処理を行うためには、入力データを受け取るたびにそれを Iteratee に渡して、 fold 関数により状態をチェックして、状態が Cont であれば次のデータを渡して状態を更新し、そうでなければ結果値を返す、という手順を追う必要があります。ご安心ください。次で説明する Enumerator は、まさにこのために存在しています。

次ページ: Enumerators