§反応的なストリーム処理
§Enumerator
Iteratee がストリームの消費者やシンク、入力だとすると、 Enumerator
は入力データを特定の Iteratee へ渡す送信元であるといえます。その名前が示すとおり、Enumerator
は入力データを列挙 (Enumerate) して、 Iteratee に渡していきます。そして、最終的に新しい状態の Iteratee を返します。この挙動は、 Enumerator
のシグネチャを見ると想像しやすいでしょう。
trait Enumerator[E] {
/**
* Apply this Enumerator to an Iteratee
*/
def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
}
Enumerator[E]
は Iteratee[E,A]
を引数に取ります。この Iterate は Input[E]
を消費して、 Future[Iteratee[E,A]]
を返します。最終的には、この Promise から次の状態の Iteratee を取り出すことができます。
このまま単に Iteratee.fold メソッドを呼び出すことで Enumerator
を実装してもよいのですが、 Enumerator
作成用のヘルパーを使うこともできます。例えば、 文字列のリストを Iteratee へ送る Enumerator[String]
は次のように作成することができます。
val enumerateUsers: Enumerator[String] = {
Enumerator("Guillaume", "Sadek", "Peter", "Erwan")
}
この Enumerator を使って、先程作成した Iteratee にデータを消費させるには次のように書きます。
val consume = Iteratee.consume[String]()
val newIteratee: Future[Iteratee[String,String]] = enumerateUsers(consume)
Iteratee を終了させて、計算結果を取り出すためには、 Input.EOF
を渡します。 Iteratee
にはそのための run
メソッドが用意されています。このメソッドを呼び出すと、 Input.EOF
が送信されて、 Future[A]
が返ります。残りの入力データは無視されます。
// We use flatMap since newIteratee is a promise,
// and run itself return a promise
val eventuallyResult: Future[String] = newIteratee.flatMap(i => i.run)
//Eventually print the result
eventuallyResult.onSuccess { case x => println(x) }
// Prints "GuillaumeSadekPeterErwan"
もしかしたら、 Iteratee
が最終的に結果を生成し (fold に適切なコールバック関数を渡した場合は Promise が返ります) 、一方で Future
も最終的に結果を生成することに気づかれた方がいるかもしれません。このとき、 Future[Iteratee[E,A]]
は Iteratee[E,A]
と見なすことができます。 Iteratee.flatten
はまさしくこの Promise と Iteratee の変換を行うヘルパーです。先程の例でこのヘルパーを使ってみましょう。
//Apply the enumerator and flatten then run the resulting iteratee
val newIteratee = Iteratee.flatten(enumerateUsers(consume))
val eventuallyResult: Future[String] = newIteratee.run
//Eventually print the result
eventuallyResult.onSuccess { case x => println(x) }
// Prints "GuillaumeSadekPeterErwan"
Enumerator
には演算子のように振る舞う記号的なメソッドがいくつか用意されています。いずれも、文脈によっては括弧の節約という意味で役に立つことがあるかもしれません。例えば、 |>>
メソッドは apply メソッドと全く同じ結果になります。
val eventuallyResult: Future[String] = {
Iteratee.flatten(enumerateUsers |>> consume).run
}
Enumerator
は入力データを Iteratee へ送信して、最終的には新しい状態の Iteratee を返します。この新しい Iteratee に、別の Enumerator
を使ってさらに入力データを渡すことができます。これは、 Future
に flatMap
を適用するか、もしくは Enumerator
のインスタンスを andThen
メソッドによって組み合わせることで実現できます。
val colors = Enumerator("Red","Blue","Green")
val moreColors = Enumerator("Grey","Orange","Yellow")
val combinedEnumerator = colors.andThen(moreColors)
val eventuallyIteratee = combinedEnumerator(consume)
apply メソッドと同様に、 andThen
にも >>>
という演算子版が用意されています。これも、状況によっては括弧の節約に役立つでしょう。
val eventuallyIteratee = {
Enumerator("Red","Blue","Green") >>>
Enumerator("Grey","Orange","Yellow") |>>
consume
}
ファイルの内容を列挙するための Enumerator
を作成することもできます。
val fileEnumerator: Enumerator[Array[Byte]] = {
Enumerator.fromFile(new File("path/to/some/file"))
}
より汎用的には、 Enumerator.fromStream
を利用して java.io.InputStream
内のデータを列挙することができます。この場合、 Enumerator
に割り当てられている Iteratee が次の入力データを読み込めるような状態になるまで、Enumerator 側でも新しいデータが読み込まれないことに注意してください。
実際に両方のメソッドは、下記のシグネチャを持つ、より汎用的な Enumerator.generateM
をベースにしています。
def generateM[E](e: => Future[Option[E]]) = {
...
}
Enumerator
オブジェクトに定義されたこのメソッドは、手続き的なロジックから Enumerator
を生成するためのもっとも重要なメソッドのひとつです。シグネチャをよく見てみると、このメソッドは、Enumerator
が割り当てられている iteratee が入力を受け付けられるようになるたびに呼び出されるコールバック関数 e: => Future[Option[E]]
を取ります。
例えば、このメソッドを利用すると、 Promise を返すタイミングにおいて、100 ミリ秒おきに日時データを生成するストリームを生成することができます。
Enumerator.generateM {
Promise.timeout(Some(new Date), 100 milliseconds)
}
同じような考え方で、WS
API を使って特定の URL の内容を一定時間おきに取得して、 Future
を返す Enumerator
も次のようにつくることができます。
このコールバック Enumerator と手続き的な Iteratee.foreach
を組み合わせることで、一定時間おきに Stream から取得した日時データを println することができます。
val timeStream = Enumerator.generateM {
Promise.timeout(Some(new Date), 100 milliseconds)
}
val printlnSink = Iteratee.foreach[Date](date => println(date))
timeStream |>> printlnSink
Enumerator
を生成する、より命令的なもうひとつの方法は、準備が整い次第、push
および end
メソッドが定義されている Channel
インターフェースを提供する Concurrent.unicast
を使うことです。
val enumerator = Concurrent.unicast[String](onStart = channel => {
channel.push("Hello")
channel.push("World")
})
enumerator |>> Iteratee.foreach(println)
onStart
関数は、Enumerator
が Iteratee
に割り当てられる度に呼ばれます。例えばチャットルームのような、いくつかのアプリーケーションにおいて、(例えばSTMを使用して)同期された、リスナーのリストに含むグローバル値に enumerator
を割り当てることは理にかなっています。Concurrent.unicast
は他に onComplete
と onError
という 2 つの関数を受け付けます。
最後に一つ興味深いメソッドを紹介します。 interleave
または演算子の >-
というメソッドです。これはその名の通り、二つの Enumerator
を並べて、それぞれから入力データが与えられたら、それに反応して即座に Iteratee へ渡すという、新しい Enumerator
を生成します。
§Enumerator アラカルト
さて、これまでの説明で様々な Enumerator
の作り方を知ることができました。これらの Enumerator
を andThen
/ >>>
や interleave
/ >-
で組み合わせて、任意の Enumerator
を合成することができます。
もうお気づきかもしれませんが、多数のストリームを要するアプリケーションをうまく構築するためには、基本となる Enumerator
を生成して、それらを組み合わせるとよいでしょう。例えば、監視システムを作る場合、次のようなコードになるでしょう。
object AvailableStreams {
val cpu: Enumerator[JsValue] = Enumerator.generateM(/* code here */)
val memory: Enumerator[JsValue] = Enumerator.generateM(/* code here */)
val threads: Enumerator[JsValue] = Enumerator.generateM(/* code here */)
val heap: Enumerator[JsValue] = Enumerator.generateM(/* code here */)
}
val physicalMachine = AvailableStreams.cpu >- AvailableStreams.memory
val jvm = AvailableStreams.threads >- AvailableStreams.heap
def usersWidgetsComposition(prefs: Preferences) = {
// do the composition dynamically
}
次ページでは、 Enumerator
や Iteratee
を変換したり、アダプターをかませる方法… Enumeratee
について説明します!
Next: Enumeratees
このドキュメントの翻訳は Play チームによってメンテナンスされているものではありません。 間違いを見つけた場合、このページのソースコードを ここ で確認することができます。 ドキュメントガイドライン を読んで、お気軽にプルリクエストを送ってください。