主页 > 创业  > 

docker安装kafka,并通过springboot快速集成kafka

docker安装kafka,并通过springboot快速集成kafka

目录

一、docker安装和配置Kafka

1.拉取 Zookeeper 的 Docker 镜像

2.运行 Zookeeper 容器

3.拉取 Kafka 的 Docker 镜像

4.运行 Kafka 容器

5.下载 Kafdrop

6.运行 Kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下载很慢,可以找一台网络比较好的机器,输入这两个命令进行下载,下载后使用docker save -o保存为tar文件,然后将tar文件传输到目标机器后,使用docker load -i加载tar文件为docker镜像文件

8.使用 Kafka 自带的工具来创建一个名为 users 的主题

9.验证 Kafka,可以使用 Kafka 自带的工具来验证 Kafka 是否正常工作。例如,启动一个 Kafka 消费者来监听 users 主题:

二、在Spring Boot项目中集成和使用Kafka

1. 添加依赖

2. 配置Kafka

3. 创建消息对象

4. 创建生产者

5. 创建消费者

6. 测试

三、web访问Kafdrop


一、docker安装和配置Kafka 1.拉取 Zookeeper 的 Docker 镜像

docker pull wurstmeister/zookeeper

2.运行 Zookeeper 容器

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

3.拉取 Kafka 的 Docker 镜像

docker pull wurstmeister/kafka

4.运行 Kafka 容器

docker run -d --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.7.46:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper:zookeeper wurstmeister/kafka

5.下载 Kafdrop

docker pull obsidiandynamics/kafdrop

6.运行 Kafdrop

docker run -p 9000:9000 -d --name kafdrop -e KAFKA_BROKERCONNECT=192.168.7.46:9092 -e JVM_OPTS="-Xms16M -Xmx48M" obsidiandynamics/kafdrop

7.如果docker pull wurstmeister/zookeeper或docker pull wurstmeister/kafka下载很慢,可以找一台网络比较好的机器,输入这两个命令进行下载,下载后使用docker save -o保存为tar文件,然后将tar文件传输到目标机器后,使用docker load -i加载tar文件为docker镜像文件

下载: docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka (kafdrop是一个kafka的web图形管理界面) docker pull obsidiandynamics/kafdrop 打包: docker save -o ./zookeeper.tar wurstmeister/zookeeper docker save -o ./kafka.tar wurstmeister/kafka docker save -o ./kafdrop.tar obsidiandynamics/kafdrop 传输: scp kafka.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可 scp zookeeper.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可 scp kafdrop.tar root@192.168.7.46:/usr/root/kafka 回车后输入密码即可

目标机加载成docker镜像 docker load -i /usr/root/kafka/kafka.tar docker load -i /usr/root/kafka/zookeeper.tar docker load -i /usr/root/kafka/kafdrop.tar 查看镜像列表 docker images

8.使用 Kafka 自带的工具来创建一个名为 users 的主题

docker exec -it kafka kafka-topics.sh --create --topic users --partitions 1 --replication-factor 1 --bootstrap-server 192.168.7.46:9092

9.验证 Kafka,可以使用 Kafka 自带的工具来验证 Kafka 是否正常工作。例如,启动一个 Kafka 消费者来监听 users 主题:

docker exec -it kafka kafka-console-consumer.sh --topic users --from-beginning --bootstrap-server 192.168.7.46:9092

这个命令,会启动一个额外的 Kafka 消费者来监听 users 主题。这个消费者是通过 Kafka 自带的 kafka-console-consumer.sh 工具启动的,主要用于测试和验证目的。它会持续监听并打印出发送到 users 主题的所有消息。

二、在Spring Boot项目中集成和使用Kafka 1. 添加依赖

首先,在你的pom.xml文件中添加Kafka的依赖:

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>

2. 配置Kafka

在application.properties或application.yml文件中配置Kafka的相关属性。这里以application.properties为例:

# Kafka broker地址

spring.kafka.bootstrap-servers=localhost:9092

# 生产者配置

spring.kafka.producer.key-serializer=org.apache.kafka mon.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# 消费者配置

spring.kafka.consumer.group-id=my-group

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.key-deserializer=org.apache.kafka mon.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.trusted.packages=*

3. 创建消息对象

假设我们要发送和接收一个简单的KafkaMsgs 对象:

public class KafkaMsgs {

    private String id;

    private String msg;

    private Long date;

    // 构造函数、getter和setter省略

}

4. 创建生产者

创建一个生产者类来发送消息:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Service;

@Service

public class KafkaProducer {

    @Autowired

    private KafkaTemplate<String, KafkaMsgs> kafkaTemplate;

    public void sendMessage(String topic, KafkaMsgs kafkaMsgs) {

        kafkaTemplate.send(topic, kafkaMsgs);

    }

}

5. 创建消费者

创建一个消费者类来接收消息:

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Service;

@Service

public class KafkaConsumer {

    @KafkaListener(topics = "users", groupId = "my-group")

    public void listen(KafkaMsgs kafkaMsgs) {

        System.out.println("Received message: " + kafkaMsgs);

    }

}

6. 测试

你可以创建一个简单的测试类来验证生产和消费是否正常工作:

import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.IdUtil; import com.esop.resurge.core.config.kafka.KafkaProducer; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import lombok.extern.slf4j.Slf4j; import org.airbubble.kingdom.army.reponse.FeedBack; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Api(tags="kafka数据控制器") @RestController @RequestMapping("/kafka") @Slf4j public class KafkaController {     @Autowired     KafkaProducer kafkaProducer;

    @ApiOperation(value = "测试发送数据到kafka", httpMethod = "GET")     @GetMapping(value = "/sendKafkaData")     public FeedBack<String> sendKafkaData(             @ApiParam(value = "topic", required = true) @RequestParam(required = true,value = "topic") String topic,             @ApiParam(value = "msg", required = true) @RequestParam(required = true,value = "msg") String msg     ) throws Exception {         kafkaProducer.sendMessage(topic, new com.esop.resurge.core.config.kafka.KafkaMsgs(                 IdUtil.fastUUID(),                 msg,                 Long.valueOf(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT).replace(" ", "").replace(":", "").replace("-", ""))         ));         return FeedBack.getInstance("发送成功");     }

}

三、web访问Kafdrop

 打开浏览器,访问 http://192.168.7.46:9000,你应该能够看到 Kafdrop 的 Web 界面

标签:

docker安装kafka,并通过springboot快速集成kafka由讯客互联创业栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“docker安装kafka,并通过springboot快速集成kafka