
Friday, September 9, 2011

Messaging using JMS, ActiveMQ, Apache Camel

Event-driven architecture (EDA) is a powerful architectural style that is especially useful when applications and systems transmit events among loosely coupled software components and services. It can complement service-oriented architecture (SOA), because services can be activated by triggers fired on incoming events.

As an example, in an e-commerce system, when a shopper places an order, an order confirmation email is sent out. "Order" is an event produced by the ordering system, and multiple processing engines can react to it. One such processor sends the order confirmation email to the buyer.

As you can see, EDA puts events at the center of the design. It is about recognizing business events and deciding how to model them as data. A business event is something that happens (a change of state) to which your business has planned to react in a predefined way. A business event is represented by a message, but not all messages represent business events. The EDA approach also implies an asynchronous communication style.
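To make the order example concrete, here is a minimal in-memory sketch of one event reaching several independent processors. The names OrderEvents and OrderPlaced are made up for illustration, and the dispatch is synchronous for brevity; a real EDA system would hand the event to a broker and deliver it asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory event channel, standing in for a message broker.
class OrderEvents {

    // The business event: an order was placed (field names are illustrative).
    static final class OrderPlaced {
        final long orderId;
        final String buyerEmail;

        OrderPlaced(long orderId, String buyerEmail) {
            this.orderId = orderId;
            this.buyerEmail = buyerEmail;
        }
    }

    private final List<Consumer<OrderPlaced>> processors = new ArrayList<>();

    // Processors register interest; the producer never references them directly.
    void subscribe(Consumer<OrderPlaced> processor) {
        processors.add(processor);
    }

    // The producer only announces that the event happened.
    void publish(OrderPlaced event) {
        for (Consumer<OrderPlaced> processor : processors) {
            processor.accept(event);
        }
    }

    public static void main(String[] args) {
        OrderEvents channel = new OrderEvents();
        channel.subscribe(e -> System.out.println("Email confirmation to " + e.buyerEmail));
        channel.subscribe(e -> System.out.println("Reserve stock for order " + e.orderId));
        channel.publish(new OrderPlaced(42L, "shopper@example.com"));
    }
}
```

Adding a third processor requires no change to the code that publishes the event, which is the loose coupling EDA is after.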

 The Java Message Service (JMS) API is a Java Message Oriented Middleware (MOM) API for sending messages between two or more clients. ActiveMQ is one of the popular JMS providers.

The next section discusses using JMS, ActiveMQ and Apache Camel to design a simple event-based notification mechanism. I assume that you have some basic understanding of JMS.

Scenario: For an e-commerce business, an out-of-stock product becoming available again is a significant event. When such an event is received, the product should be made available for sale on the website. We will design a simple notification system that generates an 'Inventory Received' event when inventory arrives for an out-of-stock item.

Event producer: Warehouse management system, Event: Inventory Received, Event processor: NotificationService

Some observations here about this use case:
  • Event producer/Caller is not aware of Event consumer/callee
  • Many events can happen per second
  • Order of events is not important
  • No state management is required 
  • Communication is asynchronous
ActiveMQ provides a JMS implementation along with many advanced features such as virtual destinations, total ordering and selectors.

Apache Camel is a rule-based routing and mediation engine which provides a Java object-based implementation of the Enterprise Integration Patterns using an API to configure routing and mediation rules. Apache Camel uses URIs so that it can easily work directly with any kind of Transport or messaging model such as HTTP, ActiveMQ, JMS, JBI, SCA, MINA or CXF Bus API together with working with pluggable Data Format options.

1. Configure ActiveMQ to enable Virtual destinations. Start ActiveMQ broker.
2. Event is a simple bean called InventoryUpdate.


import java.io.Serializable;
import java.util.Date;

public class InventoryUpdate implements Serializable {

	private static final long serialVersionUID = 1L;

	private long itemId;
	private int quantity;
	private Date updateDate;

  ... 

}
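The listing above elides the rest of the bean. As a rough sketch, here is what a complete version could look like. The constructor and getItemId() are inferred from how the producer and MessageProcessor use the bean; the other accessors, toString() and the roundTrip helper are my own assumptions. The class is named InventoryUpdateSketch to make clear that it is not the original code. The round trip shows why the bean must be Serializable: an ObjectMessage carries the event through the broker in serialized form.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Date;

// Sketch only: members beyond the three fields are assumptions.
class InventoryUpdateSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final long itemId;
    private final int quantity;
    private final Date updateDate;

    InventoryUpdateSketch(long itemId, int quantity, Date updateDate) {
        this.itemId = itemId;
        this.quantity = quantity;
        this.updateDate = updateDate;
    }

    long getItemId() { return itemId; }

    int getQuantity() { return quantity; }

    Date getUpdateDate() { return updateDate; }

    @Override
    public String toString() {
        return "InventoryUpdate[itemId=" + itemId + ", quantity=" + quantity
                + ", updateDate=" + updateDate + "]";
    }

    // Serializes and deserializes the bean in memory, similar in spirit to
    // what happens when the event travels inside an ObjectMessage.
    static InventoryUpdateSketch roundTrip(InventoryUpdateSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (InventoryUpdateSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

Note that the consumer needs the same class (with a compatible serialVersionUID) on its classpath to deserialize the event.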

3. Event producer


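import java.util.Date;
import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class InventoryUpdateProducer {

    private static String url = "failover:(tcp://localhost:61616)?maxReconnectDelay=5000";

    // Name of the topic we will be publishing messages to
    private static String subject = "InventoryUpdate";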

    public static void main(String[] args) throws JMSException {

        // Getting JMS connection from the server and starting it
        ConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // JMS messages are sent and received using a Session. We will
        // create here a non-transactional session object. If you want
        // to use transactions you should set the first parameter to 'true'
        Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // Destination represents here our topic 'InventoryUpdate' on the
        // JMS server. You don't have to do anything special on the
        // server to create it, it will be created automatically.
        Destination destination = session.createTopic(subject);

        // MessageProducer is used for sending messages (as opposed
        // to MessageConsumer which is used for receiving them)
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // We will send an InventoryUpdate message.
        Random randomGenerator = new Random();
        InventoryUpdate iUpdate = new InventoryUpdate(randomGenerator.nextInt(10), 100, new Date());
        ObjectMessage objMessage = session.createObjectMessage(iUpdate);

        // Here we are sending the message!
        producer.send(objMessage);
        System.out.println("Sent message '" + objMessage + "'" + iUpdate);

        connection.close();
    }
}

4. Now let us look at the consumer side. A consumer can be a simple JMS client consuming from the queue "VirtualTopicConsumers.ConsumerB.InventoryUpdate", or we can use Apache Camel to hide the JMS details from the client. Remember, we are using a virtual topic here.
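The queue name is not arbitrary: with virtual topics, each logical consumer reads from its own queue whose name is derived from the topic name. The small helper below sketches that naming rule. It assumes the broker was configured in step 1 with a virtual topic prefix of "VirtualTopicConsumers.*." (ActiveMQ's default convention is different, namely "Consumer.*." in front of topics named "VirtualTopic.>"). The class and method names are hypothetical, and the helper only builds names; the broker does the actual forwarding.

```java
// Sketch of the naming convention only, not part of any ActiveMQ API.
class VirtualTopicNames {

    // Assumed broker setting: virtualTopic name=">" prefix="VirtualTopicConsumers.*."
    static final String PREFIX_PATTERN = "VirtualTopicConsumers.*.";

    // Builds the physical queue name that a given logical consumer reads from.
    static String consumerQueue(String consumerName, String topicName) {
        return PREFIX_PATTERN.replace("*", consumerName) + topicName;
    }

    public static void main(String[] args) {
        // One topic, two independent consumers, two separate queues.
        System.out.println(consumerQueue("ConsumerA", "InventoryUpdate"));
        System.out.println(consumerQueue("ConsumerB", "InventoryUpdate"));
    }
}
```

Every message published to the topic is copied to each of these queues, so ConsumerA and ConsumerB each see all events, while several instances of the same consumer share the load of one queue.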


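import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class InventoryUpdateConsumer {

    // URL of the JMS server
    private static String url = "failover:(tcp://localhost:61616)?maxReconnectDelay=5000";

    // Name of the queue we will receive messages from
    private static String subject = "VirtualTopicConsumers.ConsumerB.InventoryUpdate";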

    public static void main(String[] args) throws JMSException {
        
        // Getting JMS connection from the server
        ConnectionFactory connectionFactory
            = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Creating a session for receiving messages
        Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // Getting the queue that the virtual topic 'InventoryUpdate' feeds for ConsumerB
        Destination destination = session.createQueue(subject);

        // MessageConsumer is used for receiving (consuming) messages
        MessageConsumer consumer = session.createConsumer(destination);

        // Here we receive the message.
        // By default this call is blocking, which means it will wait
        // for a message to arrive on the queue.
        Message message = consumer.receive();

        // There are many types of Message. The producer sent us an
        // ObjectMessage, so we cast to it and call getObject() to
        // recover the InventoryUpdate payload.
        if (message instanceof ObjectMessage) {
            InventoryUpdate update = (InventoryUpdate) ((ObjectMessage) message).getObject();
            System.out.println(update);
        }

        System.out.println("Done");
        connection.close();
    }
}
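The consumer above reads a single message and exits. In practice you would usually loop, and JMS offers MessageConsumer.receive(long timeout), which returns null when nothing arrives in time. The sketch below shows that loop shape using a plain BlockingQueue as a stand-in for the JMS consumer, so that it runs without a broker. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in sketch: a BlockingQueue plays the role of the JMS MessageConsumer.
class DrainLoopSketch {

    // Keeps reading until the queue stays empty for idleMillis, then returns
    // everything that was received.
    static <T> List<T> drain(BlockingQueue<T> queue, long idleMillis) {
        List<T> received = new ArrayList<>();
        try {
            T next;
            // poll(...) returns null on timeout, comparable to receive(timeout) in JMS.
            while ((next = queue.poll(idleMillis, TimeUnit.MILLISECONDS)) != null) {
                received.add(next);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop reading.
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("update for item 1");
        queue.add("update for item 2");
        System.out.println(drain(queue, 100L));
    }
}
```

With a real JMS consumer the body of the loop would hold the instanceof check and cast from the listing above.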

Of course, we can also use Camel's ProducerTemplate to publish InventoryUpdate events to the topic instead of using a JMS client directly as described in step 3 above.

Now, let us start by configuring Apache Camel to receive the message and process it.

i. Configure camelContext.xml


 <beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
	xmlns:broker="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">

	<!-- If a bean is defined in Spring XML or scanned using the Spring 2.5
		component scanning mechanism and a <camelContext> is used or a CamelBeanPostProcessor
		then we process a number of Camel annotations to do various things such as
		injecting resources or producing, consuming or routing messages. -->
	<!-- Configure the Camel ActiveMQ component to use the ActiveMQ broker
		running at tcp://localhost:61616. The bean id "activemq" matches the
		"activemq:" scheme used in the route URI. -->
	<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
		<property name="brokerURL" value="tcp://localhost:61616" />
	</bean>

	<bean id="jmsToEmailRoute" class="com.JmsToEmailRoute" />

	<camel:camelContext id="camelContext">
		<camel:routeBuilder ref="jmsToEmailRoute" />
	</camel:camelContext>
</beans>

ii. Define the route

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository;

import com.mchange.v2.c3p0.ComboPooledDataSource;

public class JmsToEmailRoute extends RouteBuilder {

	@Override
	public void configure() throws Exception {

		ComboPooledDataSource dataSource = new ComboPooledDataSource();
		dataSource.setDriverClass("com.mysql.jdbc.Driver");
		dataSource.setJdbcUrl( "jdbc:mysql://localhost:3306/test" );
		dataSource.setUser("root");
		dataSource.setPassword("");

		from("activemq:queue:VirtualTopicConsumers.ConsumerA.InventoryUpdate")
				.idempotentConsumer(
						bean(MessageProcessor.class, "getId"),
						JdbcMessageIdRepository.jpaMessageIdRepository(
								dataSource, "notificationProcessor"))
				.bean(MessageProcessor.class,
						"process(InventoryUpdate)");
	}

}

iii. Define message processor.


// InventoryUpdate is assumed to live in the same package as this class,
// so no import is needed.

public class MessageProcessor {
	public void process(InventoryUpdate message){
		System.out.println("Got message: " + message);
	}

	public long getId(InventoryUpdate message){
		return message.getItemId();
	}
}


Explanation: JmsToEmailRoute defines a route from the queue "VirtualTopicConsumers.ConsumerA.InventoryUpdate" to the processor bean "MessageProcessor". As a result, Camel invokes the "process(InventoryUpdate message)" method for every InventoryUpdate event published to the topic "InventoryUpdate".

One important consideration here is idempotence, i.e. how will your application react to duplicate messages? Duplicate messages can be delivered for reasons such as a buggy producer or a broker crashing just before the acknowledgement is recorded.

There are a couple of ways to solve this problem. One way is to use JMS or JTA/XA transactions. This is relatively slow, as two-phase commits and multiple flushes to disk are involved.

The other way is duplicate detection and caching of messages. Here is the pseudocode:



[WebMethod]
ResponseMessage DoRequest(RequestMessage)
{
	Extract the request identifier from the request.

	Check if the request has already been processed by querying the persistent log.

	If (request processed) then
		return response
	Else
	{
		ITransaction trans = CreateTransaction()

		ProcessMessage(RequestMessage, trans);

		// Store the request identifier in the persistent log or database
		UpdateLog(request identifier, trans)

		trans.Commit();

		return response
	}
}
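The same idea can be sketched in plain Java. This is a simplified illustration, not the pseudocode's exact algorithm: an in-memory set stands in for the persistent log (so it forgets everything on restart), there is no transaction, and the identifier is recorded before processing and removed again if processing fails. The class name IdempotentHandler is hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

// Wraps a message handler so that each message identifier is processed at most once.
class IdempotentHandler<T> {

    // Stand-in for the persistent log or database table of processed identifiers.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Function<T, String> idExtractor;
    private final Consumer<T> delegate;

    IdempotentHandler(Function<T, String> idExtractor, Consumer<T> delegate) {
        this.idExtractor = idExtractor;
        this.delegate = delegate;
    }

    // Returns true if the message was processed, false if it was a duplicate.
    boolean handle(T message) {
        String id = idExtractor.apply(message);
        // add() is atomic on this set and returns false when the id is already present.
        if (!processedIds.add(id)) {
            return false;
        }
        try {
            delegate.accept(message);
            return true;
        } catch (RuntimeException e) {
            // Forget the id so that a redelivery can retry the message.
            processedIds.remove(id);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentHandler<Long> handler = new IdempotentHandler<>(
                id -> String.valueOf(id),
                id -> System.out.println("Processing item " + id));
        handler.handle(7L);
        handler.handle(7L); // duplicate, silently skipped
    }
}
```

For this to survive broker or consumer restarts, the set has to be replaced by durable storage, which is what the database-backed repository in the Camel route provides.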


Camel provides support for Idempotent Consumer. Look at the route definition again:


//Use C3P0 connection pooling 
ComboPooledDataSource dataSource = new ComboPooledDataSource();
dataSource.setDriverClass("com.mysql.jdbc.Driver");
dataSource.setJdbcUrl( "jdbc:mysql://localhost:3306/test" );
dataSource.setUser("root");
dataSource.setPassword("");

from("activemq:queue:VirtualTopicConsumers.ConsumerA.InventoryUpdate")
	.idempotentConsumer(
			bean(MessageProcessor.class, "getId"),
			JdbcMessageIdRepository.jpaMessageIdRepository(
					dataSource, "notificationProcessor"))
	.bean(MessageProcessor.class,
			"process(InventoryUpdate)");


Here, the bean method MessageProcessor.getId() is invoked to get the request identifier. The repository created by 'JdbcMessageIdRepository.jpaMessageIdRepository' checks this identifier against the database. If no existing identifier is found, it stores the identifier in the database and Camel invokes the 'process(InventoryUpdate)' method. Otherwise, the message is treated as a duplicate and skipped.

iv. Finally, start the consumer.
 
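import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class TestMessageProducer {

	public static void main(String[] args) {

		// Loading camelContext.xml starts the Camel route, which then
		// consumes the events published by InventoryUpdateProducer
		ConfigurableApplicationContext context = new ClassPathXmlApplicationContext("camelContext.xml");
	}
}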

Happy messaging!
