首页 > 开发 > Java > 正文

消息队列 RabbitMQ 与 Spring 整合使用的实例代码

2024-07-13 10:10:53
字体:
来源:转载
供稿:网友

一、什么是 RabbitMQ

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

RabbitMQ 是由 Erlang 语言开发,安装 RabbitMQ 服务需要先安装 Erlang 语言包。

二、如何与 Spring 集成

1. 我们都需要哪些 Jar 包?

抛开单独使用 Spring 的包不说,引入 RabbitMQ 我们还需要两个:

<!-- RabbitMQ --><dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version></dependency><dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version></dependency>

2. 使用外部参数文件 application.properties:

mq.host=127.0.0.1mq.username=queuemq.password=1234mq.port=8001# 统一XML配置中易变部分的命名mq.queue=test_mq

易变指的是在实际项目中,如果测试与生产环境使用的同一个 RabbitMQ 服务器。那我们在部署时直接修改 properties 文件的参数即可,防止测试与生产环境混淆。

 修改 applicationContext.xml 文件,引入我们创建的 properties 文件

<context:property-placeholder location="classpath:application.properties"/><util:properties id="appConfig" location="classpath:application.properties"></util:properties>

3. 连接 RabbitMQ 服务器

<!-- 连接配置 --><rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"  password="${mq.password}" port="${mq.port}" />  <rabbit:admin connection-factory="connectionFactory"/>

4. 声明一个 RabbitMQ Template

 

复制代码 代码如下:

<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory"  />

 

5. 在 applicationContext.xml 中声明一个交换机,name 属性需配置到 RabbitMQ 服务器。

<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false"> <rabbit:bindings>  <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/> </rabbit:bindings></rabbit:topic-exchange>

交换机的四种模式:

  • direct:转发消息到 routigKey 指定的队列。
  • topic:按规则转发消息(最灵活)。
  • headers:(这个还没有接触到)
  • fanout:转发消息到所有绑定队列

交换器的属性:

  • 持久性:如果启用,交换器将会在server重启前都有效。
  • 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
  • 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。

如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。

一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。

topic 类型交换器通过模式匹配分析消息的 routing-key 属性。它将 routing-key 和 binding-key 的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key:*.stock.#匹配 routing key:usd.stcok 和 eur.stock.db,但是不匹配 stock.nana。

因为交换器是命名实体,声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。

6. 在 applicationContext.xml 中声明一个队列,name 属性是需要配置到 RabbitMQ 服务器的。

 

复制代码 代码如下:

<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />

 

  • durable:是否持久化
  • exclusive:仅创建者可以使用的私有队列,断开后自动删除
  • auto-delete:当所有消费端连接断开后,是否自动删除队列

7. 创建生产者端

import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;/** * @Description: 消息队列发送者 * @Author:  * @CreateTime:  */@Servicepublic class Producer { @Autowired private AmqpTemplate amqpTemplate;  public void sendQueue(String exchange_key, String queue_key, Object object) {  // convertAndSend 将Java对象转换为消息发送至匹配key的交换机中Exchange  amqpTemplate.convertAndSend(exchange_key, queue_key, object); }}

8. 在 applicationContext.xml 中配置监听及消费者端 

<!-- 消费者 --><bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean> <!-- 配置监听 --><rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">  <!--     queues 监听队列,多个用逗号分隔     ref 监听器  -->  <rabbit:listener queues="test_queue" ref="rabbitmqService"/></rabbit:listener-container>

消费者 Java 代码:

import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageListener;public class RabbitmqService implements MessageListener { public void onMessage(Message message) {  System.out.println("消息消费者 = " + message.toString()); }}

至此,我们的所有配置文件就写完了,最终如下:

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xmlns:context="http://www.springframework.org/schema/context"  xmlns:util="http://www.springframework.org/schema/util"  xmlns:aop="http://www.springframework.org/schema/aop"  xmlns:tx="http://www.springframework.org/schema/tx"  xmlns:rabbit="http://www.springframework.org/schema/rabbit"  xmlns:p="http://www.springframework.org/schema/p"  xsi:schemaLocation="  http://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context-3.0.xsd  http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  http://www.springframework.org/schema/util  http://www.springframework.org/schema/util/spring-util-3.0.xsd  http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  http://www.springframework.org/schema/rabbit  http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">       <!-- RabbitMQ start -->  <!-- 连接配置 --> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"  password="${mq.password}" port="${mq.port}" />   <rabbit:admin connection-factory="connectionFactory"/>  <!-- 消息队列客户端 --> <rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />  <!-- queue 队列声明 --> <!--   durable 是否持久化   exclusive 仅创建者可以使用的私有队列,断开后自动删除   auto-delete 当所有消费端连接断开后,是否自动删除队列 --> <rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />  <!-- 交换机定义 --> <!--   交换机:一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。  如果没有队列绑定到交换机上,则发送到该交换机上的信息则会丢失。    direct模式:消息与一个特定的路由器完全匹配,才会转发  topic模式:按规则转发消息,最灵活  --> <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">  <rabbit:bindings>   <!-- 设置消息Queue匹配的pattern (direct模式为key) -->   <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>  </rabbit:bindings> </rabbit:topic-exchange>  <bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean>  <!-- 配置监听 消费者 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">  <!--    queues 监听队列,多个用逗号分隔    ref 监听器 -->  <rabbit:listener queues="test_queue" ref="rabbitmqService"/> </rabbit:listener-container></beans>

9. 如何使用 RabbitMQ 发送一个消息

  @Autowired private Producer producer; @Value("#{appConfig['mq.queue']}") private String queueId;  /**  * @Description: 消息队列  * @Author:   * @CreateTime:   */ @ResponseBody @RequestMapping("/sendQueue") public String testQueue() {  try {   Map<String, Object> map = new HashMap<String, Object>();   map.put("data", "hello rabbitmq");   producer.sendQueue(queueId + "_exchange", queueId + "_patt", map);  } catch (Exception e) {   e.printStackTrace();  }  return "发送完毕"; }

嗯。这个测试是 SpringMVC 框架。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持VeVb武林网。


注:相关教程知识阅读请移步到JAVA教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表