Thursday, July 14, 2016

WSO2 - Send a message to a Topic/Exchange

I ran into a small issue while integrating RabbitMQ with WSO2 ESB. Configuring RabbitMQ itself with WSO2 is very simple; the problem was publishing a message directly to a topic or an exchange.
WSO2 provides a standard AMQP transport, and with the send mediator you can configure a message sender in your proxy service as shown below:
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="AMQPProducerSample" transports="http" statistics="disable" trace="disable" startOnLoad="true">
 <target>
  <inSequence>
   <property name="OUT_ONLY" value="true"/>
   <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
   <send>
    <endpoint>
     <address uri="rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.queue.name=queue&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange"/>
    </endpoint>
   </send>
  </inSequence>
  <outSequence>
   <send/>
  </outSequence>
 </target>
 <description/>
</proxy>
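The address URI in that proxy packs several settings into one string, so it can help to see them listed separately. Below is a minimal sketch in plain Java that splits the URI into its parameters. The class name RabbitUriParams and its parse method are made up for illustration; they are not part of WSO2 or the RabbitMQ client, and the code uses only the JDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not a WSO2 or RabbitMQ API.
final class RabbitUriParams {

    // Splits the query part of a rabbitmq:/ address URI into name/value pairs,
    // keeping them in the order they appear in the URI.
    static Map<String, String> parse(String uri) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0 || q == uri.length() - 1) {
            return params; // no query string present
        }
        // Inside the proxy XML the separator is written as &amp;, so undo that first.
        String query = uri.substring(q + 1).replace("&amp;", "&");
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        String uri = "rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=localhost"
                + "&amp;rabbitmq.server.port=5672&amp;rabbitmq.queue.name=queue"
                + "&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange";
        // Prints one "name = value" line per parameter.
        parse(uri).forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

Run against the URI from the proxy, this lists five parameters, and rabbitmq.queue.name is one of them alongside rabbitmq.exchange.name.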
Notice the uri attribute of the address element: the issue is that you cannot send a message without queue information. That means you cannot drop a message directly into an exchange to which one or more queues are bound. This forces you to change your architecture a bit, either by introducing a dummy queue or by reusing one of your listening queues to receive the message. You can avoid those changes by writing a simple class mediator that uses the RabbitMQ client to publish directly to the exchange.
Here is the code for such a mediator:
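package com.vinay;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import org.apache.synapse.MessageContext;
import org.apache.synapse.mediators.AbstractMediator;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PublisherMediator extends AbstractMediator {
 private static final String EXCHANGE_NAME = "my_exchange";
 private static final String ROUTING_KEY = "my_route_key";

 @Override
 public boolean mediate(MessageContext mc) {
  // The proxy sets "my_message" with a property mediator before this class runs.
  String myMessage = (String) mc.getProperty("my_message");
  if (myMessage == null) {
   System.err.println(" Property my_message is not set, nothing to publish");
   return true;
  }
  // Encode explicitly instead of relying on the platform default charset.
  byte[] message = myMessage.getBytes(StandardCharsets.UTF_8);
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = null;
  Channel channel = null;
  try {
   // A new connection per message keeps the example simple but is costly under load.
   connection = factory.newConnection();
   channel = connection.createChannel();
   // Declare a durable fanout exchange (a no-op if it already exists with the same settings).
   channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
   channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message);
   // Print the original string; printing the byte[] would only show an object reference.
   System.out.println(" Message sent to exchange " + EXCHANGE_NAME
     + " using routing key " + ROUTING_KEY + ": '"
     + myMessage + "'");
  } catch (IOException e) {
   e.printStackTrace();
  } catch (TimeoutException e) {
   e.printStackTrace();
  } finally {
   try {
    if (channel != null)
     channel.close();
    if (connection != null)
     connection.close();
   } catch (IOException e) {
    e.printStackTrace();
   } catch (TimeoutException e) {
    e.printStackTrace();
   }
  }

  // Always continue the mediation flow, even if publishing failed.
  return true;
 }
}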
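
One thing worth knowing about the mediator above: it declares the exchange as "fanout", and a fanout exchange copies every message to all queues bound to it while ignoring the routing key. The toy model below sketches that behaviour in memory so you can see it without a broker. It is only an illustration under that assumption; FanoutSketch and the queue names "orders" and "audit" are invented here, and nothing in it talks to RabbitMQ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange; no broker or client library involved.
final class FanoutSketch {
    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binds a queue to the exchange; a fanout binding needs no routing key.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored, which is how fanout behaves.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Returns the messages delivered to a queue, or an empty list if it was never bound.
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("orders");
        exchange.bind("audit");
        exchange.publish("my_route_key", "hello");
        System.out.println(exchange.messagesIn("orders")); // [hello]
        System.out.println(exchange.messagesIn("audit"));  // [hello]
    }
}
```

If you need the routing key to matter, a "direct" or "topic" exchange type would be the one to declare instead; with "fanout" the ROUTING_KEY constant has no effect on delivery.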

Now add this class mediator to the proxy service. The revised proxy looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="AMQPProducerSample" transports="http" statistics="disable" trace="disable" startOnLoad="true">
 <target>
  <inSequence>
   <property name="OUT_ONLY" value="true"/>
   <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
   <property description="Set Message" expression="message expression|evaluation"
    name="my_message" scope="default"/>
   <class name="com.vinay.PublisherMediator"/>
  </inSequence>
  <outSequence>
   <send/>
  </outSequence>
 </target>
 <description/>
</proxy>
Make sure to add the RabbitMQ client jar to the classpath of the mediator project. That's all it takes to drop a message directly into a RabbitMQ exchange. No biggie, but I hope this helps someone.
