首页 > 开发 > Java > 正文

Java编程rabbitMQ实现消息的收发

2024-07-13 10:11:50
字体:
来源:转载
供稿:网友

java实现rAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

本文不介绍amqp和rabbitmq相关知识,请自行网上查阅

本文是基于spring-rabbit中间件来实现消息的发送接受功能

see http://www.rabbitmq.com/tutorials/tutorial-one-java.html

see http://www.springsource.org/spring-amqp

Java编程通过操作rabbitMQ消息的收发实现代码如下:

<!-- for rabbitmq --> 	<dependency>		<groupId>com.rabbitmq</groupId>		<artifactId>amqp-client</artifactId>		<version>2.8.2</version>	</dependency>	<dependency>		<groupId>org.springframework.amqp</groupId>		<artifactId>spring-amqp</artifactId>		<version>1.1.1.RELEASE</version>	</dependency>	<dependency>		<groupId>org.springframework.amqp</groupId>		<artifactId>spring-rabbit</artifactId>		<version>1.1.1.RELEASE</version>	</dependency>	<dependency>		<groupId>com.caucho</groupId>		<artifactId>hessian</artifactId>		<version>4.0.7</version>	</dependency> </dependencies>

首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象

public class EventMessage implements Serializable{	private String queueName;	private String exchangeName;	private byte[] eventData;	public EventMessage(String queueName, String exchangeName, byte[] eventData) {		this.queueName = queueName;		this.exchangeName = exchangeName;		this.eventData = eventData;	}	public EventMessage() {	}		public String getQueueName() {		return queueName;	}	public String getExchangeName() {		return exchangeName;	}	public byte[] getEventData() {		return eventData;	}	@Override	public String toString() {		return "EopEventMessage [queueName=" + queueName + ", exchangeName="				+ exchangeName + ", eventData=" + Arrays.toString(eventData)				+ "]";	}}

为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂

public interface CodecFactory {	byte[] serialize(Object obj) throws IOException;	Object deSerialize(byte[] in) throws IOException;}

下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式

public class HessionCodecFactory implements CodecFactory {	private final Logger logger = Logger.getLogger(HessionCodecFactory.class);	@Override	public byte[] serialize(Object obj) throws IOException {		ByteArrayOutputStream baos = null;		HessianOutput output = null;		try {			baos = new ByteArrayOutputStream(1024);			output = new HessianOutput(baos);			output.startCall();			output.writeObject(obj);			output.completeCall();		} catch (final IOException ex) {			throw ex;		} finally {			if (output != null) {				try {					baos.close();				} catch (final IOException ex) {					this.logger.error("Failed to close stream.", ex);				}			}		}		return baos != null ? baos.toByteArray() : null;	}	@Override	public Object deSerialize(byte[] in) throws IOException {		Object obj = null;		ByteArrayInputStream bais = null;		HessianInput input = null;		try {			bais = new ByteArrayInputStream(in);			input = new HessianInput(bais);			input.startReply();			obj = input.readObject();			input.completeReply();		} catch (final IOException ex) {			throw ex;		} catch (final Throwable e) {			this.logger.error("Failed to decode object.", e);		} finally {			if (input != null) {				try {					bais.close();				} catch (final IOException ex) {					this.logger.error("Failed to close stream.", ex);  }  } } return obj;	}}

接下来就先实现发送功能,新增一个接口专门用来实现发送功能

public interface EventTemplate {	void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;	void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;}

SendRefuseException是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为EventMessage

