# Naming convention: container.[queue trait or exchange type].platform.purpose
# @container: queue, exchange
# @queue trait: non-durable marker (undurable), delay queue (delay), priority queue (priority)
# @exchange type: direct, topic, fanout, headers
# @platform: xiangshang, xiangqian, …
# @purpose: what the queue or exchange is used for
# e.g. message queue (queue.xiangshang.message), delayed message queue (queue.delay.xiangshang.message), common exchange (exchange.direct.xiangshang.common)
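The convention above can be captured in a small helper so that names are assembled rather than typed by hand. This is only a sketch: the class `RmqNames` and its methods are hypothetical and not part of the original project or of Spring AMQP.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that assembles names following the naming convention. */
final class RmqNames {
    // Traits and exchange types listed in the convention.
    private static final List<String> QUEUE_TRAITS =
            Arrays.asList("undurable", "delay", "priority");
    private static final List<String> EXCHANGE_TYPES =
            Arrays.asList("direct", "topic", "fanout", "headers");

    private RmqNames() {
    }

    /** Builds a queue name; pass null as the trait for a plain queue. */
    static String queue(String trait, String platform, String purpose) {
        if (trait != null && !QUEUE_TRAITS.contains(trait)) {
            throw new IllegalArgumentException("unknown queue trait: " + trait);
        }
        return join("queue", trait, platform, purpose);
    }

    /** Builds an exchange name; the exchange type is mandatory. */
    static String exchange(String type, String platform, String purpose) {
        if (!EXCHANGE_TYPES.contains(type)) {
            throw new IllegalArgumentException("unknown exchange type: " + type);
        }
        return join("exchange", type, platform, purpose);
    }

    // Joins the parts with dots, skipping the optional second segment.
    private static String join(String container, String trait, String platform, String purpose) {
        StringBuilder sb = new StringBuilder(container);
        if (trait != null) {
            sb.append('.').append(trait);
        }
        return sb.append('.').append(platform).append('.').append(purpose).toString();
    }
}
```

For example, `RmqNames.queue("delay", "xiangshang", "message")` yields `queue.delay.xiangshang.message`, and an unknown trait is rejected early instead of producing an off-convention name.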
Required JARs
<properties>
    <spring.amqp.version>1.6.6.RELEASE</spring.amqp.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>${spring.amqp.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring.amqp.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-messaging</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
Connection configuration (rabbitmq-config.properties)
# RabbitMQ connection factory settings
rmq.addresses=10.200.0.150:5672
rmq.username=root
rmq.password=123456

# Naming convention: container.[queue trait or exchange type].platform.purpose
# @container: queue, exchange
# @queue trait: non-durable marker (undurable), delay queue (delay), priority queue (priority)
# @exchange type: direct, topic, fanout, headers
# @platform: xiangshang, xiangqian, …
# @purpose: what the queue or exchange is used for
# e.g. message queue (queue.xiangshang.message), delayed message queue (queue.delay.xiangshang.message), common exchange (exchange.direct.xiangshang.common)
rmq.queue.xiangshang.test=queue.xiangshang.test
rmq.queue.undurable.xiangshang.test=queue.undurable.xiangshang.test
rmq.queue.priority.xiangshang.test=queue.priority.xiangshang.test
rmq.queue.delay.xiangshang.test=queue.delay.xiangshang.test
rmq.exchange.direct.xiangshang.test=exchange.direct.xiangshang.test
rmq.exchange.fanout.xiangshang.test=exchange.fanout.xiangshang.test
rmq.exchange.headers.xiangshang.test=exchange.headers.xiangshang.test
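The XML configuration resolves these keys through `${...}` placeholders, and property keys are case-sensitive, so a key spelled differently in the two files makes the application context fail at start-up. A minimal sketch of a pre-flight check, using only `java.util.Properties`; the class `RmqConfigCheck` and the method `missingKeys` are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for the properties file. */
final class RmqConfigCheck {
    private RmqConfigCheck() {
    }

    /** Returns the required keys that the given properties text does not define. */
    static List<String> missingKeys(String propertiesText, String... requiredKeys) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // A StringReader should not fail; rethrow unchecked to keep the API simple.
            throw new IllegalArgumentException("unreadable properties text", e);
        }
        List<String> missing = new ArrayList<String>();
        for (String key : requiredKeys) {
            // Lookup is exact: "rmq.passWord" does not satisfy "rmq.password".
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling `missingKeys(text, "rmq.addresses", "rmq.username", "rmq.password")` returns an empty list when all three keys are defined with exactly that spelling.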
XML configuration (rabbitConfiguration.xml)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <description>RabbitMQ connection service configuration</description>

    <context:property-placeholder location="classpath:rabbitmq-config.properties" />

    <!-- Connection settings -->
    <rabbit:connection-factory id="connectionFactory" addresses="${rmq.addresses}"
                               username="${rmq.username}" password="${rmq.password}" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Message converter -->
    <bean id="gsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Template; its default exchange is the direct exchange -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
                     message-converter="gsonConverter" exchange="${rmq.exchange.direct.xiangshang.test}" />

    <!-- Queues: begin -->
    <rabbit:queue name="${rmq.queue.xiangshang.test}" durable="true" />
    <rabbit:queue name="${rmq.queue.undurable.xiangshang.test}" durable="false" />
    <rabbit:queue name="${rmq.queue.priority.xiangshang.test}" durable="false">
        <rabbit:queue-arguments>
            <entry key="x-max-priority">
                <value type="java.lang.Integer">10</value>
            </entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:queue name="${rmq.queue.delay.xiangshang.test}">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value type="java.lang.Long">60000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="${rmq.exchange.direct.xiangshang.test}" />
            <entry key="x-dead-letter-routing-key" value="${rmq.queue.xiangshang.test}" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- Queues: end -->

    <!-- Exchanges: begin -->
    <rabbit:direct-exchange name="${rmq.exchange.direct.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.xiangshang.test}" key="${rmq.queue.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.undurable.xiangshang.test}" key="${rmq.queue.undurable.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.priority.xiangshang.test}" key="${rmq.queue.priority.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.delay.xiangshang.test}" key="${rmq.queue.delay.xiangshang.test}" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Publish/subscribe -->
    <rabbit:fanout-exchange name="${rmq.exchange.fanout.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.undurable.xiangshang.test}" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Header-based routing; header names must match the producer's exactly, including case -->
    <rabbit:headers-exchange name="${rmq.exchange.headers.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.delay.xiangshang.test}">
                <rabbit:binding-arguments>
                    <entry key="x-match" value="all" />
                    <entry key="operator" value="xsjf" />
                    <entry key="sex" value="male" />
                </rabbit:binding-arguments>
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:headers-exchange>
    <!-- Exchanges: end -->

    <!-- Listeners: begin -->
    <!-- Receiving messages: DirectListener.java implements ChannelAwareMessageListener (needed for manual
         acknowledgement) and its onMessage method; concurrency is the number of concurrent consumers -->
    <bean id="directListener" class="com.xiangshang.mq.listener.DirectListener" />
    <rabbit:listener-container connection-factory="connectionFactory" message-converter="gsonConverter"
                               concurrency="1" acknowledge="manual">
        <rabbit:listener queues="${rmq.queue.xiangshang.test},${rmq.queue.undurable.xiangshang.test}"
                         ref="directListener" />
    </rabbit:listener-container>
    <!-- Listeners: end -->
</beans>
Client-side sending (DirectQueueTest.java example)
package com.xiangshang.mq.test;

import java.util.Random;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.BeansException;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DirectQueueTest {

    private static String RMQ_QUEUE_XIANGSHANG_TEST;
    private static String RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST;
    private static String RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST;
    private static String RMQ_QUEUE_DELAY_XIANGSHANG_TEST;
    private static String RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST;
    private static String RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST;
    private static String RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST;

    private static ConfigurableApplicationContext context = null;
    private static AmqpTemplate rabbitTemplate = null;

    static {
        RMQ_QUEUE_XIANGSHANG_TEST = "queue.xiangshang.test";
        RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST = "queue.undurable.xiangshang.test";
        RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST = "queue.priority.xiangshang.test";
        RMQ_QUEUE_DELAY_XIANGSHANG_TEST = "queue.delay.xiangshang.test";
        RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST = "exchange.direct.xiangshang.test";
        RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST = "exchange.fanout.xiangshang.test";
        RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST = "exchange.headers.xiangshang.test";
        try {
            context = new ClassPathXmlApplicationContext("rabbitConfiguration.xml");
        } catch (BeansException e) {
            e.printStackTrace();
        }
        rabbitTemplate = context.getBean("rabbitTemplate", AmqpTemplate.class);
    }

    public static void main(String[] args) {
        demo1();
    }

    /**
     * Send messages to queues.
     */
    public static void demo1() {
        String sendMsg = "Queue message." + String.valueOf(new Random().nextDouble());
        /*
         * The template's default exchange is configured as exchange.direct.xiangshang.test.
         * Non-persistent message: it is lost when the RabbitMQ server restarts.
         */
        MessageProperties mp1 = new MessageProperties();
        mp1.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        Message msg1 = new Message(sendMsg.getBytes(), mp1);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_XIANGSHANG_TEST, msg1);
        // Same effect as the line above, with the exchange named explicitly
        rabbitTemplate.convertAndSend(RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST, RMQ_QUEUE_XIANGSHANG_TEST, msg1);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, msg1);
        /*
         * Persistent message: after a RabbitMQ restart, messages in durable queues
         * are recovered (messages are persistent by default).
         */
        rabbitTemplate.convertAndSend(RMQ_QUEUE_XIANGSHANG_TEST, sendMsg);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, sendMsg);
    }

    /**
     * Broadcast: every queue bound to the fanout exchange receives the message.
     */
    public static void demo2() {
        String sendMsg = "Broadcast message." + String.valueOf(new Random().nextDouble());
        rabbitTemplate.convertAndSend(RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST, RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, sendMsg);
    }

    /**
     * Priority queue.
     */
    public static void demo3() {
        for (int i = 0; i < 10; i++) {
            int priority = new Random().nextInt(10);
            String sendMsg = "Queue message." + String.valueOf(new Random().nextDouble()) + "[" + priority + "]";
            System.out.println(sendMsg);
            MessageProperties mp = new MessageProperties();
            mp.setPriority(priority);
            Message msg = new Message(sendMsg.getBytes(), mp);
            rabbitTemplate.convertAndSend(RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST, msg);
        }
    }

    /**
     * Delayed message queue.
     * 1. RabbitMQ currently has no queue that sorts itself; queues are not ordered by expiration time.
     * 2. Queues are first in, first out, so expiry is driven by the first message: if the second message
     *    expires before the first, it still does not leave the queue before the first.
     *    e.g. with expiration times of 30s, 10s, and 20s in that order, nothing leaves the queue until 30s have passed.
     * 3. Delayed messaging should therefore follow these rules:
     *    3.1 Each delayed business flow declares its own exchange and queue, with its usage documented.
     *    3.2 A delay queue should set a maximum message expiration time, plus the exchange and queue
     *        that expired messages are forwarded to.
     *    3.3 Headers use "all" matching; the headers in the binding must be identical to the producer's headers.
     */
    public static void demo4() {
        for (int i = 0; i < 3; i++) {
            int delay = (i + 1) * 2 * 1000;
            String sendMsg = "Queue message." + String.valueOf(new Random().nextDouble()) + "[" + delay + "]";
            System.out.println(sendMsg);
            MessageProperties mp = new MessageProperties();
            mp.setExpiration(String.valueOf(delay)); // per-message expiration time in milliseconds
            mp.setHeader("operator", "xsjf");
            mp.setHeader("sex", "male");
            Message msg = new Message(sendMsg.getBytes(), mp);
            rabbitTemplate.convertAndSend(RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST, "", msg);
        }
    }
}
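The ordering caveat described in the `demo4` comment can be modelled without a broker. The following is a rough sketch, assuming all messages are published at time 0 and that the queue only checks the expiration of the message at its head; `DelayQueueModel` and `releaseTimes` are hypothetical names, and the model ignores the queue-level `x-message-ttl`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a FIFO queue that expires only the message at its head. */
final class DelayQueueModel {
    private DelayQueueModel() {
    }

    /**
     * Takes per-message expiration times in milliseconds, in publish order, and
     * returns the time in milliseconds at which each message leaves the queue.
     */
    static List<Long> releaseTimes(long... expirationsMillis) {
        Deque<Long> queue = new ArrayDeque<Long>();
        for (long expiration : expirationsMillis) {
            queue.addLast(expiration);
        }
        List<Long> released = new ArrayList<Long>();
        long now = 0;
        while (!queue.isEmpty()) {
            long headExpiration = queue.removeFirst();
            // A message cannot leave before its own expiry, nor before the one ahead of it.
            now = Math.max(now, headExpiration);
            released.add(now);
        }
        return released;
    }
}
```

With expirations of 30000, 10000, and 20000 ms the model releases all three messages at 30000 ms, which is the behaviour the comment warns about; with the increasing delays used in `demo4` (2000, 4000, 6000 ms) each message leaves at its own expiry.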
Message listener (DirectListener.java example)
package com.xiangshang.mq.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import com.rabbitmq.client.Channel;

public class DirectListener implements ChannelAwareMessageListener {

    /* (non-Javadoc)
     * @see org.springframework.amqp.rabbit.core.ChannelAwareMessageListener#onMessage(org.springframework.amqp.core.Message, com.rabbitmq.client.Channel)
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("DirectListener received message body: " + new String(message.getBody()));
        // Acknowledge this single message as successfully processed
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // Negative acknowledgement that puts the message back on the queue:
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        Thread.sleep(1000);
    }
}