§反応的なストリーム処理
現代の Web プログラミングにおいて、ストリーム処理は重要なタスクです。これには、チャンク単位のデータダウンロード/アップロードや、 Comet や WebSocket など様々な技術を利用したデータストリームのリアルタイム処理、作成、合成、提供などが含まれます。
Iteratee はこのようなストリーム処理を実現する考え方と API です。 Iteratee の主な特徴は次のとおりです。
- データのストリームを生成、処理、変換することができる
- 様々なデータを同じような形式で扱える (例えば、ディスク上のファイル、WebSocket、Chunked HTTP、データアップロードなど)
- 合成可能: ストリームのソースやコンシューマを、別の型のソースやコンシューマに変換するためのアダプタや変換器が豊富に用意されている
- 受信完了とするために必要なデータの定義を細かくコントロールできる。また、ソースがデータ送信を完了した際に通知を受けることができる
- ノン・ブロッキング、リアクティブで、かつ (Thread、Memoryなどの) リソース消費をコントロールできる
§Iteratee
Iteratee はデータのコンシューマーつまり、消費者ーであるといえます。Iteratee は入力データの処理方法および結果の生成方法が記述されます。Iteratee は十分な入力データを受け取ると、それに対して何らかの計算を行い結果値を返します。
// an iteratee that consumes chunkes of String and produces an Int
Iteratee[String,Int]
Iteratee のインタフェースは [Iteratee[E, A]]
のように二つの型パラメータを取ります。E
は入力データの型、A
は結果値の型です。
Iteratee は 3 つの状態を持ちます。それぞれ、Cont
はさらに入力データを受付可能であること、Error
はエラーにより入力を停止したこと、Done
は計算結果が出ていることを表します。これら 3 つのステータスは Iteratee[E,A]
インタフェースの fold
メソッドにより定義することができます。
def fold[B](folder: Step[E, A] => Future[B]): Future[B]
Step
オブジェクトは 3 つの状態を持ちます。
object Step {
case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}
fold メソッドは、 Iteratee をこれら3つの状態のいずれかに定義します。fold メソッドは 3 つのコールバック関数を引数にとり、状態に応じていずれか一つを呼び出し、最終的には結果値を返します。Iteratee に対する fold
の呼び出しは、次のような意味になります。
- Iteratee が
Done
状態であれば、A
という型の計算結果とInput[E]
という型のこれから消費される入力データの最後のチャンク を元に、B
という型の値を生成します。 - Iteratee が
Cont
状態であれば、Input[E] => Iteratee[E,A]
という型の継続 (入力を待ち受ける) を使って、最終的にはB
という型の値を生成します。この状態が Iteratee にデータを入力する唯一のタイミングであり、データを入力後は提供された継続を使って新しい状態の Iteratee を返します。 - Iteratee が
Error
状態であれば、String
型のエラーメッセージと、エラーの原因となった入力データを元に、B
型の値を生成します。
Iteratee の状態に依存して、fold
は引数に渡された関数のいずれかを呼び出して、適切な B
型の値を生成します。
まとめると、 Iteratee には 3 つの状態が存在して、 fold
メソッドは Iteratee の状態に応じて何か実処理を行う方法を提供します。
§Iteratee の定義における重要な型
Iteratee の具体例を見るために、上記で説明した二つの重要な型について詳しく見ていきます。
Input[E]
は入力データのチャンクを表し、実際の入力データを含むEl[E]
かEmpty
チャンクか、またはストリームの終端を表す EOF のいずれかになります。
例えば、Input[String]
はEl("Hello!")
や Empty 、 EOF にいずれかになります。
Promise[A]
は、その名の通り、A
型の値の Promise です。Promise[A]
は、将来的にA
型の値が与えられた時に redeem(訳注:約束を果たす、という意味)されて、事前に登録されたコールバック関数やその他の処理を呼び出します。Promise は並列処理を同期させたり、非同期処理を合成する際に便利なデータ構造です。Promise についての詳細は 非同期処理 を参照してください。
§基本的な Iteratee
Iteratee や、より具体的にはその fold メソッドを定義することで、後々に再利用できる基本的な Iteratee を作成することができます。
Input[String]
を読み飛ばした上で1:Int
を生成してEmpty
を返すようなDone
状態の Iteratee
val doneIteratee = new Iteratee[String,Int] {
def fold[B](folder: Step[String,Int] => Future[B]): Future[B] = folder(Step.Done(1, Input.Empty))
}
上記のとおり、このような Iteratee は、この例における done
のような適切なコールバック関数に必要な引数を渡すだけで実装することができます。
この Iteratee を利用するためには、約束された値を持つ Future
を使います。
def folder(step: Step[String,Int]):Future[Option[Int]] = step match {
case Step.Done(a, e) => future(Some(a))
case Step.Cont(k) => future(None)
case Step.Error(msg,e) => future(None)
}
val eventuallyMaybeResult: Future[Option[Int]] = doneIteratee.fold(folder)
eventuallyMaybeResult.onComplete(i => println(i))
redeem されたときに Future
の中身を取得するためには、 onComplete
を使います。
// will eventually print 1
eventuallyMaybeResult.onComplete(i => println(i))
上記の実装をもっと汎用的にしてみましょう。Play には結果と入力値から Done
状態の Iteratee を作るヘルパーが用意されています。
val doneIteratee = Done[String,Int](1, Input.Empty)
Done
Iteratee はこの通りとても簡単に作成でき、役に立つケースも無くはないのですが、ご覧のとおり入力データを一切消費してくれません。次は、入力データのチャンクをひとつ消費して、最終的にそのチャンクを結果値として返すような Iteratee を作ってみましょう。
val consumeOneInputAndEventuallyReturnIt = new Iteratee[String,Int] {
def fold[B](folder: Step[String,Int] => Future[B]): Future[B] = {
folder(Step.Cont {
case Input.EOF => Done(0, Input.EOF) //Assuming 0 for default value
case Input.Empty => this
case Input.El(e) => Done(e.toInt,Input.EOF)
})
}
}
def folder(step: Step[String,Int]):Future[Int] = step match {
case Step.Done(a, _) => future(a)
case Step.Cont(k) => k(Input.EOF).fold({
case Step.Done(a1, _) => Future.successful(a1)
case _ => throw new Exception("Erroneous or diverging iteratee")
})
case _ => throw new Exception("Erroneous iteratee")
}
Done
の場合と同様に、Play には Cont
状態の Iteratee を作るためのヘルパーも用意されています。このヘルパーは、 Input[E]
の値を引数にとって、 Iteratee[E,A]
を返します。
val consumeOneInputAndEventuallyReturnIt = {
Cont[String,Int](in => Done(100,Input.Empty))
}
さらに、Error
状態についても、Input[E]
とエラーメッセージを渡すことで Error
状態の Iteratee を作成できるヘルパーが用意されています。
consumeONeINputAndEventuallyReturnIt
の例に立ち戻ると、Cont と Done の 2 ステップのみの単純な Iteratee をベタに実装することは出来そうです。しかし、実際のアプリケーションで使うような、大量の入力データのチャンクを、場合によっては条件付きで消費して、最終的に結果を返すような複雑な Iteratee をするのはなかなかに厄介です。そこで、Play には典型的な Iterateee を作成するためのヘルパーが用意されています。
§入力データの畳込み
Iteratee でよくあるタスクとして、特定の状態を保持して、入力データを受け取るたびにその状態を更新していくような処理があります。この手の Iteratee は Iteratee.fold
で作成することができます。
def fold[E, A](state: A)(f: (A, E) => A): Iteratee[E, A]
シグネチャの通り、この畳込みを行うヘルパーは初期状態 A
、状態と入力データのチャンクを引数にとる (A,E) => A
という関数、入力完了後に E
を消費して A
を返すような Iteratee[E, A]
を引数にとります。作成された Iteratee は EOF
が入力されたタイミングで、 A
型の結果値を含む Done
状態を返します。
試しに、入力データのバイト数を数えるような Iteratee を作ってみましょう。
val inputLength: Iteratee[Array[Byte],Int] = {
Iteratee.fold[Array[Byte],Int](0) { (length, bytes) => length + bytes.size }
}
さらに別の例として、全ての入力データを結合して、最後にそれを返す、という Iteratee を作ってみましょう。
val consume: Iteratee[String,String] = {
Iteratee.fold[String,String]("") { (result, chunk) => result ++ chunk }
}
Iteratee
オブジェクトには、このような Iteratee を任意の TraversableLike
オブジェクトから生成するための consume
というヘルパーが用意されています。
val consume = Iteratee.consume[String]()
このヘルパーの利用例として、入力データのチャンクそれぞれについて、何らかの手続き的な処理を実行する Iteratee を作ってみましょう。
val printlnIteratee = Iteratee.foreach[String](s => println(s))
この他にも repeat
、 ignore
、 fold1
などのヘルパーが用意されています。特に fold1
は、前述の fold
と違い、入力データのチャンクを非同期で処理する機能があります。
さて、これまでの説明を読んだ方は、入力データのチャンクを受け取って、それを Iteratee に畳み込む、という手順を何度も何度も行うのはかなり面倒なのではないかと心配されているかもしれません。確かに、 Iteratee を使ってストリーム処理を行うためには、入力データを受け取るたびにそれを Iteratee に渡して、 fold
関数により状態をチェックして、状態が Cont
であれば次のデータを渡して状態を更新し、そうでなければ結果値を返す、という手順を追う必要があります。ご安心ください。次で説明する Enumerator
は、まさにこのために存在しています。
次ページ: Enumerators