§Handling data streams reactively
§Enumerators
If an iteratee represents the consumer, or sink, of input, an Enumerator
is the source that pushes input into a given iteratee. As the name suggests, it enumerates some input into the iteratee and eventually returns the new state of that iteratee. This can be easily seen looking at the Enumerator
’s signature:
trait Enumerator[E] {
/**
* Apply this Enumerator to an Iteratee
*/
def apply[A](i: Iteratee[E, A]): Promise[Iteratee[E, A]]
}
An Enumerator[E]
takes an Iteratee[E,A]
which is any iteratee that consumes Input[E]
and returns a Promise[Iteratee[E,A]]
which eventually gives the new state of the iteratee.
We can go ahead and manually implement Enumerator
instances by consequently calling the iteratee’s fold method, or use one of the provided Enumerator
creation methods. For instance we can create an Enumerator[String]
that pushes a list of strings into an iteratee, like the following:
val enumerateUsers: Enumerator[String] = {
Enumerator("Guillaume", "Sadek", "Peter", "Erwan")
}
Now we can apply it to the consume iteratee we created before:
val consume = Iteratee.consume[String]()
val newIteratee: Future[Iteratee[String,String]] = enumerateUsers(consume)
To terminate the iteratee and extract the computed result we pass Input.EOF
. An Iteratee
carries a run
method that does just this. It pushes an Input.EOF
and returns a Promise[A]
, ignoring left input if any.
// We use flatMap since newIteratee is a promise,
// and run itself return a promise
val eventuallyResult: Future[String] = newIteratee.flatMap(i => i.run)
//Eventually print the result
eventuallyResult.onSuccess { case x => println(x) }
// Prints "GuillaumeSadekPeterErwan"
You might notice here that an Iteratee
will eventually produce a result (returning a promise when calling fold and passing appropriate calbacks), and a Promise
eventually produces a result. Then a Promise[Iteratee[E,A]]
can be viewed as Iteratee[E,A]
. Indeed this is what Iteratee.flatten
does, Let’s apply it to the previous example:
//Apply the enumerator and flatten then run the resulting iteratee
val newIteratee = Iteratee.flatten(enumerateUsers(consume))
val eventuallyResult: Future[String] = newIteratee.run
//Eventually print the result
eventuallyResult.onSuccess { case x => println(x) }
// Prints "GuillaumeSadekPeterErwan"
An Enumerator
has some symbolic methods that can act as operators, which can be useful in some contexts for saving some parentheses. For example, the |>>
method works exactly like apply:
val eventuallyResult: Future[String] = {
Iteratee.flatten(enumerateUsers |>> consume).run
}
Since an Enumerator
pushes some input into an iteratee and eventually return a new state of the iteratee, we can go on pushing more input into the returned iteratee using another Enumerator
. This can be done either by using the flatMap
function on Promise
s or more simply by combining Enumerator
instancess using the andThen
method, as follows:
val colors = Enumerator("Red","Blue","Green")
val moreColors = Enumerator("Grey","Orange","Yellow")
val combinedEnumerator = colors.andThen(moreColors)
val eventuallyIteratee = combinedEnumerator(consume)
As for apply, there is a symbolic version of the andThen
called >>>
that can be used to save some parentheses when appropriate:
val eventuallyIteratee = {
Enumerator("Red","Blue","Green") >>>
Enumerator("Grey","Orange","Yellow") |>>
consume
}
We can also create Enumerator
s for enumerating files contents:
val fileEnumerator: Enumerator[Array[Byte]] = {
Enumerator.fromFile(new File("path/to/some/file"))
}
Or more generally enumerating a java.io.InputStream
using Enumerator.fromStream
. It is important to note that input won’t be read until the iteratee this Enumerator
is applied on is ready to take more input.
Actually both methods are based on the more generic Enumerator.fromCallback
that has the following signature:
def fromCallback[E](
retriever: () => Promise[Option[E]],
onComplete: () => Unit = () => (),
onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ()
): Enumerator[E] = {
...
}
This method defined on the Enumerator
object is one of the most important methods for creating Enumerator
s from imperative logic. Looking closely at the signature, this method takes a callback function retriever: () => Promise[Option[E]]
that will be called each time the iteratee this Enumerator
is applied to is ready to take some input.
It can be easily used to create an Enumerator
that represents a stream of time values every 100 millisecond using the opportunity that we can return a promise, like the following:
Enumerator.fromCallback { () =>
Promise.timeout(Some(new Date), 100 milliseconds)
}
In the same manner we can construct an Enumerator
that would fetch a url every some time using the WS
api which returns, not suprisingly a Promise
Combining this, callback Enumerator, with an imperative Iteratee.foreach
we can println a stream of time values periodically:
val timeStream = Enumerator.fromCallback { () =>
Promise.timeout(Some(new Date), 100 milliseconds)
}
val printlnSink = Iteratee.foreach[Date](date => println(date))
timeStream |>> printlnSink
Another, more imperative, way of creating an Enumerator
is by using Enumerator.pushee
which once it is ready will give a Pushee
interface on which defined methods push
and close
:
val channel = Enumerator.pushee[String] { onStart = pushee =>
pushee.push("Hello")
pushee.push("World")
}
channel |>> Iteratee.foreach(println)
The onStart
function will be called each time the Enumerator
is applied to an Iteratee
. In some applications, a chatroom for instance, it makes sense to assign the pushee to a synchronized global value (using STMs for example) that will contain a list of listeners. Enumerator.pushee
accepts two other functions, onComplete
and onError
.
One more interesting method is the interleave
or >-
method which as the name says, itrerleaves two Enumerators. For reactive Enumerator
s Input will be passed as it happens from any of the interleaved Enumerator
s
§Enumerators à la carte
Now that we have several interesting ways of creating Enumerator
s, we can use these together with composition methods andThen
/ >>>
and interleave
/ >-
to compose Enumerator
s on demand.
Indeed one interesting way of organizing a streamful application is by creating primitive Enumerator
s and then composing a collection of them. Let’s imagine doing an application for monitoring systems:
object AvailableStreams {
val cpu: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)
val memory: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)
val threads: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)
val heap: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)
}
val physicalMachine = AvailableStreams.cpu >- AvailableStreams.memory
val jvm = AvailableStreams.threads >- AvailableStreams.heap
def usersWidgetsComposition(prefs: Preferences) = {
// do the composition dynamically
}
Now, it is time to adapt and transform Enumerator
s and Iteratee
s using … Enumeratee
s!
Next: Enumeratees