java - Spring AMQP RabbitMQ MessageListener not working
I am trying to use RabbitMQ with Spring AMQP. Below is my configuration.
    <rabbit:connection-factory id="rabbitconnectionfactory"
        port="${rabbitmq.port}" host="${rabbitmq.host}" />

    <rabbit:admin connection-factory="rabbitconnectionfactory" />

    <rabbit:queue name="${rabbitmq.import.queue}" />

    <rabbit:template id="importamqptemplate"
        connection-factory="rabbitconnectionfactory"
        queue="${rabbitmq.import.queue}" />

    <beans:bean id="importexchangemessagelistener"
        class="com.stockopedia.batch.foundation.ImportMessageListener" />

    <rabbit:listener-container connection-factory="rabbitconnectionfactory" concurrency="5">
        <rabbit:listener queues="${rabbitmq.import.queue}" ref="importmessagelistener" />
    </rabbit:listener-container>
This is a simple message listener class:
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;

    public class ImportMessageListener implements MessageListener {

        @Override
        public void onMessage(Message message) {
            System.out.println("consumer output: " + message);
        }
    }
This is the producer (which is an ItemWriter of Spring Batch):
    import java.util.List;

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.batch.item.ItemWriter;

    public class ImportItemWriter<T> implements ItemWriter<T> {

        private AmqpTemplate template;

        public AmqpTemplate getTemplate() {
            return template;
        }

        public void setTemplate(AmqpTemplate template) {
            this.template = template;
        }

        public void write(List<? extends T> items) throws Exception {
            for (T item : items) {
                // Blocks until a reply arrives or the reply timeout expires.
                Object reply = template.convertSendAndReceive(item.toString());
                System.out.println("producer output: " + reply);
            }
        }
    }
When I run my Spring Batch job, ImportItemWriter.write gets called, but ImportMessageListener.onMessage does not work: it doesn't print the message. Below is the output for the items on the console:
    producer output: null
    producer output: null
    producer output: null
    producer output: null
    producer output: null
    producer output: null
    producer output: null
Your consumer is not sending a result...
    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
    }
Change it to a simple POJO (one that does not implement MessageListener); the container's MessageListenerAdapter will take care of the conversion for you, and send the result.
    // No @Override here: a plain POJO has no supertype method to override.
    public String handleMessage(String message) {
        System.out.println("consumer output: " + message);
        return "result";
    }
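To make the reply mechanism concrete, here is a minimal sketch of the idea behind an adapter that delegates to a POJO's `handleMessage` method and treats the return value as the reply. It is a toy model written with plain Java reflection, not Spring's `MessageListenerAdapter`; the class names `ReplyAdapterSketch`, `ReplyingHandler` and `SilentHandler` are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.Optional;

// Toy model only: delegates to handleMessage(String) and treats the return
// value as the reply. This is NOT Spring's MessageListenerAdapter.
final class ReplyAdapterSketch {

    /** Hypothetical POJO whose method returns a value, so a reply exists. */
    static final class ReplyingHandler {
        public String handleMessage(String message) {
            return "result for " + message;
        }
    }

    /** Hypothetical POJO with a void method, so there is nothing to reply with. */
    static final class SilentHandler {
        public void handleMessage(String message) {
            // Consumes the message but produces no result.
        }
    }

    /** Invokes handleMessage reflectively; an empty Optional means "no reply". */
    static Optional<String> dispatch(Object handler, String body) {
        try {
            Method method = handler.getClass().getMethod("handleMessage", String.class);
            Object result = method.invoke(handler, body); // null for void methods
            return Optional.ofNullable(result).map(Object::toString);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("handler has no usable handleMessage(String)", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new ReplyingHandler(), "item-1"));
        System.out.println(dispatch(new SilentHandler(), "item-1"));
    }
}
```

In this model only the handler that returns a value yields a reply, which mirrors why a `void onMessage` listener leaves the producer with nothing to receive.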
Edit:
You haven't set an exchange or a routing key to route messages to the queue. If you want to use the default exchange/routing, use...
    convertSendAndReceive("", queueName, item.toString());
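The reason the queue name works as a routing key is that the AMQP default exchange (the one named `""`) implicitly binds every queue under its own name. The following is a rough in-memory sketch of that rule, not a RabbitMQ client; the class `DefaultExchangeSketch` and the queue name `import.queue` are made up for illustration, and it assumes the producer published with an empty routing key when none was configured.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of the AMQP default exchange: each queue is implicitly bound
// with a routing key equal to its own name. No broker is involved.
final class DefaultExchangeSketch {

    private final Map<String, Queue<String>> queuesByName = new HashMap<>();

    /** Declares a queue; its implicit binding key is the queue name. */
    void declareQueue(String name) {
        queuesByName.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Routes by exact match on the routing key; returns false if unroutable. */
    boolean publish(String routingKey, String body) {
        Queue<String> queue = queuesByName.get(routingKey);
        if (queue == null) {
            return false; // no queue with that name, so the message is dropped
        }
        queue.add(body);
        return true;
    }

    /** Number of messages waiting in the named queue (0 if it does not exist). */
    int depth(String name) {
        Queue<String> queue = queuesByName.get(name);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch exchange = new DefaultExchangeSketch();
        exchange.declareQueue("import.queue"); // hypothetical queue name

        System.out.println(exchange.publish("", "item-1"));             // unroutable
        System.out.println(exchange.publish("import.queue", "item-1")); // routed
        System.out.println(exchange.depth("import.queue"));
    }
}
```

Under this model a message published with an empty routing key never reaches the queue, so the listener has nothing to print and the producer gets no reply.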
Edit 2:
Or, set the routingKey on the template to the queue name, and you can use the simpler method.
The ...sendAndReceive() methods are meant for request/reply scenarios where blocking is required. To do it asynchronously, you have to use one of the ...send() methods, and wire up your own SimpleMessageListenerContainer to receive the replies; you will have to do your own correlation. Use
    public void convertAndSend(Object message, MessagePostProcessor postProcessor)
and in your message post processor, set the replyTo and correlationId headers...
    message.getMessageProperties().setReplyTo("foo");
    message.getMessageProperties().setCorrelationId("bar");
Or, build the message object (e.g. by using MessageBuilder) and use the send method...
    template.send(MessageBuilder.withBody("foo".getBytes())
            .setReplyTo("bar")
            .setCorrelationId("baz".getBytes())
            .build());
Each request needs a unique correlationId so that you can correlate the response.
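As an illustration of doing your own correlation, here is a minimal sketch of a registry that pairs each outgoing request with its eventual reply through a unique id. The broker and the reply listener are simulated in memory; `CorrelationSketch`, `register` and `onReply` are hypothetical names, and no Spring AMQP classes are used.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Toy correlation registry for asynchronous request/reply. In a real setup
// onReply would be called from the listener on the reply queue.
final class CorrelationSketch {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request under a fresh correlation id and returns that id. */
    String register(CompletableFuture<String> replyFuture) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, replyFuture);
        return correlationId;
    }

    /** Completes the matching request; returns false for unknown or duplicate ids. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        return future.complete(body);
    }

    public static void main(String[] args) {
        CorrelationSketch registry = new CorrelationSketch();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        String firstId = registry.register(first);
        String secondId = registry.register(second);

        // Replies may arrive in any order; the id pairs each with its request.
        registry.onReply(secondId, "reply-2");
        registry.onReply(firstId, "reply-1");

        System.out.println(first.join());
        System.out.println(second.join());
    }
}
```

The id returned by `register` is what would go into the correlationId header of the outgoing message, and the consumer would copy it onto its reply.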