Documentation

You are viewing the documentation for the 2.2.0 release in the 2.2.x series of releases. The latest stable release series is 2.4.x.

§反応的なストリーム処理

§Enumerator

Iteratee がストリームの消費者やシンク、入力だとすると、 Enumerator は入力データを特定の Iteratee へ渡す送信元であるといえます。その名前が示すとおり、Enumerator は入力データを列挙 (Enumerate) して、 Iteratee に渡していきます。そして、最終的に新しい状態の Iteratee を返します。この挙動は、 Enumerator のシグネチャを見ると想像しやすいでしょう。

trait Enumerator[E] {

  /**
   * Apply this Enumerator to an Iteratee
   */
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]

}

Enumerator[E]Iteratee[E,A] を引数に取ります。この Iterate は Input[E] を消費して、 Future[Iteratee[E,A]] を返します。最終的には、この Promise から次の状態の Iteratee を取り出すことができます。

このまま単に Iteratee.fold メソッドを呼び出すことで Enumerator を実装してもよいのですが、 Enumerator 作成用のヘルパーを使うこともできます。例えば、 文字列のリストを Iteratee へ送る Enumerator[String] は次のように作成することができます。

val enumerateUsers: Enumerator[String] = {
  Enumerator("Guillaume", "Sadek", "Peter", "Erwan")
}

この Enumerator を使って、先程作成した Iteratee にデータを消費させるには次のように書きます。

val consume = Iteratee.consume[String]()
val newIteratee: Future[Iteratee[String,String]] = enumerateUsers(consume) 

Iteratee を終了させて、計算結果を取り出すためには、 Input.EOF を渡します。 Iteratee にはそのための run メソッドが用意されています。このメソッドを呼び出すと、 Input.EOF が送信されて、 Future[A] が帰ります。残りの入力データは無視されます。

// We use flatMap since newIteratee is a promise, 
// and run itself return a promise
val eventuallyResult: Future[String] = newIteratee.flatMap(i => i.run)

//Eventually print the result
eventuallyResult.onSuccess { case x => println(x) }

// Prints "GuillaumeSadekPeterErwan"

もしかしたら、 Iteratee が最終的に結果を生成し (fold に適切なコールバック関数を渡した場合は Promise が返ります) 、一方で Future も最終的に結果を生成することに気づかれた方がいるかもしれません。このとき、 Future[Iteratee[E,A]]Iteratee[E,A] と見なすことができます。 Iteratee.flatten はまさしくこの Promise と Iteratee の変換を行うヘルパーです。先程の例でこのヘルパーを使ってみましょう。

//Apply the enumerator and flatten then run the resulting iteratee
val newIteratee = Iteratee.flatten(enumerateUsers(consume))

val eventuallyResult: Future[String] = newIteratee.run
   
//Eventually print the result 
eventuallyResult.onSuccess { case x => println(x) }

// Prints "GuillaumeSadekPeterErwan"

Enumerator には演算子のように振る舞う記号的なメソッドがいくつか用意されています。いずれも、文脈によっては括弧の節約という意味で役に立つことがあるかもしれません。例えば、 |>> メソッドは apply メソッドと全く同じ結果になります。

val eventuallyResult: Future[String] = {
  Iteratee.flatten(enumerateUsers |>> consume).run
}

Enumerator は入力データを Iteratee へ送信して、最終的には新しい状態の Iteratee を返します。この新しい Iteratee に、別の Enumerator を使ってさらに入力データを渡すことができます。これは、 FutureflatMap を適用するか、もしくは Enumerator のインスタンスを andThen メソッドによって組み合わせることで実現できます。

val colors = Enumerator("Red","Blue","Green")

val moreColors = Enumerator("Grey","Orange","Yellow")

val combinedEnumerator = colors.andThen(moreColors)

val eventuallyIteratee = combinedEnumerator(consume)

apply メソッドと同様に、 andThen にも >>> という演算子版が容易されています。これも、状況によっては括弧の節約に役立つでしょう。