public class DefaultEventTemplate implements EventTemplate {	private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);	private AmqpTemplate eventAmqpTemplate;	private CodecFactory defaultCodecFactory;//	private DefaultEventController eec;//	public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,//			CodecFactory defaultCodecFactory, DefaultEventController eec) {//		this.eventAmqpTemplate = eopAmqpTemplate;//		this.defaultCodecFactory = defaultCodecFactory;//		this.eec = eec;//	}	public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {		this.eventAmqpTemplate = eopAmqpTemplate;		this.defaultCodecFactory = defaultCodecFactory;	}	@Override	public void send(String queueName, String exchangeName, Object eventContent)			throws SendRefuseException {		this.send(queueName, exchangeName, eventContent, defaultCodecFactory);	} 	@Override	public void send(String queueName, String exchangeName, Object eventContent,			CodecFactory codecFactory) throws SendRefuseException {		if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {			throw new SendRefuseException("queueName exchangeName can not be empty.");		}//		if (!eec.beBinded(exchangeName, queueName))//			eec.declareBinding(exchangeName, queueName);		byte[] eventContentBytes = null;		if (codecFactory == null) {			if (eventContent == null) {				logger.warn("Find eventContent is null,are you sure...");			} else {				throw new SendRefuseException(						"codecFactory must not be null ,unless eventContent is null");			}		} else {			try {				eventContentBytes = codecFactory.serialize(eventContent);			} catch (IOException e) {				throw new SendRefuseException(e);			}		}		// 构造成Message		EventMessage msg = new EventMessage(queueName, exchangeName,				eventContentBytes);		try {			eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);		} catch (AmqpException e) {			logger.error("send event fail. Event Message : [" + eventContent + "]", e);			throw new SendRefuseException("send event fail", e);		}	}}

注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个类

public interface EventProcesser {	public void process(Object e);}

 为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器

/** * MessageListenerAdapter的Pojo * <p>消息处理适配器,主要功能:</p> * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p> * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p> *  */public class MessageAdapterHandler {	private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);	private ConcurrentMap<String, EventProcessorWrap> epwMap;	public MessageAdapterHandler() {		this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();	}	public void handleMessage(EventMessage eem) {		logger.debug("Receive an EventMessage: [" + eem + "]");		// 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值		if (eem == null) {			logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");			return;		}		if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {			logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");			return;		}		// 解码,并交给对应的EventHandle执行		EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());		if (eepw == null) {			logger.warn("Receive an EopEventMessage, but no processor can do it.");			return;		}		try {			eepw.process(eem.getEventData());		} catch (IOException e) {			logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);			return;		}	}	protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {		if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {			throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");		}		EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);		EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);		if (oldProcessorWrap != null) {			logger.warn("The processor of this queue and exchange exists, and the new one can't be add");		}	}	protected Set<String> getAllBinding() {		Set<String> keySet = epwMap.keySet();		return keySet;	}	protected static class EventProcessorWrap {		private CodecFactory codecFactory;		private EventProcesser eep;		protected EventProcessorWrap(CodecFactory codecFactory,				EventProcesser eep) {			this.codecFactory = codecFactory;			this.eep = eep;		}		public void process(byte[] eventData) throws IOException{			Object obj = codecFactory.deSerialize(eventData);			eep.process(obj);		}	}}

这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息

public class MessageErrorHandler implements ErrorHandler{	private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);	@Override	public void handleError(Throwable t) {		logger.error("RabbitMQ happen a error:" + t.getMessage(), t);	}}

接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息

public class EventControlConfig {	private final static int DEFAULT_PORT = 5672;	private final static String DEFAULT_USERNAME = "guest";	private final static String DEFAULT_PASSWORD = "guest";	private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;	private static final int PREFETCH_SIZE = 1;	private String serverHost ;	private int port = DEFAULT_PORT;	private String username = DEFAULT_USERNAME;	private String password = DEFAULT_PASSWORD;	private String virtualHost;	/**	 * 和rabbitmq建立连接的超时时间	 */	private int connectionTimeout = 0;	/**	 * 事件消息处理线程数,默认是 CPU核数 * 2	 */	private int eventMsgProcessNum;	/**	 * 每次消费消息的预取值	 */	private int prefetchSize;		public EventControlConfig(String serverHost) {		this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory());	}	public EventControlConfig(String serverHost, int port, String username,			String password, String virtualHost, int connectionTimeout,			int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {		this.serverHost = serverHost;		this.port = port>0?port:DEFAULT_PORT;		this.username = username;		this.password = password;		this.virtualHost = virtualHost;		this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;		this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;		this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;	}	public String getServerHost() {		return serverHost;	}	public int getPort() {		return port;	}	public String getUsername() {		return username;	}	public String getPassword() {		return password;	}	public String getVirtualHost() {		return virtualHost;	}	public int getConnectionTimeout() {		return connectionTimeout;	}	public int getEventMsgProcessNum() {		return eventMsgProcessNum;	}	public int getPrefetchSize() {		return prefetchSize;	}}

