锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. spring boot 使用 Kafka

spring boot 使用 Kafka

0
  • JAVA
  • 发布于 2024-08-08
  • 0 次阅读
黄健
黄健

本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net

一、Kafka 作为消息队列的好处

  1. 高吞吐量:Kafka 能够处理大规模的数据流,并支持高吞吐量的消息传输。

  2. 持久性:Kafka 将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。

  3. 分布式:Kafka 是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。

  4. 支持多种协议:Kafka 支持多种协议,如 TCP、HTTP、UDP 等,可以与不同的系统进行集成。

  5. 灵活的消费模式:Kafka 支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。

  6. 可配置性强:Kafka 的配置参数非常丰富,可以根据需要进行灵活配置。

  7. 社区支持:Kafka 作为 Apache 旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。

二、springboot 中使用 Kafka

  1. 添加依赖:在 pom.xml 文件中添加 Kafka 的依赖,包括 spring-kafka 和 kafka-clients。确保版本与你的项目兼容。

  2. 创建生产者:创建一个 Kafka 生产者类,实现 Producer 接口,并使用 KafkaTemplate 发送消息。

  3. 配置生产者:在 Spring Boot 的配置文件中配置 Kafka 生产者的相关参数,例如 bootstrap 服务器地址、Kafka 主题等。

  4. 发送消息:在需要发送消息的地方,注入 Kafka 生产者,并使用其发送消息到指定的 Kafka 主题。

  5. 创建消费者:创建一个 Kafka 消费者类,实现 Consumer 接口,并使用 KafkaTemplate 订阅指定的 Kafka 主题。

  6. 配置消费者:在 Spring Boot 的配置文件中配置 Kafka 消费者的相关参数,例如 group id、auto offset reset 等。

  7. 接收消息:在需要接收消息的地方,注入 Kafka 消费者,并使用其接收消息。

  8. 处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。

三、使用 Kafka

pom 中填了依赖

<dependency>  
    <groupId>org.springframework.kafka</groupId>  
    <artifactId>spring-kafka</artifactId>  
    <version>2.8.1</version>  
</dependency>  
<dependency>  
    <groupId>org.apache.kafka</groupId>  
    <artifactId>kafka-clients</artifactId>  
    <version>2.8.1</version>  
</dependency>
  1. 创建生产者:创建一个 Kafka 生产者类,实现 Producer 接口,并使用 KafkaTemplate 发送消息。
import org.apache.kafka.clients.producer.*;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  
  
@Component  
public class KafkaProducer {  
    @Value("${kafka.bootstrap}")  
    private String bootstrapServers;  
  
    @Value("${kafka.topic}")  
    private String topic;  
  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {  
        this.kafkaTemplate = kafkaTemplate;  
    }  
  
    public void sendMessage(String message) {  
        Producer<String, String> producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer());  
        try {  
            producer.send(new ProducerRecord<>(topic, message));  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            producer.close();  
        }  
    }  
}
  1. 配置生产者:在 Spring Boot 的配置文件中配置 Kafka 生产者的相关参数,例如 bootstrap 服务器地址、Kafka 主题等。
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.core.DefaultKafkaProducerFactory;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.core.ProducerFactory;  
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  
import org.springframework.kafka.core.ConsumerFactory;  
import org.springframework.kafka.core.ConsumerConfig;  
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;  
import org.springframework.kafka.listener.MessageListener;  
import org.springframework.context.annotation.PropertySource;  
import java.util.*;  
import org.springframework.beans.factory.*;  
import org.springframework.*;  
import org.springframework.*;expression.*;value; 																																		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	  @Value("${kafka}")   Properties kafkaProps = new Properties(); @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf){ KafkaTemplate<String, String> template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory<String, String> producerFactory(){ DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory<String, String> consumerFactory(){ DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener

消费者

import org.apache.kafka.clients.consumer.*;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  
  
@Component  
public class KafkaConsumer {  
    @Value("${kafka.bootstrap}")  
    private String bootstrapServers;  
  
    @Value("${kafka.group}")  
    private String groupId;  
  
    @Value("${kafka.topic}")  
    private String topic;  
  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public KafkaConsumer(KafkaTemplate<String, String> kafkaTemplate) {  
        this.kafkaTemplate = kafkaTemplate;  
    }  
  
    public void consume() {  
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs());  
        consumer.subscribe(Collections.singletonList(topic));  
        while (true) {  
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  
            for (ConsumerRecord<String, String> record : records) {  
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  
            }  
        }  
    }  
  
    private Properties consumerConfigs() {  
        Properties props = new Properties();  
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);  
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  
        return props;  
    }  
}

四、kafka 与 rocketMQ 比较

Kafka 和 RocketMQ 都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:

  1. 数据可靠性:
  • Kafka 使用异步刷盘方式,而 RocketMQ 支持异步实时刷盘、同步刷盘、同步复制和异步复制。这使得 RocketMQ 在单机可靠性上比 Kafka 更高,因为它不会因为操作系统崩溃而导致数据丢失。此外,RocketMQ 新增的同步刷盘机制也进一步保证了数据的可靠性。
  1. 性能:
  • Kafka 和 RocketMQ 在性能方面各有千秋。由于 Kafka 的数据以 partition 为单位,一个 Kafka 实例上可能有多达上百个 partition,而一个 RocketMQ 实例上只有一个 partition。这使得 RocketMQ 可以充分利用 IO 组的 commit 机制,批量传输数据,从而在 replication 时具有更好的性能。然而,Kafka 的异步 replication 性能理论上低于 RocketMQ 的 replication,因为同步 replication 与异步 replication 相比,性能上会有约 20%-30% 的损耗。
  1. 消息传递方式:
  • Kafka 和 RocketMQ 在消息传递方式上也有所不同。Kafka 采用 Producer 发送消息后,broker 马上把消息投递给 consumer,这种方式实时性较高,但会增加 broker 的负载。而 RocketMQ 基于 Pull 模式和 Push 模式的长轮询机制,来平衡 Push 和 Pull 模式各自的优缺点。RocketMQ 的消息及时性较好,严格的消息顺序得到了保证。
  1. 其他特性:
  • Kafka 在单机支持的队列数超过 64 个队列,而 RocketMQ 最高支持 5 万个队列。队列越多,可以支持的业务就越多。

五、kafka 使用场景

  1. 实时数据流处理:Kafka 可以处理大量的实时数据流,这些数据流可以来自不同的源,如用户行为、传感器数据、日志文件等。通过 Kafka,可以将这些数据流进行实时的处理和分析,例如进行实时数据分析和告警。
  2. 消息队列:Kafka 可以作为一个消息队列使用,用于在分布式系统中传递消息。它能够处理高吞吐量的消息,并保证消息的有序性和可靠性。
  3. 事件驱动架构:Kafka 可以作为事件驱动架构的核心组件,将事件数据发布到不同的消费者,以便进行实时处理。这种架构可以简化应用程序的设计和开发,提高系统的可扩展性和灵活性。
  4. 数据管道:Kafka 可以用于数据管道,将数据从一个系统传输到另一个系统。例如,可以将数据从数据库或日志文件传输到大数据平台或数据仓库。
  5. 业务事件通知:Kafka 可以用于通知业务事件,例如订单状态变化、库存更新等。通过订阅 Kafka 主题,相关的应用程序和服务可以实时地接收到这些事件通知,并进行相应的处理。
  6. 流数据处理框架集成:Kafka 可以与流处理框架集成,如 Apache Flink、Apache Spark 等。通过集成,可以将流数据从 Kafka 中实时导入到流处理框架中进行处理,实现流式计算和实时分析。
标签: #JAVA 991
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.