
Spring Boot and Kafka Integration: Example Code

2024-07-14 08:40:18
Source: reposted
Contributed by: a site user

Create a new Spring Boot project

IntelliJ IDEA is used here.


Add the Kafka Maven dependency

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>demo</name>
  <description>Demo project for Spring Boot</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.8.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

Add the following to the project's application.properties

spring.kafka.bootstrap-servers=vm208:9092,vm210:9092,vm50:9092
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.group-id=local_test
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
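The bootstrap-servers value is a comma-separated list of host:port pairs. The client only needs one reachable broker from that list to discover the cluster, so a malformed entry can go unnoticed for a long time. The following is a minimal sketch of a sanity check that uses only the JDK; the class and method names (BootstrapServers, parse) are made up for illustration and are not part of Spring or Kafka.

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapServers {

    /**
     * Splits a comma-separated "host:port" list and validates each entry.
     * Returns one {host, port} pair per entry.
     * Throws IllegalArgumentException for a malformed entry.
     */
    public static List<String[]> parse(String value) {
        List<String[]> result = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            // Reject entries with no host, no colon, or no port.
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            // NumberFormatException is a subclass of IllegalArgumentException.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in: '" + trimmed + "'");
            }
            result.add(new String[] {host, String.valueOf(port)});
        }
        return result;
    }

    public static void main(String[] args) {
        // Host names taken from the console producer command in this article.
        for (String[] hostPort : parse("vm208:9092,vm210:9092,vm50:9092")) {
            System.out.println(hostPort[0] + " -> " + hostPort[1]);
        }
    }
}
```

This only checks the syntax of each entry. It does not verify that the host resolves or that a broker is listening on the port.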

Create the KafkaConsumer consumer class

package com.example.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

  private final Logger logger = LoggerFactory.getLogger(this.getClass());

  // Invoked for every record received on the "test" topic.
  @KafkaListener(topics = {"test"})
  public void listen(ConsumerRecord<?, ?> record) {
    logger.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
  }
}

Start the Spring Boot application, then send test messages to the topic from the Kafka cluster and check that the consumer receives them:

 

bin/kafka-console-producer.sh --broker-list vm208:9092,vm210:9092,vm50:9092 --topic test

 

Write the producer code

package com.example.demo.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

  // Typed template; matches the String serializers in application.properties.
  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  private final String topic = "test";

  public void sendMessage(String key, String data) {
    // send(topic, key, value) builds the ProducerRecord internally.
    kafkaTemplate.send(topic, key, data);
  }
}

Create a REST endpoint to simulate sending a message (for example: http://localhost:8080/kafka/send.do?key=2&data=allen-test-message)

package com.example.demo.controller;

import com.example.demo.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

  @Autowired
  private KafkaProducer kafkaProducer;

  // GET /kafka/send.do?key=...&data=... forwards the key and data to Kafka.
  @RequestMapping(value = "/kafka/send.do", method = RequestMethod.GET)
  public String sendMessage(@RequestParam(value = "key") String key,
                            @RequestParam(value = "data") String data) {
    kafkaProducer.sendMessage(key, data);
    return "success";
  }
}
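Because key and data travel as query parameters, values containing spaces, "&", or non-ASCII characters need to be URL-encoded before the request is sent. The sketch below shows one way a caller might build the request URL with the JDK alone; SendUrlBuilder and buildSendUrl are hypothetical names chosen for this example, and the base URL assumes the application runs locally on port 8080 as in the sample URL.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class SendUrlBuilder {

    /** Builds the /kafka/send.do URL with both parameters URL-encoded. */
    public static String buildSendUrl(String baseUrl, String key, String data) {
        return baseUrl + "/kafka/send.do?key=" + encode(key) + "&data=" + encode(data);
    }

    private static String encode(String value) {
        try {
            // Form-style encoding: spaces become '+', '&' becomes "%26".
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Every JVM is required to support UTF-8.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildSendUrl("http://localhost:8080", "2", "allen-test-message"));
        System.out.println(buildSendUrl("http://localhost:8080", "2", "a b&c"));
    }
}
```

The resulting URL can be opened in a browser or passed to any HTTP client. Without encoding, the "&" in the second example would be read as the start of another parameter and the data would be cut off.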

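As the example shows, spring-kafka greatly reduces the amount of code you have to write: the consumer, the producer, and the endpoint above need no manual client setup beyond the entries in application.properties.

Official documentation: https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/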
