
3. RabbitMQ Tutorial: Feature Demos

2019-11-10 17:54:09
Source: reposted
Contributed by: a site user


Official documentation and resources

Spring AMQP project page: http://projects.spring.io/spring-amqp/

Demos provided by Spring: https://github.com/spring-projects/spring-amqp-samples

Chinese documentation: http://rabbitmq.mr-ping.com/

The six RabbitMQ features

"Hello World!"

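Work queues

Publish/Subscribe

Routing

Topics (topic exchanges)

RPC (remote procedure call)

(These six features are not illustrated here; see the Chinese documentation linked above and download the Spring demos to understand them.)

Demos for a few specific behaviours

Queue durability and message persistence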

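// Producer
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// The class name is illustrative; the original listing shows only main().
public class DurableProducer {
    public static void main(String[] args) {
        String DURABLE_QUEUE_NAME = "queue_for_durable";
        String UNDURABLE_QUEUE_NAME = "queue_for_undurable";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.200.0.150");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123456");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Declare two queues. The second argument (durable) controls whether
            // the queue still exists after the RabbitMQ server restarts.
            channel.queueDeclare(DURABLE_QUEUE_NAME, true, false, false, null);
            channel.queueDeclare(UNDURABLE_QUEUE_NAME, false, false, false, null);

            // Message persistence: text/plain content, marked persistent
            BasicProperties basicProperties = MessageProperties.PERSISTENT_TEXT_PLAIN;

            // Published through the default exchange (""), so the routing key is the queue name
            channel.basicPublish("", DURABLE_QUEUE_NAME, null, "text 1".getBytes(StandardCharsets.UTF_8));
            channel.basicPublish("", DURABLE_QUEUE_NAME, basicProperties, "text 2".getBytes(StandardCharsets.UTF_8));
            channel.basicPublish("", UNDURABLE_QUEUE_NAME, null, "text 3".getBytes(StandardCharsets.UTF_8));
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}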

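Running the code above creates two queues on the RabbitMQ server: the durable queue holds 2 messages and the transient queue holds 1. Now restart the RabbitMQ service on the Linux server with the command: service rabbitmq-server restart. After the restart, check the queue status: only the durable queue remains, and it holds only 1 message. The transient queue disappeared together with its message, and the first message in the durable queue was lost because it was published with null properties, that is, without the persistent flag.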
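The outcome described above can be reproduced with a small in-memory model. This is only a sketch of the rule "a restart keeps durable queues and, inside them, persistent messages"; it is not how RabbitMQ is implemented, and the class `RestartModel` and its methods are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of a broker restart; NOT RabbitMQ's implementation.
class RestartModel {
    // One stored message: its body plus whether it was published as persistent.
    static final class Msg {
        final String body;
        final boolean persistent;
        Msg(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    // One queue: its durable flag plus the messages it currently holds.
    static final class Queue {
        final boolean durable;
        final List<Msg> messages = new ArrayList<>();
        Queue(boolean durable) {
            this.durable = durable;
        }
    }

    final Map<String, Queue> queues = new LinkedHashMap<>();

    void declare(String name, boolean durable) {
        queues.putIfAbsent(name, new Queue(durable));
    }

    void publish(String queue, String body, boolean persistent) {
        queues.get(queue).messages.add(new Msg(body, persistent));
    }

    // A restart drops non-durable queues, then drops non-persistent messages.
    void restart() {
        queues.values().removeIf(q -> !q.durable);
        for (Queue q : queues.values()) {
            q.messages.removeIf(m -> !m.persistent);
        }
    }

    public static void main(String[] args) {
        RestartModel broker = new RestartModel();
        broker.declare("queue_for_durable", true);
        broker.declare("queue_for_undurable", false);
        broker.publish("queue_for_durable", "msg-1", false);   // null properties
        broker.publish("queue_for_durable", "msg-2", true);    // PERSISTENT_TEXT_PLAIN
        broker.publish("queue_for_undurable", "msg-3", false); // null properties
        broker.restart();
        for (Map.Entry<String, Queue> e : broker.queues.entrySet()) {
            System.out.println(e.getKey() + " holds " + e.getValue().messages.size() + " message(s)");
        }
    }
}
```

The three publish calls mirror the three `basicPublish` calls in the producer, so the model ends with one queue and one message, matching what the management view shows after the restart.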

Acknowledgement (ack) mechanism

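// Consumer
// Note: QueueingConsumer exists only in amqp-client versions before 5.0.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// The class name is illustrative; the original listing shows only main().
public class AckConsumer {
    public static void main(String[] args) {
        String DURABLE_QUEUE_NAME = "queue_for_durable"; // durable queue

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.200.0.150");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123456");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Declare the queue so that a consumer started before the producer
            // does not fail because the queue is missing
            channel.queueDeclare(DURABLE_QUEUE_NAME, true, false, false, null);
            channel.basicQos(1); // prefetch: at most 1 unacknowledged message at a time
            QueueingConsumer consumer = new QueueingConsumer(channel);

            /*
             * Second argument (autoAck): if true, the server removes the message
             * as soon as it has been delivered.
             * If false, acknowledgement is manual: the server keeps the message as
             * unacknowledged, and returns it to the ready state if the consumer
             * goes away without acknowledging it.
             */
            String resp = channel.basicConsume(DURABLE_QUEUE_NAME, false, consumer);
            System.out.println("======:" + resp);

            QueueingConsumer.Delivery delivery = null;
            while (null != (delivery = consumer.nextDelivery())) {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Received message: " + message);

                // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // manually tell the server the task succeeded
            }
        } catch (IOException | TimeoutException | ShutdownSignalException
                | ConsumerCancelledException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}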

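Run the program and look at the queue on the RabbitMQ server: the delivered message in the durable queue is in the Unacked state. Stop the program, and the message returns to the Ready state. Then uncomment channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); and run the program again: the messages have now been removed from the queue.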
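The Ready/Unacked transitions described above can be sketched as a tiny state model. It is a simplified illustration under the assumption of a single queue and a single consumer, not the broker's real logic; `AckModel` and its method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of manual acknowledgement on one queue; NOT the broker's real code.
class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();           // waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>();  // delivered, awaiting ack
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    // Deliver the next ready message; returns its delivery tag, or -1 if none is ready.
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    // Counterpart of basicAck(tag, false): the message is removed for good.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer stops without acking: every unacked message becomes Ready again.
    void consumerGone() {
        List<String> pending = new ArrayList<>(unacked.values());
        // Push back in reverse so the original order is restored at the head.
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckModel queue = new AckModel();
        queue.publish("msg-1");
        queue.deliver(); // delivered, but never acknowledged
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
        queue.consumerGone(); // program stopped
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
        queue.ack(queue.deliver()); // second run, with basicAck enabled
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
    }
}
```

The three printed lines correspond to the three observations in the text: Unacked while the consumer holds the message, Ready again once it stops, and gone once the ack is sent.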

Message listeners

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <description>RabbitMQ connection configuration</description>

    <!-- Connection settings; addresses may list several servers, separated by commas -->
    <rabbit:connection-factory id="connectionFactory" addresses="10.200.0.150:5672" username="root" password="123456" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Message converter -->
    <bean id="gsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Message queues -->
    <rabbit:queue name="queue_for_durable" />
    <rabbit:queue name="queue_for_undurable" />

    <!-- Template -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="gsonConverter" exchange="directExchange" />

    <!-- Direct exchange: the routing key must match the binding key exactly -->
    <rabbit:direct-exchange name="directExchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_for_durable" key="queue_for_durable" />
            <rabbit:binding queue="queue_for_undurable" key="queue_for_undurable" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Message receiving: DirectListener.java implements the MessageListener interface and its onMessage method; concurrency is the number of concurrent consumers -->
    <bean id="directListener" class="com.xiangshang.mq.listener.DirectListener" />
    <rabbit:listener-container connection-factory="connectionFactory" message-converter="gsonConverter" concurrency="2" acknowledge="auto">
        <rabbit:listener queues="queue_for_durable,queue_for_undurable" ref="directListener" />
    </rabbit:listener-container>
</beans>

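Load the file above in a JUnit test and use the producer to send a message to a queue; the listener then receives the data. (Tip: a listener that implements the ChannelAwareMessageListener interface can also acknowledge messages manually; in that case set acknowledge="manual".)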
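The direct-exchange bindings in the configuration above decide which queue, and therefore which listener, sees a message. The exact-match rule can be sketched in plain Java as follows. This is a simplified model, not Spring AMQP or broker code, and `DirectExchangeModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of direct-exchange routing; NOT Spring AMQP or broker code.
class DirectExchangeModel {
    // binding key -> queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String key) {
        bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that would receive a message published with this routing key.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("queue_for_durable", "queue_for_durable");
        exchange.bind("queue_for_undurable", "queue_for_undurable");
        System.out.println(exchange.route("queue_for_durable")); // exact match
        System.out.println(exchange.route("queue_for"));         // partial key, no match
    }
}
```

A routing key that matches no binding key exactly reaches no queue, which is why the producer has to send with the full key, for example queue_for_durable.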

  

Publish/Subscribe

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <description>RabbitMQ connection configuration</description>

    <!-- Connection settings; addresses may list several servers, separated by commas -->
    <rabbit:connection-factory id="connectionFactory" addresses="10.200.0.150:5672" username="root" password="123456" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Message converter -->
    <bean id="gsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- Message queues -->
    <rabbit:queue name="queue_for_durable" />
    <rabbit:queue name="queue_for_undurable" />

    <!-- Template -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="gsonConverter" exchange="fanoutExchange" />

    <!-- Publish/subscribe -->
    <rabbit:fanout-exchange name="fanoutExchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_for_durable" />
            <rabbit:binding queue="queue_for_undurable" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>
</beans>

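Load the file above in a JUnit test and use the producer to send a message; on the RabbitMQ server you can see that both queues bound to the fanout exchange each receive one message.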
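Why both queues end up with one message each can be shown with a small model of fanout delivery. This is a sketch only, not broker code: a fanout exchange copies each message to every bound queue and ignores the routing key. `FanoutExchangeModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of fanout delivery; NOT Spring AMQP or broker code.
class FanoutExchangeModel {
    // bound queue name -> messages delivered to it
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    // The routing key is accepted but ignored: every bound queue gets a copy.
    void publish(String routingKey, String body) {
        for (List<String> messages : queues.values()) {
            messages.add(body);
        }
    }

    int depth(String queue) {
        return queues.get(queue).size();
    }

    public static void main(String[] args) {
        FanoutExchangeModel exchange = new FanoutExchangeModel();
        exchange.bind("queue_for_durable");
        exchange.bind("queue_for_undurable");
        exchange.publish("any-key", "hello");
        System.out.println("queue_for_durable: " + exchange.depth("queue_for_durable"));
        System.out.println("queue_for_undurable: " + exchange.depth("queue_for_undurable"));
    }
}
```

This is also why the fanout bindings in the XML carry no key attribute, unlike the direct-exchange bindings.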

Delayed message queues: see the Spring integration demo on the next page.
Priority queues: see the Spring integration demo on the next page.