How to not lose messages when publishing to RabbitMQ from Mule until the proper "Ack" is received? -


i have synchronized mule flow reads messages sonic topic , publish rabbit exchange.

i loosing messages when rabbit brought up/down. rabbit exchange publishing ha queues. how can make sure mule not consuming message until proper "ack" received rabbit broker? here flow.

<jms:connector name="sonicmqconnectorsub" validateconnections="true" connectionfactory-ref="factorysub" doc:name="jms" clientid="testclient" durable="true" maxredelivery="-1" >      <reconnect-forever frequency="30000"/> </jms:connector> <spring:beans>     <spring:bean id="soniqmqconnectionfactorybeansub" name="factorysub" class="progress.message.jclient.connectionfactory">         <spring:property name="connectionurls" value="tcp://server1:7800" />         <spring:property name="defaultuser" value="user" />         <spring:property name="defaultpassword" value="pass" />     </spring:bean> </spring:beans>  <amqp:connector name="amqp" validateconnections="true" host="server2" fallbackaddresses="server3" doc:name="amqp connector" port="5672" mandatory="true" activedeclarationsonly="true">     <reconnect-forever frequency="30000"/> </amqp:connector>   <flow name="rabbitflow1" doc:name="rabbitflow1" processingstrategy="synchronous">     <jms:inbound-endpoint doc:name="jms" connector-ref="sonicmqconnectorsub" topic="testtopic"/>      <logger message="message: #[message.payload]" level="info" doc:name="logger"/>      <amqp:outbound-endpoint exchangename="rabbitexchange" exchangedurable="true" responsetimeout="10000" connector-ref="amqp" doc:name="amqp" exchangetype="fanout"/>  </flow> 

updated 04/22

here exception trace when mule connecting 2nd broker. when loose message.

2014-04-22 09:49:29,453 - org.mule.exception.defaultsystemexceptionstrategy - error - ******************************************************************************** message               : connection shutdown detected for: amqp code                  : mule_error--2 -------------------------------------------------------------------------------- exception stack is: 1. software caused connection abort: recv failed (java.net.socketexception) java.net.socketinputstream:-2 (null) 2. connection error; reason: java.net.socketexception: software caused connection     abort: recv failed (com.rabbitmq.client.shutdownsignalexception) com.rabbitmq.client.impl.amqconnection:715 (null) 3. connection shutdown detected for: amqp (org.mule.transport.connectexception) org.mule.transport.amqp.amqpconnector$1:502     (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/transport/connectexception.html) -------------------------------------------------------------------------------- root exception stack trace: java.net.socketexception: software caused connection abort: recv failed     @ java.net.socketinputstream.socketread0(native method)     @ java.net.socketinputstream.read(socketinputstream.java:150)     @ java.net.socketinputstream.read(socketinputstream.java:121) + 3 more (set debug level logging or '-dmule.verbose.exceptions=true' everything) ********************************************************************************  2014-04-22 09:49:29,453 - org.mule.exception.defaultsystemexceptionstrategy - info -    exception caught connectexception, attempting reconnect... 2014-04-22 09:49:29,454 - org.mule.lifecycle.abstractlifecyclemanager - info - stopping   connector: amqp  2014-04-22 09:49:29,454 - org.mule.lifecycle.abstractlifecyclemanager - info -   stopping: 'amqp.dispatcher.1064499250'. object is: amqpmessagedispatcher 2014-04-22 09:49:29,454 - org.mule.lifecycle.abstractlifecyclemanager - info -  disposing: 'amqp.dispatcher.1064499250'. object is: amqpmessagedispatcher 2014-04-22 09:49:29,455 - org.mule.transport.amqp.amqpconnector - error - clean  connection shutdown; reason: attempt use closed connection 2014-04-22 09:49:29,461 - org.mule.transport.amqp.amqpconnector - info - connected:    amqpconnector { name=amqp lifecycle=stop this=33c5919e numberofconcurrenttransactedreceivers=4 createmultipletransactedreceivers=true connected=true supportedprotocols=[amqp] serviceoverrides=<none> }  2014-04-22 09:49:29,461 - org.mule.transport.amqp.amqpconnector - info - starting:   amqpconnector { name=amqp lifecycle=stop this=33c5919e numberofconcurrenttransactedreceivers=4 createmultipletransactedreceivers=true connected=true supportedprotocols=[amqp] serviceoverrides=<none> }  2014-04-22 09:49:29,461 - org.mule.lifecycle.abstractlifecyclemanager - info - starting  connector: amqp 

updated 04/23 exception received when jms transaction added amqp outbound endpoint:

message               : no active amqp transaction found endpoint:      defaultoutboundendpoint{endpointuri=amqp://rabbitexchange, connector=amqpconnector { name=amqp lifecycle=start this=25ec1ff7 numberofconcurrenttransactedreceivers=4 createmultipletransactedreceivers=true connected=true supportedprotocols=[amqp] serviceoverrides=<none> } ,  name='endpoint.amqp.rabbitexchange', mep=one_way, properties={exchangedurable=true,     exchangetype=fanout}, transactionconfig=transaction   {factory=org.mule.transport.jms.jmstransactionfactory@6491b172, action=always_join,   timeout=30000}, deleteunacceptedmessages=false, initialstate=started, responsetimeout=10000, endpointencoding=utf-8, disabletransporttransformer=false} code                  : mule_error--2  -------------------------------------------------------------------------------- root exception stack trace: org.mule.transaction.illegaltransactionstateexception: no active amqp transaction  found endpoint: defaultoutboundendpoint{endpointuri=amqp://rabbitexchange,   connector=amqpconnector { name=amqp lifecycle=start this=25ec1ff7 numberofconcurrenttransactedreceivers=4 createmultipletransactedreceivers=true connected=true supportedprotocols=[amqp] serviceoverrides=<none> } ,  name='endpoint.amqp.rabbitexchange', mep=one_way, properties= {exchangedurable=true, exchangetype=fanout}, transactionconfig=transaction {factory=org.mule.transport.jms.jmstransactionfactory@6491b172, action=always_join,  timeout=30000}, deleteunacceptedmessages=false, initialstate=started,   responsetimeout=10000, endpointencoding=utf-8, disabletransporttransformer=false} @ org.mule.transport.amqp.amqpmessagedispatcher.geteventchannel(amqpmessagedispatcher.java:298) @ org.mule.transport.amqp.amqpmessagedispatcher.dooutboundaction(amqpmessagedispatcher.java:152) @ org.mule.transport.amqp.amqpmessagedispatcher.dodispatch(amqpmessagedispatcher.java:127) + 3 more (set debug level logging or '-dmule.verbose.exceptions=true' everything) ********************************************************************************  2014-04-23 10:52:03,178 - org.mule.transport.jms.jmstransaction - warn - transaction   rollback attempted, no resource bound   org.mule.transport.jms.jmstransaction@d4ac3d8f-caf6-11e3-bf9a-8b266a026dee  [status=status_marked_rollback, key=null, resource=null] 

i see 2 options:

  • make jms client durable 1 , consume testtopic transactionally if amqp:outbound-endpoint fails, message redelivered.
  • wrap amqp:outbound-endpoint until-successful retry outbound dispatches until amqp connector reconnects rabbitmq.

Comments

Popular posts from this blog

c++ - How to add Crypto++ library to Qt project -

jQuery Mobile app not scrolling in Firefox -

how to receive file in java(servlet/jsp) -