锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. 【Java】SpringBoot快速整合Kafka

【Java】SpringBoot快速整合Kafka

0
  • JAVA
  • 发布于 2024-08-08
  • 0 次阅读
黄健
黄健

本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net

目录

1. 什么是 Kafka?

主要特点和概念:

主要组成部分:

2.Kafka 可以用来做什么?

3.SpringBoot 整合 Kafka 步骤:

1. 添加依赖:

2. 配置 Kafka:

3. 创建 Kafka 生产者:

4. 创建 Kafka 消费者:

5. 发布消息:

6. 使用 Postman 进行测试:

如果你没有 Kafka,可以参考这篇文章进行安装【Docker】手把手教你使用 Docker 搭建 kafka【详细教程】_docker 安装 kafka-CSDN 博客

1. 什么是 Kafka?

        Kafka 是一个开源的流式平台,用于构建实时数据流应用程序和实时数据管道。Kafka 旨在处理大规模的数据流,具有高吞吐量、可扩展性、持久性和容错性的特点。

主要特点和概念:

  1. 发布 - 订阅模型: Kafka 采用发布 - 订阅模型,数据生产者将消息发布到一个或多个主题(topics),而数据消费者则订阅这些主题以接收消息。

  2. 分布式架构: Kafka 是一个分布式系统,允许横向扩展,通过分布式存储和分区机制来实现高吞吐量和可扩展性。

  3. 持久性存储: Kafka 使用持久性存储来保留消息,可以在消息发送后保留一定的时间,确保消费者可以在需要时检索历史消息。

  4. 数据分区: 主题被划分为多个分区,每个分区可以在不同的服务器上,以实现并行处理和提高性能。

  5. 流式处理: Kafka 提供了流处理功能,允许应用程序实时处理和分析数据流,执行复杂的事件处理操作。

  6. 高可用性: Kafka 在集群中的多个节点之间复制数据,提高了系统的容错性和可用性。

  7. 数据保证: Kafka 提供了不同级别的数据传递保证,包括至多一次、至少一次和精确一次语义。

  8. 生态系统: Kafka 生态系统丰富,包括连接器(Connectors)、Kafka Streams、MirrorMaker 等组件,用于与各种外部系统集成和实现各种应用场景。

主要组成部分:

  • Producer(生产者): 负责向 Kafka 主题发布消息。

  • Broker(代理): Kafka 集群中的服务器,负责存储和管理消息。

  • Consumer(消费者): 订阅并处理 Kafka 主题中的消息。

  • Topic(主题): 消息的类别或标签,生产者将消息发布到主题,而消费者从主题订阅消息。

  • Partition(分区): 主题可以划分为多个分区,每个分区独立存储和处理消息。

2.Kafka 可以用来做什么?

  1. 消息队列:

    场景: 在电子商务平台上,订单服务产生订单消息,并将其发布到 Kafka 主题。支付服务、物流服务等通过订阅相应主题,异步处理订单信息,实现订单处理的解耦和异步通信。

  2. 实时数据流处理:

    场景: 在在线广告平台上,使用 Kafka Streams 处理实时产生的广告点击数据。可以实时计算点击率、过滤无效点击、将数据与用户信息连接,以实现实时广告效果分析。

  3. 日志收集与分析:

    场景: 在一个大规模的云服务中,使用 Kafka 收集分布在不同服务器上的应用程序日志。日志分析服务通过消费 Kafka 主题,实时分析日志以监控系统性能、检测异常和进行故障排除。

  4. 事件溯源(Event Sourcing):

    场景: 在金融领域的交易系统中,使用 Kafka 追踪交易事件。每笔交易引发一个事件,将其发布到 Kafka 主题,以便在需要时进行审计、回溯和重新处理。

  5. 数据同步:

    场景: 在企业的分布式系统中,使用 Kafka 同步用户信息。用户服务在用户数据变更时将事件发布到 Kafka 主题,其他服务通过消费主题以保持用户数据同步。

  6. 消息广播:

    场景: 在社交媒体应用中,使用 Kafka 将用户发布的状态更新广播给其关注者。关注者通过订阅用户状态的 Kafka 主题,实现实时消息广播。

  7. 分布式应用解耦:

    场景: 在电子商务微服务架构中,购物车服务、订单服务、支付服务等通过 Kafka 进行异步通信。例如,购物车服务可以通过 Kafka 发布购物车更新的事件,订单服务通过订阅事件来处理相关订单逻辑。

  8. 大数据集成:

    场景: 在一个大数据处理流水线中,使用 Kafka 将产生的数据传输到 Spark 进行实时分析。生产者将数据发布到 Kafka 主题,而 Spark 应用程序通过订阅主题来接收实时数据。

  9. 实时推荐系统:

    场景: 在在线视频平台上,使用 Kafka 收集用户观看记录。推荐引擎通过消费 Kafka 主题,实时更新用户的个性化推荐列表,提高用户体验。

  10. 异步通信:

场景: 在电商平台中,使用 Kafka 实现异步订单处理。当订单支付成功时,订单服务通过 Kafka 发布订单处理完成的消息,而邮件服务通过订阅该主题来异步发送订单确认邮件。

下面就使用 SpringBoot 整合 kafka 的发布订阅机制,实现消息的发布和订阅。

3.SpringBoot 整合 Kafka 步骤:

1. 添加依赖:

确保在你的 pom.xml 文件中包含了 Spring Boot 和 Spring Kafka 的依赖。

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
 
        <!-- Spring Kafka Starter -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
</dependencies>

2. 配置 Kafka:

在 application.properties 或 application.yml 中配置 Kafka 连接信息。

spring:
  kafka:
    bootstrap-servers: your-kafka-server:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3. 创建 Kafka 生产者:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
 
@Service
public class KafkaMessageProducer {
 
    private static final String TOPIC = "admin-messages";
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    public void sendAdminMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

4. 创建 Kafka 消费者:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
 
@Service
public class KafkaMessageConsumer {
 
    @KafkaListener(topics = "admin-messages")
    public void receiveAdminMessage(String message) {
        System.out.println("Received message: " + message);
        // ...
    }
}

5. 发布消息:

在管理员需要发布消息的地方调用 KafkaMessageProducer 的 sendAdminMessage 方法。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
 
@RestController
@RequestMapping("/message")
public class AdminController {
 
    @Autowired
    private KafkaMessageProducer kafkaMessageProducer;
 
    @GetMapping("/publish")
    public void publishAdminMessage(@RequestParam("message") String message) {
        kafkaMessageProducer.sendAdminMessage(message);
    }
}

        当调用 publishAdminMessage 方法时,所有监听 admin-messages 主题的用户将会接收到相应的消息。

6. 使用 Postman 进行测试:

控制台输出结果:

这样就使用 SpringBoot 整合了 Kafka 并写了一个简单的案例。

标签: #JAVA 991
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.