
4. RabbitMQ Usage Conventions & Spring Integration

2019-11-09 19:26:37
Source: reposted
Contributed by: a site user

Feature conventions

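Spring (4.2 or later recommended, so that the MQ annotations are supported) + RabbitMQ client library 1.6.6 (the spring-amqp / spring-rabbit version used below).
The default configuration uses 5 connections; after the RabbitMQ server restarts, the client reconnects automatically.
All queues and exchanges are durable by default; after the RabbitMQ server restarts, they are restored automatically.
For priority queues, the recommended maximum priority level is 10 (a higher value means the message is handled first).
For delay queues, the recommended maximum delay is ???
Every receiver sets its ack mode to manual, and the listener must send the ack back once it has finished with the message, otherwise the queue blocks.
All data is sent as strings, and the stored payload should stay under 10K where possible (the default JSON converter is already configured).
Once a queue or exchange has been created, its properties cannot be changed. If a change is needed, ask operations for help (the recommended approach is to create a new queue and delete the old one after its data has been processed).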
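The payload-size rule above can be checked before a message is handed to the template. The following is a minimal sketch, not part of the original project: the class name `PayloadSizeCheck` is hypothetical, and it assumes "10K" means 10 * 1024 bytes of UTF-8 text.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: checks a string payload against the 10K guideline. */
final class PayloadSizeCheck {
    // Assumption: "10K" is read as 10 * 1024 bytes of UTF-8 encoded text.
    static final int MAX_BYTES = 10 * 1024;

    /** Size of the payload on the wire when encoded as UTF-8. */
    static int utf8Length(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length;
    }

    /** True when the payload stays under the guideline. */
    static boolean withinLimit(String payload) {
        return utf8Length(payload) < MAX_BYTES;
    }

    public static void main(String[] args) {
        String small = "{\"orderId\":42}";
        System.out.println(utf8Length(small) + " bytes, ok=" + withinLimit(small));
    }
}
```

Counting bytes rather than characters matters here because non-ASCII text (Chinese, for instance) takes several bytes per character in UTF-8.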

 

Naming conventions

# Naming pattern: container.[queue trait or exchange type].platform.purpose
# @container: queue, exchange
# @queue trait: non-durable marker (undurable), delay queue (delay), priority queue (priority)
# @exchange type: direct, topic, fanout, headers
# @platform: xiangshang, xiangqian, ...
# @purpose: what it is used for
# e.g. message queue (queue.xiangshang.message), delayed message queue (queue.delay.xiangshang.message), general-purpose exchange (exchange.direct.xiangshang.common)
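The naming pattern can also be enforced in code instead of by hand. Below is a small sketch under that assumption; the class `MqNames` and its methods are hypothetical and only encode the segments listed above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical builder for names of the form container.[trait or type].platform.purpose. */
final class MqNames {
    static final List<String> QUEUE_TRAITS = Arrays.asList("undurable", "delay", "priority");
    static final List<String> EXCHANGE_TYPES = Arrays.asList("direct", "topic", "fanout", "headers");

    /** Builds a queue name; pass null as trait for an ordinary durable queue. */
    static String queue(String trait, String platform, String purpose) {
        if (trait != null && !QUEUE_TRAITS.contains(trait)) {
            throw new IllegalArgumentException("unknown queue trait: " + trait);
        }
        return join("queue", trait, platform, purpose);
    }

    /** Builds an exchange name; the exchange type segment is mandatory. */
    static String exchange(String type, String platform, String purpose) {
        if (!EXCHANGE_TYPES.contains(type)) {
            throw new IllegalArgumentException("unknown exchange type: " + type);
        }
        return join("exchange", type, platform, purpose);
    }

    // Joins the non-null segments with dots.
    private static String join(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            if (part == null) {
                continue;
            }
            if (sb.length() > 0) {
                sb.append('.');
            }
            sb.append(part);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(queue(null, "xiangshang", "message"));
        System.out.println(queue("delay", "xiangshang", "message"));
        System.out.println(exchange("direct", "xiangshang", "common"));
    }
}
```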

 

Integrating with Spring

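Outline: the few steps below are enough to run a test.
Test environment:
xshell account (server: 10.200.0.150, username: root, password: xs360..COM)
RabbitMQ management console (http://10.200.0.150:15672/, username: guest, password: guest, connection port: 5672)
[Figure: project structure]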

Required jars

<properties>
    <spring.amqp.version>1.6.6.RELEASE</spring.amqp.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>${spring.amqp.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring.amqp.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-messaging</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

Connection configuration (rabbitmq-config.properties)

# RabbitMQ connection factory settings
rmq.addresses=10.200.0.150:5672
rmq.username=root
rmq.password=123456

# Naming pattern: container.[queue trait or exchange type].platform.purpose
# @container: queue, exchange
# @queue trait: non-durable marker (undurable), delay queue (delay), priority queue (priority)
# @exchange type: direct, topic, fanout, headers
# @platform: xiangshang, xiangqian, ...
# @purpose: what it is used for
# e.g. message queue (queue.xiangshang.message), delayed message queue (queue.delay.xiangshang.message), general-purpose exchange (exchange.direct.xiangshang.common)
rmq.queue.xiangshang.test=queue.xiangshang.test
rmq.queue.undurable.xiangshang.test=queue.undurable.xiangshang.test
rmq.queue.priority.xiangshang.test=queue.priority.xiangshang.test
rmq.queue.delay.xiangshang.test=queue.delay.xiangshang.test
rmq.exchange.direct.xiangshang.test=exchange.direct.xiangshang.test
rmq.exchange.fanout.xiangshang.test=exchange.fanout.xiangshang.test
rmq.exchange.headers.xiangshang.test=exchange.headers.xiangshang.test

XML configuration (rabbitConfiguration.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <description>RabbitMQ connection and service configuration</description>

    <context:property-placeholder location="classpath:rabbitmq-config.properties"/>

    <!-- Connection settings -->
    <rabbit:connection-factory id="connectionFactory" addresses="${rmq.addresses}" username="${rmq.username}" password="${rmq.password}" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Message converter -->
    <bean id="gsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Template; its default exchange is the direct exchange -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="gsonConverter" exchange="${rmq.exchange.direct.xiangshang.test}" />

    <!-- Queues: begin -->
    <rabbit:queue name="${rmq.queue.xiangshang.test}" durable="true" />
    <rabbit:queue name="${rmq.queue.undurable.xiangshang.test}" durable="false" />
    <rabbit:queue name="${rmq.queue.priority.xiangshang.test}" durable="false">
        <rabbit:queue-arguments>
            <entry key="x-max-priority">
                <value type="java.lang.Integer">10</value>
            </entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- Delay queue: expired messages are dead-lettered to the direct exchange and routed to the ordinary test queue -->
    <rabbit:queue name="${rmq.queue.delay.xiangshang.test}">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value type="java.lang.Long">60000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="${rmq.exchange.direct.xiangshang.test}"/>
            <entry key="x-dead-letter-routing-key" value="${rmq.queue.xiangshang.test}"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!-- Queues: end -->

    <!-- Exchanges: begin -->
    <rabbit:direct-exchange name="${rmq.exchange.direct.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.xiangshang.test}" key="${rmq.queue.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.undurable.xiangshang.test}" key="${rmq.queue.undurable.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.priority.xiangshang.test}" key="${rmq.queue.priority.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.delay.xiangshang.test}" key="${rmq.queue.delay.xiangshang.test}" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Publish/subscribe -->
    <rabbit:fanout-exchange name="${rmq.exchange.fanout.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.xiangshang.test}" />
            <rabbit:binding queue="${rmq.queue.undurable.xiangshang.test}" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Header-based routing; header keys are case-sensitive and must match the headers set by the producer -->
    <rabbit:headers-exchange name="${rmq.exchange.headers.xiangshang.test}">
        <rabbit:bindings>
            <rabbit:binding queue="${rmq.queue.delay.xiangshang.test}">
                <rabbit:binding-arguments>
                    <entry key="x-match" value="all"/>
                    <entry key="operator" value="xsjf"/>
                    <entry key="sex" value="male"/>
                </rabbit:binding-arguments>
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:headers-exchange>
    <!-- Exchanges: end -->

    <!-- Listeners: begin -->
    <!-- Receiving: DirectListener.java implements a listener interface (ChannelAwareMessageListener in the example below, which manual ack requires) and its onMessage method; concurrency is the number of concurrent consumers -->
    <bean id="directListener" class="com.xiangshang.mq.listener.DirectListener" />
    <rabbit:listener-container connection-factory="connectionFactory" message-converter="gsonConverter" concurrency="1" acknowledge="manual">
        <rabbit:listener queues="${rmq.queue.xiangshang.test},${rmq.queue.undurable.xiangshang.test}" ref="directListener" />
    </rabbit:listener-container>
    <!-- Listeners: end -->
</beans>
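The headers exchange binding above relies on `x-match`. As a rough illustration of how such a binding decides whether a message is routed, here is a simplified model in plain Java. It is a sketch, not broker code: the class `HeadersMatch` is hypothetical, it only covers the `all` and `any` modes, and it assumes binding arguments starting with `x-` are not compared. Keys are compared as-is, so `Operator` and `operator` are different headers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified, hypothetical model of headers-exchange matching (x-match = all / any). */
final class HeadersMatch {
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> headers) {
        // "all" is treated as the default when x-match is absent.
        boolean all = !"any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // control arguments such as x-match are not compared
            }
            boolean hit = headers.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), headers.get(arg.getKey()));
            if (all && !hit) {
                return false; // "all": one miss rejects the message
            }
            if (!all && hit) {
                return true;  // "any": one hit accepts the message
            }
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("operator", "xsjf");
        binding.put("sex", "male");

        Map<String, Object> headers = new HashMap<>();
        headers.put("operator", "xsjf");
        headers.put("sex", "male");
        System.out.println("routed: " + matches(binding, headers));
    }
}
```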

Client-side sending (DirectQueueTest.java example)

package com.xiangshang.mq.test;

import java.util.Random;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.BeansException;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DirectQueueTest {
    private static String RMQ_QUEUE_XIANGSHANG_TEST;
    private static String RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST;
    private static String RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST;
    private static String RMQ_QUEUE_DELAY_XIANGSHANG_TEST;

    private static String RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST;
    private static String RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST;
    private static String RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST;

    private static ConfigurableApplicationContext context = null;
    private static AmqpTemplate rabbitTemplate = null;

    static {
        RMQ_QUEUE_XIANGSHANG_TEST = "queue.xiangshang.test";
        RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST = "queue.undurable.xiangshang.test";
        RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST = "queue.priority.xiangshang.test";
        RMQ_QUEUE_DELAY_XIANGSHANG_TEST = "queue.delay.xiangshang.test";

        RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST = "exchange.direct.xiangshang.test";
        RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST = "exchange.fanout.xiangshang.test";
        RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST = "exchange.headers.xiangshang.test";

        try {
            context = new ClassPathXmlApplicationContext("rabbitConfiguration.xml");
        } catch (BeansException e) {
            e.printStackTrace();
        }
        rabbitTemplate = context.getBean("rabbitTemplate", AmqpTemplate.class);
    }

    public static void main(String[] args) {
        demo1();
    }

    /**
     * Send messages to queues.
     */
    public static void demo1() {
        String sendMsg = "queue message." + String.valueOf(new Random().nextDouble());

        /*
         * The default exchange is configured as exchange.direct.xiangshang.test.
         * Send a plain (non-persistent) message: the data is lost when the RabbitMQ service restarts.
         */
        MessageProperties mp1 = new MessageProperties();
        mp1.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        Message msg1 = new Message(sendMsg.getBytes(), mp1);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_XIANGSHANG_TEST, msg1);
        rabbitTemplate.convertAndSend(RMQ_EXCHANGE_DIRECT_XIANGSHANG_TEST, RMQ_QUEUE_XIANGSHANG_TEST, msg1); // same effect as the line above
        rabbitTemplate.convertAndSend(RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, msg1);

        /*
         * Send a persistent message: after the RabbitMQ service restarts, data in durable queues
         * can be recovered (messages are persistent by default).
         */
        rabbitTemplate.convertAndSend(RMQ_QUEUE_XIANGSHANG_TEST, sendMsg);
        rabbitTemplate.convertAndSend(RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, sendMsg);
    }

    /**
     * Send a broadcast: every queue bound to the fanout exchange receives the data.
     */
    public static void demo2() {
        String sendMsg = "broadcast message." + String.valueOf(new Random().nextDouble());
        rabbitTemplate.convertAndSend(RMQ_EXCHANGE_FANOUT_XIANGSHANG_TEST, RMQ_QUEUE_UNDURABLE_XIANGSHANG_TEST, sendMsg);
    }

    /**
     * Priority queue.
     */
    public static void demo3() {
        for (int i = 0; i < 10; i++) {
            int priority = new Random().nextInt(10);
            String sendMsg = "queue message." + String.valueOf(new Random().nextDouble()) + "[" + priority + "]";
            System.out.println(sendMsg);
            MessageProperties mp = new MessageProperties();
            mp.setPriority(priority);
            Message msg = new Message(sendMsg.getBytes(), mp);
            rabbitTemplate.convertAndSend(RMQ_QUEUE_PRIORITY_XIANGSHANG_TEST, msg);
        }
    }

    /**
     * Delayed message queue.
     * 1. RabbitMQ currently has no self-sorting queue; the queues it supports are not ordered by expiry.
     * 2. Queues are first-in, first-out, so expiry is driven by the first message: if the second message
     *    expires before the first, it still does not leave the queue before the first one does.
     *    e.g. with expirations of 30s, 10s and 20s in that order, nothing leaves the queue until 30s have passed.
     * 3. Delayed messaging therefore needs the following rules:
     *    3.1 Each delayed business case declares its own exchange and queue, with a note describing its use.
     *    3.2 A delay queue should set a maximum message expiration, plus the exchange and queue to forward to on expiry.
     *    3.3 Headers use all-match; the headers in the binding must be the same as the ones the producer sets.
     */
    public static void demo4() {
        for (int i = 0; i < 3; i++) {
            int delay = (i + 1) * 2 * 1000;
            String sendMsg = "queue message." + String.valueOf(new Random().nextDouble()) + "[" + delay + "]";
            System.out.println(sendMsg);

            MessageProperties mp = new MessageProperties();
            mp.setExpiration(String.valueOf(delay)); // per-message expiration in milliseconds

            mp.setHeader("operator", "xsjf");
            mp.setHeader("sex", "male");

            Message msg = new Message(sendMsg.getBytes(), mp);
            rabbitTemplate.convertAndSend(RMQ_EXCHANGE_HEADERS_XIANGSHANG_TEST, "", msg);
        }
    }
}
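The ordering caveat in the demo4 comment (expirations of 30s, 10s, 20s all leaving at 30s) can be reproduced without a broker. The sketch below is a simplified model, not RabbitMQ code: the class `DelayQueueOrder` is hypothetical, it assumes all messages are published at time 0, that only the head of the queue is checked for expiry, and that the lower of the per-message expiration and the queue's `x-message-ttl` applies.

```java
import java.util.Arrays;

/** Hypothetical model of per-message TTL on a FIFO queue with head-only expiry. */
final class DelayQueueOrder {
    /**
     * Returns, for messages published at t = 0 in the given order, the time in
     * milliseconds at which each one leaves the queue (is dead-lettered).
     */
    static long[] releaseTimes(long[] expirationsMs, long queueTtlMs) {
        long[] out = new long[expirationsMs.length];
        long headFreeAt = 0; // earliest time the current message can reach the head and expire
        for (int i = 0; i < expirationsMs.length; i++) {
            long effectiveTtl = Math.min(expirationsMs[i], queueTtlMs);
            // A message cannot leave before everything ahead of it has left.
            headFreeAt = Math.max(headFreeAt, effectiveTtl);
            out[i] = headFreeAt;
        }
        return out;
    }

    public static void main(String[] args) {
        // The example from the comment above: 30s, 10s, 20s with a 60s queue TTL.
        System.out.println(Arrays.toString(releaseTimes(new long[] {30000, 10000, 20000}, 60000)));
        // The delays used by demo4: 2s, 4s, 6s.
        System.out.println(Arrays.toString(releaseTimes(new long[] {2000, 4000, 6000}, 60000)));
    }
}
```

With increasing delays, as in demo4, each message leaves at its own expiration; the blocking only shows up when a later message has a shorter expiration than an earlier one, which is why rule 3.1 asks for one queue per delayed business case.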

Message listener (DirectListener.java example)

package com.xiangshang.mq.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import com.rabbitmq.client.Channel;

public class DirectListener implements ChannelAwareMessageListener {

    /* (non-Javadoc)
     * @see org.springframework.amqp.rabbit.core.ChannelAwareMessageListener#onMessage(org.springframework.amqp.core.Message, com.rabbitmq.client.Channel)
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("DirectListener received message body: " + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // positive ack
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // negative ack, requeue the message
        Thread.sleep(1000);
    }
}
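With manual acknowledgement, the listener has to answer the broker on every path, including failures, or unacknowledged messages pile up. One way to structure that is sketched below in plain Java. This is an illustration under assumptions, not the article's code: `Acker` is a hypothetical stand-in for the `basicAck` / `basicNack` calls on the channel, and `Handler` is a hypothetical business callback.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: answer the broker exactly once per message. */
final class AckPattern {
    /** Stand-in for the two channel calls used by the listener. */
    interface Acker {
        void ack(long deliveryTag);
        void nack(long deliveryTag, boolean requeue);
    }

    /** Business logic; throwing signals that processing failed. */
    interface Handler {
        void handle(String body) throws Exception;
    }

    /** Records the calls it receives, for demonstration only. */
    static final class Recorder implements Acker {
        final List<String> events = new ArrayList<>();

        @Override
        public void ack(long deliveryTag) {
            events.add("ack:" + deliveryTag);
        }

        @Override
        public void nack(long deliveryTag, boolean requeue) {
            events.add("nack:" + deliveryTag + ":requeue=" + requeue);
        }
    }

    /** Acks on success, nacks with requeue on failure. */
    static void process(long deliveryTag, String body, Handler handler, Acker acker) {
        try {
            handler.handle(body);
        } catch (Exception e) {
            acker.nack(deliveryTag, true);
            return;
        }
        acker.ack(deliveryTag);
    }

    public static void main(String[] args) {
        Recorder recorder = new Recorder();
        process(1L, "ok", body -> { }, recorder);
        process(2L, "bad", body -> { throw new IllegalStateException("boom"); }, recorder);
        System.out.println(recorder.events);
    }
}
```

Requeueing on every failure can redeliver a message that always fails over and over, so a real listener would probably cap retries or route such messages to a dead-letter queue instead.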