具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信

public interface EventController {		/**	 * 控制器启动方法	 */	void start();	/**	 * 获取发送模版	 */	EventTemplate getEopEventTemplate();	/**	 * 绑定消费程序到对应的exchange和queue	 */	EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);	/*in map, the key is queue name, but value is exchange name*/	EventController add(Map<String,String> bindings, EventProcesser eventProcesser);}

它的实现类如下:

/** * 和rabbitmq通信的控制器,主要负责: * <p>1、和rabbitmq建立连接</p> * <p>2、声明exChange和queue以及它们的绑定关系</p> * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p> * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p> * @author yangyong * */public class DefaultEventController implements EventController {	private CachingConnectionFactory rabbitConnectionFactory;	private EventControlConfig config;	private RabbitAdmin rabbitAdmin;	private CodecFactory defaultCodecFactory = new HessionCodecFactory();	private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container	private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();	private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定	//queue cache, key is exchangeName	private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();	//queue cache, key is queueName	private Map<String, Queue> queues = new HashMap<String, Queue>();	//bind relation of queue to exchange cache, value is exchangeName | queueName	private Set<String> binded = new HashSet<String>();	private EventTemplate eventTemplate; // 给App使用的Event发送客户端	private AtomicBoolean isStarted = new AtomicBoolean(false);	private static DefaultEventController defaultEventController;	public synchronized static DefaultEventController getInstance(EventControlConfig config){		if(defaultEventController==null){			defaultEventController = new DefaultEventController(config);		}		return defaultEventController;	}	private DefaultEventController(EventControlConfig config){		if (config == null) {			throw new IllegalArgumentException("Config can not be null.");		}		this.config = config;		initRabbitConnectionFactory();		// 初始化AmqpAdmin		rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);		// 初始化RabbitTemplate		RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);		rabbitTemplate.setMessageConverter(serializerMessageConverter);		eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);	}	/**	 * 初始化rabbitmq连接	 */	private void initRabbitConnectionFactory() {		rabbitConnectionFactory = new CachingConnectionFactory();		rabbitConnectionFactory.setHost(config.getServerHost());		rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());		rabbitConnectionFactory.setPort(config.getPort());		rabbitConnectionFactory.setUsername(config.getUsername());		rabbitConnectionFactory.setPassword(config.getPassword());		if (!StringUtils.isEmpty(config.getVirtualHost())) {			rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());		}	}	/**	 * 注销程序	 */	public synchronized void destroy() throws Exception {		if (!isStarted.get()) {			return;		}		msgListenerContainer.stop();		eventTemplate = null;		rabbitAdmin = null;		rabbitConnectionFactory.destroy();	}	@Override	public void start() {		if (isStarted.get()) {			return;		}		Set<String> mapping = msgAdapterHandler.getAllBinding();		for (String relation : mapping) {			String[] relaArr = relation.split("//|");			declareBinding(relaArr[1], relaArr[0]);		}		initMsgListenerAdapter();		isStarted.set(true);	}	/**	 * 初始化消息监听器容器	 */	private void initMsgListenerAdapter(){		MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);		msgListenerContainer = new SimpleMessageListenerContainer();		msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);		msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);		msgListenerContainer.setMessageListener(listener);		msgListenerContainer.setErrorHandler(new MessageErrorHandler());		msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值		msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());		msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数		msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));		msgListenerContainer.start();	}	@Override	public EventTemplate getEopEventTemplate() {		return eventTemplate;	}	@Override	public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {		return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);	}		public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {		msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory);		if(isStarted.get()){			initMsgListenerAdapter();		}		return this;	}	@Override	public EventController add(Map<String, String> bindings,			EventProcesser eventProcesser) {		return add(bindings, eventProcesser,defaultCodecFactory);	}	public EventController add(Map<String, String> bindings,			EventProcesser eventProcesser, CodecFactory codecFactory) {		for(Map.Entry<String, String> item: bindings.entrySet()) 			msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);		return this;	}	/**	 * exchange和queue是否已经绑定	 */	protected boolean beBinded(String exchangeName, String queueName) {		return binded.contains(exchangeName+"|"+queueName);	}	/**	 * 声明exchange和queue已经它们的绑定关系	 */	protected synchronized void declareBinding(String exchangeName, String queueName) {		String bindRelation = exchangeName+"|"+queueName;		if (binded.contains(bindRelation)) return;				boolean needBinding = false;		DirectExchange directExchange = exchanges.get(exchangeName);		if(directExchange == null) {			directExchange = new DirectExchange(exchangeName, true, false, null);			exchanges.put(exchangeName, directExchange);			rabbitAdmin.declareExchange(directExchange);//声明exchange			needBinding = true;		}		Queue queue = queues.get(queueName);		if(queue == null) {			queue = new Queue(queueName, true, false, false);			queues.put(queueName, queue);			rabbitAdmin.declareQueue(queue);	//声明queue			needBinding = true;		}		if(needBinding) {			Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange			rabbitAdmin.declareBinding(binding);//声明绑定关系			binded.add(bindRelation);		}	}}

搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO

@SuppressWarnings("serial")public class People implements Serializable{	private int id;	private String name;	private boolean male;	private People spouse;	private List<People> friends;	public int getId() {		return id;	}	public void setId(int id) {		this.id = id;	}	public String getName() {		return name;	}	public void setName(String name) {		this.name = name;	}	public boolean isMale() {		return male;	}	public void setMale(boolean male) {		this.male = male;	}	public People getSpouse() {		return spouse;	}	public void setSpouse(People spouse) {		this.spouse = spouse;	}	public List<People> getFriends() {		return friends;	}	public void setFriends(List<People> friends) {		this.friends = friends;	}	@Override	public String toString() {		// TODO Auto-generated method stub		return "People[id="+id+",name="+name+",male="+male+"]";	}}

建立单元测试

public class RabbitMqTest{	private String defaultHost = "127.0.0.1";	private String defaultExchange = "EXCHANGE_DIRECT_TEST";	private String defaultQueue = "QUEUE_TEST";	private DefaultEventController controller;	private EventTemplate eventTemplate;	@Before	public void init() throws IOException{		EventControlConfig config = new EventControlConfig(defaultHost);		controller = DefaultEventController.getInstance(config);		eventTemplate = controller.getEopEventTemplate();		controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());		controller.start();	}	@Test	public void sendString() throws SendRefuseException{		eventTemplate.send(defaultQueue, defaultExchange, "hello world");	}		@Test	public void sendObject() throws SendRefuseException{		eventTemplate.send(defaultQueue, defaultExchange, mockObj());	}		@Test	public void sendTemp() throws SendRefuseException, InterruptedException{		String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange		String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue		eventTemplate.send(tempQueue, tempExchange, mockObj());		//发送成功后此时不会接受到消息,还需要绑定对应的消费程序		controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());	}		@After	public void end() throws InterruptedException{		Thread.sleep(2000);	}		private People mockObj(){		People jack = new People();		jack.setId(1);		jack.setName("JACK");		jack.setMale(true);				List<People> friends = new ArrayList<>();		friends.add(jack);		People hanMeiMei = new People();		hanMeiMei.setId(1);		hanMeiMei.setName("韩梅梅");		hanMeiMei.setMale(false);		hanMeiMei.setFriends(friends);				People liLei = new People();		liLei.setId(2);		liLei.setName("李雷");		liLei.setMale(true);		liLei.setFriends(friends);		liLei.setSpouse(hanMeiMei);		hanMeiMei.setSpouse(liLei);		return hanMeiMei;	}	class ApiProcessEventProcessor implements EventProcesser{		@Override		public void process(Object e) {//消费程序这里只是打印信息			Assert.assertNotNull(e);			System.out.println(e);			if(e instanceof People){				People people = (People)e;				System.out.println(people.getSpouse());				System.out.println(people.getFriends());			}		}	}}

源码地址请点击这里

总结

以上就是本文关于java实现rabbitmq消息的发送接受的全部内容,希望对大家有所帮助。

感谢大家对本站的支持。


注:相关教程知识阅读请移步到JAVA教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表