I ran into small issue trying to integrate Rabbit MQ with WSO2 ESB. Well configuring rabbit itself with WSO2 is very simple but the issue was publishing a message directly to a Topic or an exchange.
WSO2 provides a standard means of AMQP transports and using send mediator you could configure message sender in your proxy service like as shown below,
Now add this class mediator into proxy service, the revised proxy looks like this,
Make sure to add Rabbit MQ client jar to the classpath of mediator project. That's all it takes to drop a message directly into the Rabbit MQ exchange, no biggie but hope this helps someone.
WSO2 provides a standard means of AMQP transports and using send mediator you could configure message sender in your proxy service like as shown below,
<?xml version="1.0" encoding="UTF8"?> <proxy xmlns="http://ws.apache.org/ns/synapse"name="AMQPProducerSample"transports="http"statistics="disable"trace="disable"startOnLoad="true"> <target> <inSequence> <property name="OUT_ONLY" value="true"/> <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/> <send> <endpoint> <address uri="rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=localhost&rabbitmq.server.port=5672&rabbitmq.queue.name=queue&rabbitmq.queue.route.key=route&rabbitmq.exchange.name=exchange"/> </endpoint> </send> </inSequence> <outSequence> <send/> </outSequence> </target> <description/> </proxy>
Notice the uri attribute's of address element, the issue is you can not send a message without queue information. That means you will not be able to drop a message directly into the exchange to which one or more queues are listening. This forces to change your architecture a bit to introduce a dummy queue or reuse one of your listening queue to drop a message. But without making these changes you could write a simple class mediator using rabbitmq client and drop a message directly into the exchange.
Here is the code that does it,
public class PublisherMediator extends AbstractMediator { private static final String EXCHANGE_NAME = "my_exchange"; private static final String ROUTING_KEY = "my_route_key"; @Override public boolean mediate(MessageContext mc) { String myMessage = (String) mc.getProperty("my_message"); byte[] message = myMessage.getBytes(); ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message); System.out .println(" Message Sent to Exchange " + EXCHANGE_NAME + " using routing key " + ROUTING_KEY + ":" + message + "'"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { try { if(channel != null) channel.close(); if(connection != null) connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } return true; } }
<?xml version="1.0" encoding="UTF8"?> <proxy xmlns="http://ws.apache.org/ns/synapse"name="AMQPProducerSample"transports="http"statistics="disable"trace="disable"startOnLoad="true"> <target> <inSequence> <property name="OUT_ONLY" value="true"/> <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/> <property description="Set Message" expression="message expression|evaluation" name="my_message" scope="default" /> <class name="com.vinay.PublisherMediator" /> </inSequence> <outSequence> <send/> </outSequence> </target> <description/> </proxy>
No comments:
Post a Comment