PK c>u u build.xml
PK c>M M commands.py# Here you can create play commands that are specific to the module, and extend existing commands
MODULE = 'play-rabbitmq'
# Commands that are specific to your module
COMMANDS = ['play-rabbitmq:hello']
def execute(**kargs):
command = kargs.get("command")
app = kargs.get("app")
args = kargs.get("args")
env = kargs.get("env")
if command == "play-rabbitmq:hello":
print "~ Hello"
# This will be executed before any command (new, run...)
def before(**kargs):
command = kargs.get("command")
app = kargs.get("app")
args = kargs.get("args")
env = kargs.get("env")
# This will be executed after any command (new, run...)
def after(**kargs):
command = kargs.get("command")
app = kargs.get("app")
args = kargs.get("args")
env = kargs.get("env")
if command == "new":
pass
PK ;>Db$ $ manifestversion=0.0.7
frameworkVersions=1.2
PK m>h README.textileh1. Play! Framework RabbitMQ Module
Felipe Oliveira
http://mashup.fm
http://playframework.info
http://geeks.aretotally.in
http://twitter.com/_felipera
h3. 1) Installation
Under dependencies.yml:
require:
- play -> rabbitmq 0.0.6
h3. 2) RabbitMQ
Download RabbitMQ from http://www.rabbitmq.com/server.html.
Unzip the distribution, go to folder sbin and run "./rabbitmq-server".
It should be running on port 5762 by default. The default user "guest" and password is "guest".
h3. 3) Configuration
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.vhost=/
rabbitmq.user=guest
rabbitmq.password=guest
rabbitmq.exchangeType=direct
rabbitmq.durable=true
rabbitmq.autoAck=false
rabbitmq.basicQos=true
rabbitmq.retries=5
rabbitmq.msgmapper=json
rabbitmq.host -> I am sure you can guess this one
rabbitmq.port -> I am sure you can guess this one
rabbitmq.vhost -> If you don't know what this is I recommend reading the Admin Guide for RabbitMQ (http://www.rabbitmq.com/admin-guide.html)
rabbitmq.user -> I am sure you can guess this one. guest is the default user on a fresh RabbitMQ install.
rabbitmq.password -> I am sure you can guess this one. guest is the default value on a fresh RabbitMQ install.
rabbitmq.exchangeType -> These are the different exchangeTypes provided by RabbitMQ like direct, fanout, etc. If you are not familiar with them, please checkout RabbitMQ's doc (http://www.rabbitmq.com/getstarted.html).
rabbitmq.durable -> Durable message. You most likely want to keep that as true if your app requires reliability in your message deliveries.
rabbitmq.autoAck -> If true you are basically turning off any form of retry so you probabily don't want that.
rabbitmq.basicQos -> Please read RabbitMQ's documentation for more information on basicQos (https://dev.rabbitmq.com/wiki/BasicQosDesign?version=15b1bab0ac73).
rabbitmq.retries -> Max number of retries per message. This number can be overwritten on a queue level by overriding the retries() method on your consumer class.
rabbitmq.msgmapper -> Possible values are "pojo" or "json". These are the different implementations used to store and retrieve messages, "json" uses Jackson's ObjectMapper and "pojo" uses Java's serialization.
h3. 4) Define Message that will be used by the Queue (just a simple POJO)
public class SampleMessage implements Serializable {
private String field1;
private String field2;
public SampleMessage() {
}
public SampleMessage(String field1, String field2) {
super();
this.field1 = field1;
this.field2 = field2;
}
public String getField1() {
return field1;
}
public void setField1(String field1) {
this.field1 = field1;
}
public String getField2() {
return field2;
}
public void setField2(String field2) {
this.field2 = field2;
}
@Override
public String toString() {
return "SampleMessage [field1=" + field1 + ", field2=" + field2 + "]";
}
}
h3. 5) Publish a Message
public static void publish(SampleMessage q) {
RabbitMQPublisher.publish("myQueue", q);
render(q);
}
h3. 6) Creating a Message Consumer
@OnApplicationStart(async = true)
public class RabbitMQSampleConsumer extends RabbitMQConsumer {
protected void consume(SampleMessage message) {
System.out.println("******************************");
System.out.println("* Message Consumed: " + message);
System.out.println("******************************");
}
protected String queue() {
return "myQueue";
}
protected String routingKey() {
return this.queue();
}
protected int retries() {
// This is the default value defined by "rabbitmq.retries" on
// application.conf (please override if you need a new value)
return RabbitMQPlugin.retries();
}
protected Class getMessageType() {
return SampleMessage.class;
}
}
* Please note this is a Play! job so you can start it manualy or you can use the other annotations provided by Play! like @On or @Every. More information available at http://www.playframework.org/documentation/1.2/jobs.
** There are some cases where you don't want to retry re-processing the message, for example in cases you know you will keep getting the same error no matter how many times you retry that message.
First of all, we avoid infiniate loops by retrying only a certain amount of times, defined by the method retries() on the consumer class.
For the cases where you don't want to retry at all simply throw a RabbitMQNotRetriableException. The RabbitMQConsumer base class will catch it and acknowledge to RabbitMQ, instead of trying to reprocess the message.
*** Each consumer has a method routingKey() which you can optionally overwrite to setup a Topic Exchange with RabbitMQ. Same thing on the producer.
h3. 7) Firehose - Another way to publish messages in batch
@OnApplicationStart(async = true)
public class RabbitMQSampleFirehose extends RabbitMQFirehose {
public int count = 0;
protected List getData(int n) throws Exception {
if ( count >= 10 ) {
return null;
}
List results = new ArrayList();
for (int i = 0; i < n; i++) {
results.add(new SampleMessage("field1", "field2"));
count++;
}
return results;
}
protected int batchSize() {
return 2;
}
protected String queueName() {
return "myQueue";
}
}
* Please note this is a Play! job so you can start it manualy or you can use the other annotations provided by Play! like @On or @Every. More information available at http://www.playframework.org/documentation/1.2/jobs.
h3. 8) Live Stats and Live Streaming
Add the module routes in your routes.conf
* /rabbitmq/ module:rabbitmq
Live Stats and Live Streaming of messages produced and consumed are available at http://localhost:9000/rabbitmq/.
Live Stats uses Ajax to display the number of successful messages produced and consumed to a queue, errors as well.
Live Streaming uses WebSockets (requires newer browsers) to stream messages consumed and produced.
* To find out more about WebSockets in the Play! Framework, please visit http://www.playframework.org/documentation/1.2/asynchronous.
h3. Source Code
The source code is available on Github at https://github.com/feliperazeek/play-rabbitmq.
h3. Changelog
May 11th 2011 - Version 0.0.5: Fixed bugs on stats and added average time to stats service, stats page for a queue and also to the WebSocket live streaming.
May 18th 2011 - Version 0.0.6: Adding routingKey to Publisher, Consumer, Firehose, RabbitMQPlugin, etc to add support for RabbitMQ Topic Exchange.
PK > " app/controllers/RabbitMQStats.java/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @author Felipe Oliveira (http://mashup.fm)
*
*/
package controllers;
import play.modules.rabbitmq.sample.RabbitMQSampleConsumer;
import play.modules.rabbitmq.sample.RabbitMQSampleFirehose;
import play.mvc.Controller;
// TODO: Auto-generated Javadoc
/**
* The Class RabbitMQStats.
*/
public class RabbitMQStats extends Controller {
/** The service. */
private static play.modules.rabbitmq.stats.StatsService service = play.modules.rabbitmq.RabbitMQPlugin.statsService();
/**
* Index.
*/
public static void index() {
render();
}
/**
* Stream.
*/
public static void stream() {
render();
}
/**
* Queue stats.
*
* @param queueName
* the queue name
*/
public static void queueStats(String queueName) {
long producerSuccess = service.executions(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.PRODUCER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS);
long producerFailed = service.executions(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.PRODUCER, play.modules.rabbitmq.stats.StatsEvent.Status.ERROR);
long consumerSuccess = service.executions(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.CONSUMER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS);
long consumerFailed = service.executions(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.CONSUMER, play.modules.rabbitmq.stats.StatsEvent.Status.ERROR);
long producerSuccessAverageTime = service.averageTime(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.PRODUCER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS);
long producerFailedAverageTime = service.averageTime(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.PRODUCER, play.modules.rabbitmq.stats.StatsEvent.Status.ERROR);
long consumerSuccessAverageTime = service.averageTime(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.CONSUMER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS);
long consumerFailedAverageTime = service.averageTime(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.CONSUMER, play.modules.rabbitmq.stats.StatsEvent.Status.ERROR);
render(queueName, producerSuccess, producerFailed, consumerSuccess, consumerFailed, producerSuccessAverageTime, producerFailedAverageTime, consumerSuccessAverageTime, consumerFailedAverageTime);
}
/**
* Queue stats details.
*
* @param queueName
* the queue name
*/
public static void queueStatsDetails(String queueName) {
long producerSuccess = service.executions(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.PRODUCER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS);
long producerFailed = service.executions(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.PRODUCER, play.modules.rabbitmq.stats.StatsEvent.Status.ERROR);
long consumerSuccess = service.executions(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.CONSUMER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS);
long consumerFailed = service.executions(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.CONSUMER, play.modules.rabbitmq.stats.StatsEvent.Status.ERROR);
long producerSuccessAverageTime = service.averageTime(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.PRODUCER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS);
long producerFailedAverageTime = service.averageTime(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.PRODUCER, play.modules.rabbitmq.stats.StatsEvent.Status.ERROR);
long consumerSuccessAverageTime = service.averageTime(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.CONSUMER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS);
long consumerFailedAverageTime = service.averageTime(queueName, play.modules.rabbitmq.stats.StatsEvent.Type.CONSUMER, play.modules.rabbitmq.stats.StatsEvent.Status.ERROR);
render(queueName, producerSuccess, producerFailed, consumerSuccess, consumerFailed, producerSuccessAverageTime, producerFailedAverageTime, consumerSuccessAverageTime, consumerFailedAverageTime);
}
/**
* Fire sample firehose.
*/
public static void fireSampleQueue() {
new RabbitMQSampleConsumer().now();
new RabbitMQSampleFirehose().now();
render();
}
}
PK h>#w &