javascript
spring接入rabbitmq -ag凯发k8国际
在原有的spring项目中接入rabbitmq,使用的spring集成rabbitmq,这样 rabbitmq的功能调用通过spring封装,调用更加简洁。
首先pom 文件中引入依赖
<dependency><groupid>org.springframework.amqpgroupid><artifactid>spring-rabbitartifactid><version>${spring-rabbit-version}version><exclusions><exclusion><groupid>org.springframeworkgroupid><artifactid>spring-coreartifactid>exclusion><exclusion><groupid>org.springframeworkgroupid><artifactid>spring-messagingartifactid>exclusion><exclusion><groupid>org.springframeworkgroupid><artifactid>spring-txartifactid>exclusion>exclusions>dependency>因为已有项目中spring的版本与引入的spring-rabbit包中依赖的版本不一致,故exclusions中设置了多条 exclusion。
引入 jar 包后,开始 rabbitmq 的配置,思路等同于数据库,要将程序与 rabbitmq建立连接,以配置的形式。ag凯发k8国际官网给出的 demo中,rabbitmq接入 spring 以xml 文件的形式;接入 springboot以注解的形式。以下以xml 配置的形式。配置文件如下:
xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemalocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"><rabbit:connection-factory id="mqconnectionfactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" port="${rabbitmq.port}" publisher-confirms="true" publisher-returns="true"/><rabbit:admin connection-factory="mqconnectionfactory"/><rabbit:listener-container connection-factory="mqconnectionfactory" acknowledge="manual"><rabbit:listener queue-names="q.crm.msg" ref="msgsenderconsumerserviceimpl"/> rabbit:listener-container><rabbit:template id="rabbittemplate" connection-factory="mqconnectionfactory"confirm-callback="confirmcallbacklistener" return-callback="returncallbacklistener" mandatory="true"/> beans>将这个配置文件加载到 spring 的配置文件中,文件中的 host等通过 properties 文件设置。
生产者,消费者部分:
生产者:在需要使用的类中注入
@autowired private rabbittemplate rabbittemplate;生产部分
rabbittemplate.convertandsend("e.crm", "k.crm.push.msg", json.tojsonstring(msg));第一个参数(e.crm)指定 exchange,第二个参数(k.crm.push.msg)指定binding key,第三个参数为消息内容。
消费部分
@service("msgsenderconsumerserviceimpl") public class msgsenderconsumerserviceimpl implements channelawaremessagelistener {public void onmessage(message message, channel channel) throws ioexception {string taskstr = new string(message.getbody(), "utf-8");}实现的是channelawaremessagelistener接口,为在方法中使用channel,不需要使用的话可直接实现messagelistener接口。
配置中生产者确认机制在配置中需要的类有:
@service("confirmcallbacklistener") public class confirmcallbacklistener implements confirmcallback{@overridepublic void confirm(correlationdata correlationdata, boolean ack, string cause) {//exchange未到达成功 ack=false//处理逻辑 }} @service("returncallbacklistener") public class returncallbacklistener implements returncallback{@overridepublic void returnedmessage(message message, int replycode, string replytext, string exchange, string routingkey) {//入队列未成功,则调用此方法//处理逻辑 }}通过以上配置,rabbitmq 接入完成。
但是哩,每个 queue的接入都需要修改配置文件,虽然解耦的方式,但是queue 接入后业务比较固定,修改配置文件就不如注解方式灵活。故接入将消费者修改成注解的方式。
放开 rabbitmq配置中可使用注解的配置,这样在类中可直接使用 rabbitmq相关的注解。
@service("msgsenderconsumerserviceimpl") @rabbitlistener(queues = "q.crm.msg") public class msgsenderconsumerserviceimpl {@rabbithandlerpublic void onmessage(@payload string message) throws ioexception {system.out.println(message);}}这种形式则 rabbitmq配置文件中的这部分就不需要了
<rabbit:listener-container connection-factory="mqconnectionfactory"><rabbit:listener queue-names="q.crm.msg" ref="msgsenderconsumerserviceimpl"/> rabbit:listener-container>好,接下来,我们的业务想要在消费者端手动确认,
消费端通过 xml配置时比较好设置,如下:
<rabbit:listener-container connection-factory="mqconnectionfactory" acknowledge="manual"><rabbit:listener queue-names="q.crm.msg" ref="msgsenderconsumerserviceimpl"/> rabbit:listener-container>但是哩,我们想要用注解的形式,从网上搜了各种资料,一直没找到ag凯发k8国际的解决方案。
注解使用@rabbitlistener时,需要在配置文件中指定rabbitlistenercontainerfactory,但是手动确认的设置是在rabbitlistenercontainer中,这样就找不到设置的入口。
网上搜索的很多资料,通过设置:spring.rabbitmq.listener.simple.acknowledge-mode=manual
我将设置添加到 spring 的配置文件中,也是不好使,这个配置应该是 spring-boot 项目中有效的。
那直接设置的方案使用没有找到可用的,最终的ag凯发k8国际的解决方案是,使用自己定义的rabbitlistenercontainerfactory,
内容完全参照的spring-rabbit包中的simplerabbitlistenercontainerfactory类,修改部分是:
@overrideprotected simplemessagelistenercontainer createcontainerinstance() {simplemessagelistenercontainer container = new simplemessagelistenercontainer();container.setacknowledgemode(acknowledgemode.manual);return container;}这样手动确认就配置好了,手动确认消费者端的代码为:
@service("msgsenderconsumerserviceimpl") @rabbitlistener(queues = "q.crm.msg") public class msgsenderconsumerserviceimpl {@rabbithandlerpublic void onmessage(@payload string message, channel channel, @header(amqpheaders.delivery_tag) long deliverytag) throws ioexception {//处理逻辑channel.basicack(deliverytag, false);}以上~
转载于:https://www.cnblogs.com/youyj/p/7332414.html
总结
以上是ag凯发k8国际为你收集整理的spring接入rabbitmq的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇:
- 下一篇: