HTTP による非同期プログラミング
この節では、数千の同時接続までスケールし得る典型的な長時間ポーリング、ストリーミング、そしてその他の Comet スタイル のアプリケーションを達成するために、Play アプリケーションで非同期処理を取り扱う方法を説明します。
HTTP リクエストの中断
Play はとても短いリクエストで動作することを意図しています。Play は HTTP コネクタによってキューイングされたリクエストを処理するために固定のスレッドプールを使用します。最適な結果を得るために、このスレッドプールは可能な限り小さくあるべきです。デフォルトのプールサイズを設定するための最適値として、典型的には プロセッサ数 + 1
を使用します。
これは、もしリクエストの処理時間がとても長い場合 (例えば、長い計算を待つなど) に、リクエストがスレッドプールをブロックし、アプリケーションの応答性に不利益をもたらすことを意味します。もちろん、プールにより多くのスレッドを追加することもできますが、リソースを浪費する結果となるかも知れませんし、いずれにしてもプールのサイズは決して無限にはなりません。
ブラウザが画面に表示する新しいメッセージを待つためにブロッキング HTTP リクエストを送信するチャットアプリケーションを考えてみましょう。これらのリクエストはとてもとても長く (典型的には数秒) なることがあり、スレッドプールをブロックします。もしこのチャットアプリケーションに 100 ユーザが同時に接続できるよう計画しているのであれば、最低でも 100 スレッドを供給する必要があるでしょう。ええ、それくらいなら実現可能です。でも 1,000 ユーザではどうでしょう? 10,000 ユーザなら?
これらのユースケースを解決するために、Play ではリクエストを一時的に中断することができます。HTTP リクエストは接続されたままですが、リクエストの実行はスレッドプールの外に押し出され、あとで再実行されます。固定した遅延のあとにリクエストを実行するか、または Promise
の値が利用可能になるのを待つよう Play に伝えることができます。
Tip. 本物の例を samples-and-tests/chat
で見ることができます。
例えば、このアクションはとても長いジョブを起動し、HTTP レスポンスに結果を返す前にジョブの完了を待機します:
public static void generatePDF(Long reportId) {
Promise<InputStream> pdf = new ReportAsPDFJob(report).now();
InputStream pdfStream = await(pdf);
renderBinary(pdfStream);
}
ここでは Promise<InputStream>
が回復するまでリクエストを中断するよう Play に依頼するために await(…)
を使用しています。
継続
フレームワークは、他のリクエストに供給するために使用していたスレッドを回収する必要があるので、コードの実行を中断しなければなりません。以前のバージョンの Play の await(…)
は、アクションを中断し、その後に最初から再度実行する waitFor(…)
と等価でした。
非同期処理をより簡単に扱うために、継続を紹介します。継続はコードの中断と透過的な再開を可能にします。このため、以下のようにとても命令的にコードを書くことができます:
public static void computeSomething() {
Promise<String> delayedResult = veryLongComputation(…);
String result = await(delayedResult);
render(result);
}
ここでは実際のところ、コードは二つのステップ、二つの異なるスレッドで実行されます。しかし、ご覧のとおりアプリケーションコードはとても透過的です。
await(…)
と継続を使って、以下のようにループを書くことができます:
public static void loopWithoutBlocking() {
for(int i=0; i<=10; i++) {
Logger.info(i);
await("1s");
}
renderText("Loop finished");
}
リクエストを処理するためにひとつのスレッドしか使いませんが、Play はこれらのループを複数のリクエストに対して同時に並列実行することができます。
より現実的な例として、リモート URL からの非同期なコンテンツ取得があります。次の例では、三つのリモート HTTP リクエストを平行して実行します: それぞれの play.libs.WS.WSRequest.getAsync()
メソッドに対する呼び出しは非同期に GET リクエストを実行し、 play.libs.F.Promise
を返却します。このアクションメソッドは、三つの Promise
インスタンスの組み合わせについて await(…)
を呼び出すことで、受け取った HTTP リクエストを中断します。三つ全てのリモート呼び出しがレスポンスを生成したら、スレッドは処理を再開し、レスポンスをレンダリングします。
public class AsyncTest extends Controller {
public static void remoteData() {
F.Promise<WS.HttpResponse> r1 = WS.url("http://example.org/1").getAsync();
F.Promise<WS.HttpResponse> r2 = WS.url("http://example.org/2").getAsync();
F.Promise<WS.HttpResponse> r3 = WS.url("http://example.org/3").getAsync();
F.Promise<List<WS.HttpResponse>> promises = F.Promise.waitAll(r1, r2, r3);
// Suspend processing here, until all three remote calls are complete.
List<WS.HttpResponse> httpResponses = await(promises);
render(httpResponses);
}
}
コールバック
前述した三つの非同期リモートリクエストの例を実装する別の方法は、コールバックを使うことです。ここでは await(…)
の呼び出しに、すべての promises
が完了した時に実行されるコールバックである play.libs.F.Action
の実装が含まれています。
public class AsyncTest extends Controller {
public static void remoteData() {
F.Promise<WS.HttpResponse> r1 = WS.url("http://example.org/1").getAsync();
F.Promise<WS.HttpResponse> r2 = WS.url("http://example.org/2").getAsync();
F.Promise<WS.HttpResponse> r3 = WS.url("http://example.org/3").getAsync();
F.Promise<List<WS.HttpResponse>> promises = F.Promise.waitAll(r1, r2, r3);
// Suspend processing here, until all three remote calls are complete.
await(promises, new F.Action<List<WS.HttpResponse>>() {
public void invoke(List<WS.HttpResponse> httpResponses) {
render(httpResponses);
}
});
}
}
HTTP レスポンスストリーミング
これで、リクエストをブロックせずにループを行うことができるようになったので、処理結果の一部が利用可能になり次第、ブラウザにデータを送りたくなるのではないでしょうか。それこそが Content-Type:Chunked
HTTP レスポンス型のポイントです。複数のチャンクを使って HTTP レスポンスを何度も送ることができます。ブラウザはこれらのチャンクが発行され次第、これを受け取ります。
今では await(…)
と継続を使ってこれを達成することができます:
public static void generateLargeCSV() {
CSVGenerator generator = new CSVGenerator();
response.contentType = "text/csv";
while(generator.hasMoreData()) {
String someCsvData = await(generator.nextDataChunk());
response.writeChunk(someCsvData);
}
}
もし CSV の生成に一時間かかったとしても、生成されたデータが利用可能になり次第、クライアントに送り返すことで、Play はひとつのスレッドを使って複数のリクエストを同時に処理することができます。
WebSockets の使用
WebSockets は、ブラウザとアプリケーション間の双方向コミュニケーションチャンネルを開く、ひとつの方法です。ブラウザ側で “ws://” という url を使ってソケットを開きます。
new Socket("ws://localhost:9000/helloSocket?name=Guillaume")
Play 側では WS ルートを定義します:
WS /helloSocket MyWebSocket.hello
MyWebSocket
は WebSocketController
です。WebSocket コントローラは標準的な HTTP コントローラに似ていますが、異なる概念を取り扱います。
- リクエストオブジェクトを持ちますが、レスポンスオブジェクトは持ちません。
- セッションにアクセスできますが、読み出し専用です。
renderArgs
,routeArgs
とフラッシュスコープを持ちません。- ルートパターンまたはクエリ文字列からのパラメータのみ読むことができます。
- 二つのコミュニケーションチャンネル: inbound と outbound を持ちます。
クライアントが ws://localhost:9000/helloSocket
ソケットに接続すると、Play は MyWebSocket.hello
アクションメソッドを起動します。一旦 MyWebSocket.hello
アクションメソッドが終了するとソケットは閉じられます。
このため、とても基本的なソケットの例は以下のようになります:
public class MyWebSocket extends WebSocketController {
public static void hello(String name) {
outbound.send("Hello %s!", name);
}
}
ここでは、クライアントはソケットに接続すると‘Hello Guillaume’というメッセージを受け取り、その後 Play はソケットを閉じます。
もちろん、通常は直ちにソケットを閉じたいと思わないでしょう。これは await(…)
と継続を使って容易に達成できます。
基本的なエコーサーバの例です:
public class MyWebSocket extends WebSocketController {
public static void echo() {
while(inbound.isOpen()) {
WebSocketEvent e = await(inbound.nextEvent());
if(e instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame)e;
if(!e.isBinary) {
if(frame.textData.equals("quit")) {
outbound.send("Bye!");
disconnect();
} else {
outbound.send("Echo: %s", frame.textData);
}
}
}
if(e instanceof WebSocketClose) {
Logger.info("Socket closed!");
}
}
}
}
上記の例において、ネストされた‘if’と‘キャスト’の海は書くのが退屈でエラーを起こしがちでした。この点で Java は最低です。このようなシンプルな場合でさえ容易に扱えません。複数のストリームを結びつけ、より多くのイベントタイプが存在するような、より複雑なケースにおいては悪夢のようになることでしょう。
これこそが、私たちが play.libs.F ライブラリにおいてある種の基本的なパターンマッチングを導入した理由です。
これにより上記の例を次のように書き直すことができます:
public static void echo() {
while(inbound.isOpen()) {
WebSocketEvent e = await(inbound.nextEvent());
for(String quit: TextFrame.and(Equals("quit")).match(e)) {
outbound.send("Bye!");
disconnect();
}
for(String message: TextFrame.match(e)) {
outbound.send("Echo: %s", message);
}
for(WebSocketClose closed: SocketClosed.match(e)) {
Logger.info("Socket closed!");
}
}
}
考察を続けます
次は Ajax リクエスト を行ってみましょう。