§反応的なストリーム処理
§Enumeratee の守備範囲
Enumeratee
は Iteratee API の中でとても重要なコンポーネントです。Enumeratee
の役割は、データのストリームを適合・変換することです。その役割をイメージできるという意味で、 Enumeratee
の中でも特に Enumeratee.map
というメソッドが親しみやすいかもしれません。
単純な例から始めてみましょう。まずは、次のような Iteratee
をつくります。
val sum: Iteratee[Int,Int] = Iteratee.fold[Int,Int](0){ (s,e) => s + e }
この Iteratee
は Int
を入力にとり、それらの和を計算します。次は以下のような Enumerator
をつくりましょう。
val strings: Enumerator[String] = Enumerator("1","2","3","4")
さて、ここで string:Enumerator[String]
は明らかに Iteratee[Int,Int]
に適用できません。しかし、 Enumerator が生成する String
を何らかのルールに基づいて Int
へ変換することができれば、ストリームの生成側と消費側がうまく噛み合いそうです。つまり、 Iteratee[Int,Int]
を Iteratee[String,Int]
に適合させるか、もしくは Enumerator[String]
を Enumerator[Int]
へ適合させるか、どちらかを行う必要があるということです。Enumeratee
はまさにこの用途のためにあります。 Enumeratee[String,Int]
を利用すると、 Iteratee[Int,Int]
を目的のインタフェースに適合させることができます。
//create am Enumeratee using the map method on Enumeratee
val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt }
val adaptedIteratee: Iteratee[String,Int] = toInt.transform(sum)
//this works!
strings |>> adaptedIteratee
transform
メソッドと全く同じ意味をもつ演算子 &>>
も利用できます。
strings |>> toInt &>> sum
map
メソッドは引数に渡された From => To
という関数をつかって、From
型の入力データを To
型の値へ変換する Enumeratee
を生成します。Enumeratee
は Enumerator
を変換することもできます。
val adaptedEnumerator: Enumerator[Int] = strings.through(toInt)
//this works!
adaptedEnumerator |>> sum
through
メソッドについても、同じ意味の演算子が用意されています。
strings &> toInt |>> sum
Enumeratee
トレイトに定義されている transform
メソッドのシグネチャを見てみましょう。
trait Enumeratee[From, To] {
def transform[A](inner: Iteratee[To, A]): Iteratee[From, A] = ...
}
かなり簡単なシグネチャです。 Enumerator
に定義されている through
メソッドについても同様です。
trait Enumerator[E] {
def through[To](enumeratee: Enumeratee[E, To]): Enumerator[To]
}
Enumeratee
と Enumerator
における transform
と through
はどちらも Enumeratee
の apply
メソッドを利用しています。こちらのシグネチャはもう少し複雑です。
trait Enumeratee[From, To] {
def apply[A](inner: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]] = ...
}
Enumeratee
が出来るのは、単に Iteratee
の方を変換することだけではありません。 Enumeratee
は取り外し可能なアダプターのようなものなので、 Enumeratee
を通して異なる種類の入力データを送信し終わった後は、本来の Iteratee
に戻すことができます。前述の例でいえば、 本来の Iteratee[Int,Int]
に戻してから、今度は Int
の入力データを送ることができます。
val sum:Iteratee[Int,Int] = Iteratee.fold[Int,Int](0){ (s,e) => s + e }
//create am Enumeratee using the map method on Enumeratee
val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt }
val adaptedIteratee: Iteratee[String,Iteratee[Int,Int]] = toInt(sum)
// pushing some strings
val afterPushingStrings: Future[Iteratee[String,Iteratee[Int,Int]]] = {
Enumerator("1","2","3","4") |>> adaptedIteratee
}
val flattenAndRun:Future[Iteratee[Int,Int]] = Iteratee.flatten(afterPushingStrings).run
val originalIteratee = Iteratee.flatten(flattenAndRun)
val moreInts: Future[Iteratee[Int,Int]] = Enumerator(5,6,7) |>> originalIteratee
val sumFuture:Future[Int] = Iteratee.flatten(moreInts).run
sumFuture onSuccess {
case s => println(s)// eventually prints 28
}
このようなことが可能であるため、変換前の元々の Iteratee
を「内側」、変換後の Iteratee
を「外側」と呼びます。
Enumeratee
の全体像が見えてきた所で、少し重要な話をします。実は、 transform
は内側の Iteratee
が Done
状態になったときにに与えられる最後の入力データを取りこぼしてしまいます。つまり、 Enumeratee.map
を使って入力データを変換すると、内側の Iteratee
が入力データの最後のチャンクとともに Done
状態になった際、 transform
メソッドがそれを無視してしまいます。
この場で説明するには少し詳細に入りすぎていると思われるかもしれませんが、モデルを把握する役には立ちます。
Enumeratee.map
の例に立ち戻って考えてみると、 実はそれより汎用的な Enumeratee.mapInput
というメソッドがあります。これを使うと、任意のタイミングで EOF
を返すことができます。
val toIntOrEnd: Enumeratee[String,Int ] = Enumeratee.mapInput[String] {
case Input.El("end") => Input.EOF
case other => other.map(e => e.toInt)
}
Enumeratee.map
と Enumeratee.mapInput
はかなり直感的です。どちらもチャンクを一つ一つ変換するという機能を持っています。さて、次の便利な Enumeratee
は Enumeratee.filter
です。
def filter[E](predicate: E => Boolean): Enumeratee[E, E]
シグネチャからも明らかかもしれませんが、 Enumeraee.filter
は Enumeratee[E,E]
を生成します。その Enumeratee
は入力データのチャックを predicate: E => Boolean
で一つ一つテストして、predicate が true を返したチャンクだけを内側の Iteratee へ送信します。
val numbers = Enumerator(1,2,3,4,5,6,7,8,9,10)
val onlyOdds = Enumeratee.filter[Int](i => i % 2 != 0)
numbers.through(onlyOdds) |>> sum
その他にも、 Enumeratee.collect
や Enumeratee.drop
、 Enumeratee.dropWhile
、 Enumeratee.take
、 Enumeratee.takeWhile
など、似たような原理の Enumeratee が用意されています。
試しに、バイトデータのチャンクに対して Enumeratee.take
を適用してみましょう。
// computes the size in bytes
val fillInMemory: Iteratee[Array[Byte],Array[Byte]] = {
Iteratee.consume[Array[Byte]]()
}
val limitTo100: Enumeratee[Array[Byte],Array[Byte]] = {
Enumeratee.take[Array[Byte]](100)
}
val limitedFillInMemory: Iteratee[Array[Byte],Array[Byte]] = {
limitTo100 &>> fillInMemory
}
一見問題なさそうにみえますが、実際のところ合計で何バイトのデータが残っているのでしょうか?どうすれば、入力データの最大サイズをうまく制限できるのでしょうか。実は、上の例は入力データのチャンク数を制限しただけで、それぞれのチャンクの大きさは制限できていません。どうやら、Enumeratee.take
は入力データの型(ここでは Array[Byte]
)について何の情報も参照できないので、入力のデータの大きさを測ることもできないようです。
しかし、問題ありません。 Array[Byte]
のような TraversableLike
型の入力データ向けの Enumeratee
を作成するヘルパーが Traversable
オブジェクトにひと通り用意されています。上記の例に戻ると、 TraversableLike.take
を使うとうまくいきます。
val fillInMemory: Iteratee[Array[Byte],Array[Byte]] = {
Iteratee.consume[Array[Byte]]()
}
val limitTo100: Enumeratee[Array[Byte],Array[Byte]] = {
Traversable.take[Array[Byte]](100)
}
// We are sure not to get more than 100 bytes loaded into memory
val limitedFillInMemory: Iteratee[Array[Byte],Array[Byte]] = {
limitTo100 &>> fillInMemory
}
Traversable
その他のメソッドとしては、 Traversable.takeUpTo
や Traversable.drop
などがあります。
最後になりましたが、Enumeratee
のインスタンスは compose
メソッドまたは ><>
演算子により合成することができます。注意しなければならないこととして、合成された Enumeratee
のインスタンスは必ず Done
と共に与えられた最後の入力データのチャンクを無視します。しかしながら、 composeConcat
や >+>
演算子を使うと、最後のチャンクについてもちゃんと読み込まれます。
このドキュメントの翻訳は Play チームによってメンテナンスされているものではありません。 間違いを見つけた場合、このページのソースコードを ここ で確認することができます。 ドキュメントガイドライン を読んで、お気軽にプルリクエストを送ってください。