val eventuallyIteratee = {
  Enumerator("Red","Blue","Green") >>>
  Enumerator("Grey","Orange","Yellow") |>>
  consume    
}

ファイルの内容を列挙するための Enumerator を作成することもできます。

val fileEnumerator: Enumerator[Array[Byte]] = {
  Enumerator.fromFile(new File("path/to/some/file"))
}

より汎用的には、 Enumerator.fromStream を利用して java.io.InputStream 内のデータを列挙することができます。この場合、 Enumerator に割り当てられている Iteratee が次の入力データを読み込めるような状態になるまで、Enumerator 側でも新しいデータが読み込まれないことに注意してください。

内部的には、これらのメソッドは両方とも Enumerator.fromCallback という関数に依存しています。

def fromCallback[E](
  retriever: () => Future[Option[E]],
  onComplete: () => Unit = () => (),
  onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ()
): Enumerator[E] = {
  ... 
}

Enumerator オブジェクトに定義されているこのメソッドは、手続き的な処理を行う Enumerator をつくる場合に大変重要なメソッドです。シグネチャをよくみると、このメソッドは retriever: () => Future[Option[E]] というコールバック関数を引数をとることがわかります。このコールバック関数は、 Enumerator に割り当てられている Iteratee が次の入力データを読込可能な状態になった際に呼び出されます。

例えば、このメソッドを利用すると、 Promise を返すタイミングにおいて、100 ミリ秒おきに日時データを生成するストリームを生成することができます。

Enumerator.fromCallback { () =>
  Promise.timeout(Some(new Date), 100 milliseconds)
}

同じような考え方で、WS API を使って特定の URL の内容を一定時間おきに取得して、 Future を返す Enumerator も次のようにつくることができます。

このコールバック Enumerator と手続き的な Iteratee.foreach を組み合わせることで、一定時間おきに Stream から取得した日時データを println することができます。

val timeStream = Enumerator.fromCallback { () => 
  Promise.timeout(Some(new Date), 100 milliseconds)
}

val printlnSink = Iteratee.foreach[Date](date => println(date))

timeStream |>> printlnSink

Enumerator.pushee を利用すると、 Enumerator をさらに手続き的に記述することができます。このメソッドからは、 pushclose というメソッドが定義された Pushee へのインタフェースが提供されます。

val channel = Enumerator.pushee[String] { onStart = pushee =>
  pushee.push("Hello")
  pushee.push("World")
}

channel |>> Iteratee.foreach(println)

この onStart 関数は、EnumeratorIteratee へ適用されるたびに呼び出されます。例えば、チャットルームなどのアプリケーションでは、 Pushee を STM などで同期制御されたグローバル変数と関連付けておき、その変数にチャットルームの参加者のリストを持たせておく、といった利用方法が考えられます。 Enumerator.pushee にはその他に onCompleteonError という関数も用意されています。

最後に一つ興味深いメソッドを紹介します。 interleave または演算子の >- というメソッドです。これはその名の通り、二つの Enumerator を並べて、それぞれから入力データが与えられたら、それに反応して即座に Iteratee へ渡すという、新しい Enumerator を生成します。

§Enumerator アラカルト

さて、これまでの説明で様々な Enumerator の作り方を知ることができました。これらの EnumeratorandThen / >>>interleave / >- で組み合わせて、任意の Enumerator を合成することができます。

もうお気づきかもしれませんが、多数のストリームを要するアプリケーションをうまく構築するためには、基本となる Enumerator を生成して、それらを組み合わせるとよいでしょう。例えば、監視システムを作る場合、次のようなコードになるでしょう。

object AvailableStreams {

  val cpu: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)

  val memory: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)

  val threads: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)

  val heap: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)

}

val physicalMachine = AvailableStreams.cpu >- AvailableStreams.memory
val jvm = AvailableStreams.threads >- AvailableStreams.heap

def usersWidgetsComposition(prefs: Preferences) = {
  // do the composition dynamically
}

次ページでは、 EnumeratorIteratee を変換したり、アダプターをかませる方法… Enumeratee について説明します!

次ページ: Enumeratee