How to not lose messages when publishing to RabbitMQ from Mule until the proper "Ack" is received?
I have a synchronous Mule flow that reads messages from a Sonic topic and publishes them to a RabbitMQ exchange.
I am losing messages when RabbitMQ is brought up or down. The RabbitMQ exchange publishes to HA queues. How can I make sure Mule does not consume a message until a proper "ack" is received from the RabbitMQ broker? Here is the flow.
<jms:connector name="sonicmqconnectorsub" validateConnections="true" connectionFactory-ref="factorysub" doc:name="jms" clientId="testclient" durable="true" maxRedelivery="-1">
    <reconnect-forever frequency="30000"/>
</jms:connector>

<spring:beans>
    <spring:bean id="soniqmqconnectionfactorybeansub" name="factorysub" class="progress.message.jclient.ConnectionFactory">
        <spring:property name="connectionURLs" value="tcp://server1:7800"/>
        <spring:property name="defaultUser" value="user"/>
        <spring:property name="defaultPassword" value="pass"/>
    </spring:bean>
</spring:beans>

<amqp:connector name="amqp" validateConnections="true" host="server2" fallbackAddresses="server3" doc:name="amqp connector" port="5672" mandatory="true" activeDeclarationsOnly="true">
    <reconnect-forever frequency="30000"/>
</amqp:connector>

<flow name="rabbitflow1" doc:name="rabbitflow1" processingStrategy="synchronous">
    <jms:inbound-endpoint doc:name="jms" connector-ref="sonicmqconnectorsub" topic="testtopic"/>
    <logger message="message: #[message.payload]" level="INFO" doc:name="logger"/>
    <amqp:outbound-endpoint exchangeName="rabbitexchange" exchangeDurable="true" responseTimeout="10000" connector-ref="amqp" doc:name="amqp" exchangeType="fanout"/>
</flow>
Updated 04/22
Here is the exception trace from when Mule connects to the second broker. This is when I lose the message.
2014-04-22 09:49:29,453 - org.mule.exception.defaultsystemexceptionstrategy - error -
********************************************************************************
message : connection shutdown detected for: amqp
code : mule_error--2
--------------------------------------------------------------------------------
exception stack is:
1. software caused connection abort: recv failed (java.net.socketexception)
   java.net.socketinputstream:-2 (null)
2. connection error; reason: java.net.socketexception: software caused connection abort: recv failed (com.rabbitmq.client.shutdownsignalexception)
   com.rabbitmq.client.impl.amqconnection:715 (null)
3. connection shutdown detected for: amqp (org.mule.transport.connectexception)
   org.mule.transport.amqp.amqpconnector$1:502 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/transport/connectexception.html)
--------------------------------------------------------------------------------
root exception stack trace:
java.net.socketexception: software caused connection abort: recv failed
    at java.net.socketinputstream.socketread0(native method)
    at java.net.socketinputstream.read(socketinputstream.java:150)
    at java.net.socketinputstream.read(socketinputstream.java:121)
    + 3 more (set debug level logging or '-dmule.verbose.exceptions=true' everything)
********************************************************************************
2014-04-22 09:49:29,453 - org.mule.exception.defaultsystemexceptionstrategy - info - exception caught connectexception, attempting reconnect...
2014-04-22 09:49:29,454 - org.mule.lifecycle.abstractlifecyclemanager - info - stopping connector: amqp
2014-04-22 09:49:29,454 - org.mule.lifecycle.abstractlifecyclemanager - info - stopping: 'amqp.dispatcher.1064499250'. object is: amqpmessagedispatcher
2014-04-22 09:49:29,454 - org.mule.lifecycle.abstractlifecyclemanager - info - disposing: 'amqp.dispatcher.1064499250'. object is: amqpmessagedispatcher
2014-04-22 09:49:29,455 - org.mule.transport.amqp.amqpconnector - error - clean connection shutdown; reason: attempt use closed connection
2014-04-22 09:49:29,461 - org.mule.transport.amqp.amqpconnector - info - connected: amqpconnector { name=amqp lifecycle=stop this=33c5919e numberofconcurrenttransactedreceivers=4 createmultipletransactedreceivers=true connected=true supportedprotocols=[amqp] serviceoverrides=<none> }
2014-04-22 09:49:29,461 - org.mule.transport.amqp.amqpconnector - info - starting: amqpconnector { name=amqp lifecycle=stop this=33c5919e numberofconcurrenttransactedreceivers=4 createmultipletransactedreceivers=true connected=true supportedprotocols=[amqp] serviceoverrides=<none> }
2014-04-22 09:49:29,461 - org.mule.lifecycle.abstractlifecyclemanager - info - starting connector: amqp
Updated 04/23
Exception received when a JMS transaction is added to the AMQP outbound endpoint:
message : no active amqp transaction found endpoint: defaultoutboundendpoint{endpointuri=amqp://rabbitexchange, connector=amqpconnector { name=amqp lifecycle=start this=25ec1ff7 numberofconcurrenttransactedreceivers=4 createmultipletransactedreceivers=true connected=true supportedprotocols=[amqp] serviceoverrides=<none> } , name='endpoint.amqp.rabbitexchange', mep=one_way, properties={exchangedurable=true, exchangetype=fanout}, transactionconfig=transaction {factory=org.mule.transport.jms.jmstransactionfactory@6491b172, action=always_join, timeout=30000}, deleteunacceptedmessages=false, initialstate=started, responsetimeout=10000, endpointencoding=utf-8, disabletransporttransformer=false}
code : mule_error--2
--------------------------------------------------------------------------------
root exception stack trace:
org.mule.transaction.illegaltransactionstateexception: no active amqp transaction found endpoint: defaultoutboundendpoint{endpointuri=amqp://rabbitexchange, connector=amqpconnector { name=amqp lifecycle=start this=25ec1ff7 numberofconcurrenttransactedreceivers=4 createmultipletransactedreceivers=true connected=true supportedprotocols=[amqp] serviceoverrides=<none> } , name='endpoint.amqp.rabbitexchange', mep=one_way, properties={exchangedurable=true, exchangetype=fanout}, transactionconfig=transaction {factory=org.mule.transport.jms.jmstransactionfactory@6491b172, action=always_join, timeout=30000}, deleteunacceptedmessages=false, initialstate=started, responsetimeout=10000, endpointencoding=utf-8, disabletransporttransformer=false}
    at org.mule.transport.amqp.amqpmessagedispatcher.geteventchannel(amqpmessagedispatcher.java:298)
    at org.mule.transport.amqp.amqpmessagedispatcher.dooutboundaction(amqpmessagedispatcher.java:152)
    at org.mule.transport.amqp.amqpmessagedispatcher.dodispatch(amqpmessagedispatcher.java:127)
    + 3 more (set debug level logging or '-dmule.verbose.exceptions=true' everything)
********************************************************************************
2014-04-23 10:52:03,178 - org.mule.transport.jms.jmstransaction - warn - transaction rollback attempted, no resource bound org.mule.transport.jms.jmstransaction@d4ac3d8f-caf6-11e3-bf9a-8b266a026dee [status=status_marked_rollback, key=null, resource=null]
I see 2 options:
- Make the JMS client durable and consume testtopic transactionally, so that if amqp:outbound-endpoint fails, the message is redelivered.
- Wrap amqp:outbound-endpoint in until-successful to retry outbound dispatches until the AMQP connector reconnects to RabbitMQ.
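
As a rough sketch of the two options, assuming a Mule 3.x configuration: the fragment below reuses the connector, topic and exchange names from the question, while the flow names, the retryObjectStore bean and the retry values are hypothetical. The 04/23 trace shows a JMS transaction (always_join) configured on the AMQP outbound endpoint, which the AMQP dispatcher rejects, so in option 1 the transaction is declared on the JMS inbound endpoint instead. Attribute names such as secondsBetweenRetries differ between Mule 3 releases, so check them against the version in use.

```xml
<!-- Fragment only: goes inside the existing <mule> root and relies on the
     sonicmqconnectorsub and amqp connectors defined in the question.
     The two flows are alternatives; deploy one or the other, not both. -->

<!-- Option 1: consume the topic inside a JMS transaction. -->
<flow name="rabbitflowTransactional" processingStrategy="synchronous">
    <jms:inbound-endpoint topic="testtopic" connector-ref="sonicmqconnectorsub">
        <!-- The transaction starts on the inbound side. If the dispatch
             below throws, the rollback should make Sonic redeliver. -->
        <jms:transaction action="ALWAYS_BEGIN"/>
    </jms:inbound-endpoint>
    <!-- No transaction element here: the AMQP endpoint cannot join a
         JMS transaction. -->
    <amqp:outbound-endpoint exchangeName="rabbitexchange" exchangeType="fanout"
                            exchangeDurable="true" connector-ref="amqp"/>
</flow>

<!-- Option 2: retry the dispatch until the AMQP connector is back. -->
<spring:beans>
    <!-- Hypothetical bean name. An in-memory store loses pending
         messages if Mule itself restarts. -->
    <spring:bean id="retryObjectStore"
                 class="org.mule.util.store.SimpleMemoryObjectStore"/>
</spring:beans>

<flow name="rabbitflowRetry" processingStrategy="synchronous">
    <jms:inbound-endpoint topic="testtopic" connector-ref="sonicmqconnectorsub"/>
    <!-- Illustrative values: up to 10 attempts, 30 seconds apart. -->
    <until-successful objectStore-ref="retryObjectStore"
                      maxRetries="10" secondsBetweenRetries="30">
        <amqp:outbound-endpoint exchangeName="rabbitexchange" exchangeType="fanout"
                                exchangeDurable="true" connector-ref="amqp"/>
    </until-successful>
</flow>
```

Option 1 leaves the message with Sonic until the publish goes through, at the cost of possible duplicates on redelivery. Option 2 takes the message off the topic immediately and holds it in the object store while retrying, so its durability depends on which object store is configured.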