首页 > 开发 > Java > 正文

spring boot整合spring-kafka实现发送接收消息实例代码

2024-07-13 10:09:25
字体:
来源:转载
供稿:网友

前言

由于我们的新项目使用的是spring-boot,而又要把新项目中建的数据同步到老的系统当中。原来已经有一部分同步代码,使用的是kafka。其实只是做数据同步的话,我觉得没必要选kafka作为MQ:首先数据量不大,其次搞kafka又要搞集群和ZK(ZooKeeper)。只是用来做一些简单的数据同步,有点大材小用。

没办法,咱只是个打工的,领导让搞就搞吧。刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写。但是我看了官方文档,实在是搞得有点头大,功能一直没实现。文档写得也不是很漂亮,也可能是刚起步,还有很多问题。我这里只能放弃,改用了spring-kafka。

实现方法

pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.linuxsogood.sync</groupId>
    <artifactId>linuxsogood-sync</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <!-- Spring Boot parent: supplies managed versions for every dependency below that declares no <version>. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <!-- Dependency versions -->
        <mybatis.version>3.3.1</mybatis.version>
        <mybatis.spring.version>1.2.4</mybatis.spring.version>
        <mapper.version>3.3.6</mapper.version>
        <pagehelper.version>4.1.1</pagehelper.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.1.RELEASE</version>
            <scope>compile</scope>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <!-- Version intentionally omitted: it is inherited from spring-boot-starter-parent so that
             spring-test matches the Spring Framework version of the other Spring modules.
             The earlier explicit 4.2.3.RELEASE pin did not match the 4.3.x line managed by Boot 1.4.0. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>sqljdbc4</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.11</version>
        </dependency>

        <!-- MyBatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>${mybatis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>${mybatis.spring.version}</version>
        </dependency>
        <!--<dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>-->
        <!-- MyBatis Generator -->
        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-core</artifactId>
            <version>1.3.2</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <!-- Pagination plugin -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>${pagehelper.version}</version>
        </dependency>
        <!-- Common Mapper -->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>repo.spring.io.milestone</id>
            <name>Spring Framework Maven Milestone Repository</name>
            <url>https://repo.spring.io/libs-milestone</url>
        </repository>
    </repositories>

    <build>
        <finalName>mybatis_generator</finalName>
        <plugins>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.2</version>
                <configuration>
                    <verbose>true</verbose>
                    <overwrite>true</overwrite>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>org.linuxsogood.sync.Starter</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

orm层使用了MyBatis,又使用了通用Mapper和分页插件.

kafka消费端配置

import org.linuxsogood.sync.listener.Listener;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap;import java.util.Map; @Configuration@EnableKafkapublic class KafkaConsumerConfig {  @Value("${kafka.broker.address}") private String brokerAddress;  @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; }  @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); }  @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propsMap; }  @Bean public Listener listener() { return new Listener(); }}

生产者的配置.

import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap;import java.util.Map; @Configuration@EnableKafkapublic class KafkaProducerConfig {  @Value("${kafka.broker.address}") private String brokerAddress;  @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); }  @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; }  @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); }}

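监听器:监听器里写的就是业务逻辑,即从kafka里拿到数据后具体怎么处理。如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group,即方法上的注解@KafkaListener里的group一定要不一样。如果多个监听里的group写的一样,同一条消息就只会被其中一个监听处理,其它监听收不到这条消息,这也正是kafka的分布式消息处理方式。(注意:在本文使用的spring-kafka 1.1.0中,@KafkaListener的group属性只是监听容器的分组名,实际的消费组由消费端配置里的group.id(即ConsumerConfig.GROUP_ID_CONFIG)决定;要让不同的监听处于不同的消费组,需要为它们配置group.id不同的ConsumerFactory/容器工厂。从spring-kafka 1.3起才可以直接用@KafkaListener的groupId属性指定消费组,请以所用版本的官方文档为准。)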

在同一个group里的监听会共同分担接收到的消息:kafka按分区分配策略把topic的各个分区分给组内的消费者,每条消息只会被组内的一个监听处理。如果多个监听不在同一个group,但监听的是同一个topic,那么每个group都会收到全部消息,就形成了广播模式。

import com.alibaba.fastjson.JSON;import org.linuxsogood.qilian.enums.CupMessageType;import org.linuxsogood.qilian.kafka.MessageWrapper;import org.linuxsogood.qilian.model.store.Store;import org.linuxsogood.sync.mapper.StoreMapper;import org.linuxsogood.sync.model.StoreExample;import org.apache.commons.lang3.StringUtils;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.annotation.KafkaListener;import java.util.List;import java.util.Optional;public class Listener { private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class); @Autowired private StoreMapper storeMapper; /**  * 监听kafka消息,如果有消息则消费,同步数据到新烽火的库  * @param record 消息实体bean  */ @KafkaListener(topics = "linuxsogood-topic", group = "sync-group") public void listen(ConsumerRecord<?, ?> record) {  Optional<?> kafkaMessage = Optional.ofNullable(record.value());  if (kafkaMessage.isPresent()) {   Object message = kafkaMessage.get();   try {    MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);    CupMessageType type = messageWrapper.getType();    //判断消息的数据类型,不同的数据入不同的表    if (CupMessageType.STORE == type) {     proceedStore(messageWrapper);    }   } catch (Exception e) {    LOGGER.error("将接收到的消息保存到数据库时异常, 消息:{}, 异常:{}",message.toString(),e);   }  } } /**  * 消息是店铺类型,店铺消息处理入库  * @param messageWrapper 从kafka中得到的消息  */ private void proceedStore(MessageWrapper messageWrapper) {  Object data = messageWrapper.getData();  Store cupStore = JSON.parseObject(data.toString(), Store.class);  StoreExample storeExample = new StoreExample();  String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? 
cupStore.getStoreName() : cupStore.getStoreOldName();  storeExample.createCriteria().andStoreNameEqualTo(storeName);  List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);  org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();  org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);  //如果查询不到记录则新增  if (stores.size() == 0) {   storeMapper.insert(store);  } else {   store.setStoreId(stores.get(0).getStoreId());   storeMapper.updateByPrimaryKey(store);  } }}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对VeVb武林网的支持。


注:相关教程知识阅读请移步到JAVA教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表