§反応的なストリーム処理
現代の Web プログラミングにおいて、ストリーム処理は重要なタスクです。これには、チャンク単位のデータダウンロード/アップロードや、 Comet や WebSocket など様々な技術を利用したデータストリームのリアルタイム処理、作成、合成、提供などが含まれます。
Iteratee はこのようなストリーム処理を実現する考え方と API です。 Iteratee の主な特徴は次のとおりです。
- データのストリームを生成、処理、変換することができる
- 様々なデータを同じような形式で扱える (例えば、ディスク上のファイル、WebSocket、Chunked HTTP、データアップロードなど)
- 合成可能: ストリームのソースやコンシューマを、別の型のソースやコンシューマに変換するためのアダプタや変換器が豊富に用意されている
- データの送信を途中で停めたり、データの送信が完了した際に通知を受けることができるようにする
- ノン・ブロッキング、リアクティブで、かつ (Thread、Memoryなどの) リソース消費をコントロールできる
§Iteratee
Iteratee はコンシューマです。Iteratee は入力データの処理方法および結果の生成方法が記述されます。Iteratee は十分な入力データを受け取ると、それに対して何らかの計算を行い結果値を返します。
// an iteratee that consumes String chunks and produces an Int
Iteratee[String,Int]
Iteratee のインタフェースは [Iteratee[E, A]]
のように二つの型パラメータを取ります。E
は入力データの型、A
は結果値の型です。
Iteratee は 3 つの状態を持ちます。それぞれ、Cont
はさらに入力データを受付可能であること、Error
はエラーにより入力を停止したこと、Done
は計算結果が出ていることを表します。これら 3 つのステータスは Iteratee[E,A]
インタフェースの fold
メソッドにより定義することができます。
def fold[B](folder: Step[E, A] => Future[B]): Future[B]
Step
オブジェクトは 3 つの状態を持ちます。
object Step {
case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
case class Cont[E, +A](k: Input[E] => Iteratee[E, A]) extends Step[E, A]
case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}
fold メソッドは、 Iteratee をこれら 3 つの状態のいずれかに定義します。fold メソッドは 3 つのコールバック関数を引数にとり、状態に応じていずれか一つを呼び出し、最終的には結果値を返します。Iteratee に対する fold
の呼び出しは、次のような意味になります。
- Iteratee が
Done
状態であれば、A
という型の計算結果とInput[E]
という型のこれから消費される入力データの最後のチャンクを元に、B
という型の値を生成します。 - Iteratee が
Cont
状態であれば、Input[E] => Iteratee[E,A]
という型の継続 (入力を待ち受ける) を使って、最終的にはB
という型の値を生成します。この状態が Iteratee にデータを入力する唯一のタイミングであり、データを入力後は提供された継続を使って新しい状態の Iteratee を返します。 - Iteratee が
Error
状態であれば、String
型のエラーメッセージと、エラーの原因となった入力データを元に、B
型の値を生成します。
Iteratee の状態に依存して、fold
は引数に渡された関数のいずれかを呼び出して、適切な B
型の値を生成します。
まとめると、 Iteratee には 3 つの状態が存在して、 fold
メソッドは Iteratee の状態に応じて何か実処理を行う方法を提供します。
§Iteratee の定義における重要な型
Iteratee の具体例を見るために、上記で説明した二つの重要な型について詳しく見ていきます。
Input[E]
は入力データのチャンクを表し、実際の入力データを含むEl[E]
かEmpty
チャンクか、またはストリームの終端を表す EOF のいずれかになります。
例えば、Input[String]
はEl("Hello!")
や Empty 、 EOF にいずれかになります。
Future[A]
は、その名の通りA
型の将来における値です。これは、はじめは空であり、最終的に (“約束された”)A
型の値がセットされること、その値を使いたい場合はコールバックやその他の処理を計画できることを意味しています。Future は並列処理を同期させたり、非同期処理を合成する際に便利なデータ構造であり、ScalaAsync の節で詳しく説明されています。
§基本的な Iteratee
Iteratee や、より具体的にはその fold メソッドを定義することで、後々に再利用できる基本的な Iteratee を作成することができます。
1:Int
を生成し、最後のInput[String]
のあとに残る値としてEmpty
を返すDone
状態の Iteratee
val doneIteratee = new Iteratee[String,Int] {
def fold[B](folder: Step[String,Int] => Future[B])(implicit ec: ExecutionContext) : Future[B] =
folder(Step.Done(1, Input.Empty))
}
上記のとおり、このような Iteratee は、この例における Done
のような適切な apply
関数に必要な引数を渡すだけで実装することができます。
この Iteratee を利用するためには、約束された値を持つ Future
を使います。
def folder(step: Step[String,Int]):Future[Option[Int]] = step match {
case Step.Done(a, e) => future(Some(a))
case Step.Cont(k) => future(None)
case Step.Error(msg,e) => future(None)
}
val eventuallyMaybeResult: Future[Option[Int]] = doneIteratee.fold(folder)
eventuallyMaybeResult.onComplete(i => println(i))
redeem されたときに Future
の中身を取得するためには、 onComplete
を使います。
// will eventually print 1
eventuallyMaybeResult.onComplete(i => println(i))
上記の実装をもっと汎用的にしてみましょう。Play には結果と入力値から Done
状態の Iteratee を作るヘルパーが用意されています。
val doneIteratee = Done[String,Int](1, Input.Empty)
Done
Iteratee はこの通りとても簡単に作成でき、役に立つケースも無くはないのですが、入力データをなにも消費しません。次は、入力データのチャンクをひとつ消費して、最終的にそのチャンクを結果値として返すような Iteratee を作ってみましょう。
val consumeOneInputAndEventuallyReturnIt = new Iteratee[String,Int] {
def fold[B](folder: Step[String,Int] => Future[B])(implicit ec: ExecutionContext): Future[B] = {
folder(Step.Cont {
case Input.EOF => Done(0, Input.EOF) //Assuming 0 for default value
case Input.Empty => this
case Input.El(e) => Done(e.toInt,Input.EOF)
})
}
}
def folder(step: Step[String,Int]):Future[Int] = step match {
case Step.Done(a, _) => future(a)
case Step.Cont(k) => k(Input.EOF).fold({
case Step.Done(a1, _) => Future.successful(a1)
case _ => throw new Exception("Erroneous or diverging iteratee")
})
case _ => throw new Exception("Erroneous iteratee")
}
Done
の場合と同様に、Play には Cont
状態の Iteratee を作るためのヘルパーも用意されています。このヘルパーは、 Input[E]
の値を引数にとって、 Iteratee[E,A]
を返します。
val consumeOneInputAndEventuallyReturnIt = {
Cont[String,Int](in => Done(100,Input.Empty))
}
さらに、Error
状態についても、Input[E]
とエラーメッセージを渡すことで Error
状態の Iteratee を作成できるヘルパーが用意されています。
consumeONeINputAndEventuallyReturnIt
の例に立ち戻ると、Cont と Done の 2 ステップのみの単純な Iteratee をベタに実装することは出来そうです。しかし、実際のアプリケーションで使うような、大量の入力データのチャンクを、場合によっては条件付きで消費して、最終的に結果を返すような複雑な Iteratee をするのはなかなかに厄介です。そこで、Play には典型的な Iterateee を作成するためのヘルパーが用意されています。
§入力データの畳込み
Iteratee でよくあるタスクとして、特定の状態を保持して、入力データを受け取るたびにその状態を更新していくような処理があります。この手の Iteratee は Iteratee.fold
で作成することができます。
def fold[E, A](state: A)(f: (A, E) => A): Iteratee[E, A]
シグネチャの通り、この畳込みを行うヘルパーは初期状態 A
、状態と入力データのチャンクを引数にとる (A,E) => A
という関数、入力完了後に E
を消費して A
を返すような Iteratee[E, A]
を引数にとります。作成された Iteratee は EOF
が入力されたタイミングで、 A
型の結果値を含む Done
状態を返します。
試しに、入力データのバイト数を数えるような Iteratee を作ってみましょう。
val inputLength: Iteratee[Array[Byte],Int] = {
Iteratee.fold[Array[Byte],Int](0) { (length, bytes) => length + bytes.size }
}
さらに別の例として、全ての入力データを結合して、最後にそれを返す、という Iteratee を作ってみましょう。
val consume: Iteratee[String,String] = {
Iteratee.fold[String,String]("") { (result, chunk) => result ++ chunk }
}
Iteratee
オブジェクトには、このような Iteratee を任意の TraversableLike
オブジェクトから生成するための consume
というヘルパーが用意されています。
val consume = Iteratee.consume[String]()
このヘルパーの利用例として、入力データのチャンクそれぞれについて、何らかの手続き的な処理を実行する Iteratee を作ってみましょう。
val printlnIteratee = Iteratee.foreach[String](s => println(s))
この他にも repeat
、 ignore
、 fold1
などのヘルパーが用意されています。特に fold1
は、前述の fold
と違い、入力データのチャンクを非同期で処理する機能があります。
さて、これまでの説明を読んだ方は、入力データのチャンクを受け取って、それを Iteratee に畳み込む、という手順を何度も何度も行うのはかなり面倒なのではないかと心配されているかもしれません。確かに、 Iteratee を使ってストリーム処理を行うためには、入力データを受け取るたびにそれを Iteratee に渡して、 fold
関数により状態をチェックして、状態が Cont
であれば次のデータを渡して状態を更新し、そうでなければ結果値を返す、という手順を追う必要があります。ご安心ください。次で説明する Enumerator
は、まさにこのために存在しています。
Next: Enumerators
このドキュメントの翻訳は Play チームによってメンテナンスされているものではありません。 間違いを見つけた場合、このページのソースコードを ここ で確認することができます。 ドキュメントガイドライン を読んで、お気軽にプルリクエストを送ってください。