§反応的なストリーム処理
§Enumerator
Iteratee がストリームの消費者やシンク、入力だとすると、 Enumerator
は入力データを特定の Iteratee へ渡す送信元であるといえます。その名前が示すとおり、Enumerator
は入力データを列挙 (Enumerate) して、 Iteratee に渡していきます。そして、最終的に新しい状態の Iteratee を返します。この挙動は、 Enumerator
のシグネチャを見ると想像しやすいでしょう。
trait Enumerator[E] {
/**
* Apply this Enumerator to an Iteratee
*/
def apply[A](i: Iteratee[E, A]): Promise[Iteratee[E, A]]
}
Enumerator[E]
は Iteratee[E,A]
を引数に取ります。この Iterate は Input[E]
を消費して、 Promise[Iteratee[E,A]]
を返します。最終的には、この Promise から次の状態の Iteratee を取り出すことができます。
このまま単に Iteratee.fold メソッドを呼び出すことで Enumerator
を実装してもよいのですが、 Enumerator
作成用のヘルパーを使うこともできます。例えば、 文字列のリストを Iteratee へ送る Enumerator[String]
は次のように作成することができます。
val enumerateUsers: Enumerator[String] = {
Enumerator("Guillaume", "Sadek", "Peter", "Erwan")
}
この Enumerator を使って、先程作成した Iteratee にデータを消費させるには次のように書きます。
val consume = Iteratee.consume[String]()
val newIteratee: Future[Iteratee[String,String]] = enumerateUsers(consume)
Iteratee を終了させて、計算結果を取り出すためには、 Input.EOF
を渡します。 Iteratee
にはそのための run
メソッドが用意されています。このメソッドを呼び出すと、 Input.EOF
が送信されて、 Promise[A]
が帰ります。残りの入力データは無視されます。
// We use flatMap since newIteratee is a promise,
// and run itself return a promise
val eventuallyResult: Future[String] = newIteratee.flatMap(i => i.run)
//Eventually print the result
eventuallyResult.onSuccess { case x => println(x) }
// Prints "GuillaumeSadekPeterErwan"
もしかしたら、 Iteratee
が最終的に結果を生成し (fold に適切なコールバック関数を渡した場合は Promise が返ります) 、一方で Promise
も最終的に結果を生成することに気づかれた方がいるかもしれません。このとき、 Promise[Iteratee[E,A]]
は Iteratee[E,A]
と見なすことができます。 Iteratee.flatten
はまさしくこの Promise と Iteratee の変換を行うヘルパーです。先程の例でこのヘルパーを使ってみましょう。
//Apply the enumerator and flatten then run the resulting iteratee
val newIteratee = Iteratee.flatten(enumerateUsers(consume))
val eventuallyResult: Future[String] = newIteratee.run
//Eventually print the result
eventuallyResult.onSuccess { case x => println(x) }
// Prints "GuillaumeSadekPeterErwan"
Enumerator
には演算子のように振る舞う記号的なメソッドがいくつか用意されています。いずれも、文脈によっては括弧の節約という意味で役に立つことがあるかもしれません。例えば、 |>>
メソッドは apply メソッドと全く同じ結果になります。
val eventuallyResult: Future[String] = {
Iteratee.flatten(enumerateUsers |>> consume).run
}
Enumerator
は入力データを Iteratee へ送信して、最終的には新しい状態の Iteratee を返します。この新しい Iteratee に、別の Enumerator
を使ってさらに入力データを渡すことができます。これは、 Promise
に flatMap
を適用するか、もしくは Enumerator
のインスタンスを andThen
メソッドによって組み合わせることで実現できます。
val colors = Enumerator("Red","Blue","Green")
val moreColors = Enumerator("Grey","Orange","Yellow")
val combinedEnumerator = colors.andThen(moreColors)
val eventuallyIteratee = combinedEnumerator(consume)
apply メソッドと同様に、 andThen
にも >>>
という演算子版が容易されています。これも、状況によっては括弧の節約に役立つでしょう。
val eventuallyIteratee = {
Enumerator("Red","Blue","Green") >>>
Enumerator("Grey","Orange","Yellow") |>>
consume
}
ファイルの内容を列挙するための Enumerator
を作成することもできます。
val fileEnumerator: Enumerator[Array[Byte]] = {
Enumerator.fromFile(new File("path/to/some/file"))
}
より汎用的には、 Enumerator.fromStream
を利用して java.io.InputStream
内のデータを列挙することができます。この場合、 Enumerator
に割り当てられている Iteratee が次の入力データを読み込めるような状態になるまで、Enumerator 側でも新しいデータが読み込まれないことに注意してください。
内部的には、これらのメソッドは両方とも Enumerator.fromCallback
という関数に依存しています。
def fromCallback[E](
retriever: () => Promise[Option[E]],
onComplete: () => Unit = () => (),
onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ()
): Enumerator[E] = {
...
}
Enumerator
オブジェクトに定義されているこのメソッドは、手続き的な処理を行う Enumerator
をつくる場合に大変重要なメソッドです。シグネチャをよくみると、このメソッドは retriever: () => Promise[Option[E]]
というコールバック関数を引数をとることがわかります。このコールバック関数は、 Enumerator
に割り当てられている Iteratee が次の入力データを読込可能な状態になった際に呼び出されます。
例えば、このメソッドを利用すると、 Promise を返すタイミングにおいて、100 ミリ秒おきに日時データを生成するストリームを生成することができます。
Enumerator.fromCallback { () =>
Promise.timeout(Some(new Date), 100 milliseconds)
}
同じような考え方で、WS
API を使って特定の URL の内容を一定時間おきに取得して、 Promise
を返す Enumerator
も次のようにつくることができます。
このコールバック Enumerator と手続き的な Iteratee.foreach
を組み合わせることで、一定時間おきに Stream から取得した日時データを println することができます。
val timeStream = Enumerator.fromCallback { () =>
Promise.timeout(Some(new Date), 100 milliseconds)
}
val printlnSink = Iteratee.foreach[Date](date => println(date))
timeStream |>> printlnSink
Enumerator.pushee
を利用すると、 Enumerator
をさらに手続き的に記述することができます。このメソッドからは、 push
と close
というメソッドが定義された Pushee
へのインタフェースが提供されます。
val channel = Enumerator.pushee[String] { onStart = pushee =>
pushee.push("Hello")
pushee.push("World")
}
channel |>> Iteratee.foreach(println)
この onStart
関数は、Enumerator
が Iteratee
へ適用されるたびに呼び出されます。例えば、チャットルームなどのアプリケーションでは、 Pushee を STM などで同期制御されたグローバル変数と関連付けておき、その変数にチャットルームの参加者のリストを持たせておく、といった利用方法が考えられます。 Enumerator.pushee
にはその他に onComplete
、 onError
という関数も用意されています。
最後に一つ興味深いメソッドを紹介します。 interleave
または演算子の >-
というメソッドです。これはその名の通り、二つの Enumerator
を並べて、それぞれから入力データが与えられたら、それに反応して即座に Iteratee へ渡すという、新しい Enumerator
を生成します。
§Enumerator アラカルト
さて、これまでの説明で様々な Enumerator
の作り方を知ることができました。これらの Enumerator
を andThen
/ >>>
や interleave
/ >-
で組み合わせて、任意の Enumerator
を合成することができます。
もうお気づきかもしれませんが、多数のストリームを要するアプリケーションをうまく構築するためには、基本となる Enumerator
を生成して、それらを組み合わせるとよいでしょう。例えば、監視システムを作る場合、次のようなコードになるでしょう。
object AvailableStreams {
val cpu: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)
val memory: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)
val threads: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)
val heap: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)
}
val physicalMachine = AvailableStreams.cpu >- AvailableStreams.memory
val jvm = AvailableStreams.threads >- AvailableStreams.heap
def usersWidgetsComposition(prefs: Preferences) = {
// do the composition dynamically
}
次ページでは、 Enumerator
や Iteratee
を変換したり、アダプターをかませる方法… Enumeratee
について説明します!
次ページ: Enumeratee
このドキュメントの翻訳は Play チームによってメンテナンスされているものではありません。 間違いを見つけた場合、このページのソースコードを ここ で確認することができます。 ドキュメントガイドライン を読んで、お気軽にプルリクエストを送ってください。