本文最后更新于:2024年6月28日 中午
                  
                
              
            
            
              
                
                Kafka Stream 基本使用
Apache Kafka Streams 是一款强大的实时流处理库,为构建实时数据处理应用提供了灵活且高性能的解决方案。
Kafka Streams 是 Apache Kafka 生态系统中的一部分,它不仅简化了流处理应用的构建,还提供了强大的功能,如事件时间处理、状态管理、交互式查询等。其核心理念是将流处理与事件日志结合,使应用程序能够实时处理数据流。
1. 前言
由于公司需开发数据清洗服务,而且需要实时性高的数据处理,结合线上数据是输出到kafka,故采用 Kafka Streams 来作为数据清洗服务开发,本编结合一个demo,讲述 Kafka Streams 的基本使用。
Kafka Streams的特点:
- 设计为一个简单而轻量级的客户端库,可以很容易地嵌入到任何 Java 应用程序中,并与用户为其流应用程序提供的任何现有打包、部署和操作工具集成。
 
- 除了 Apache Kafka 本身作为内部消息传递层之外,对系统没有外部依赖关系;值得注意的是,它使用 Kafka 的分区模型来水平扩展处理,同时保持强大的排序保证。
 
- 支持容错本地状态,从而实现非常快速高效的有状态操作,如窗口联接和聚合。
 
- 支持 exact-once 处理语义,以保证每条记录将只处理一次,即使 Streams 客户端或 Kafka 代理在处理过程中出现故障也是如此。
 
- 采用一次一条记录的处理来实现毫秒级处理延迟,并支持基于事件时间的窗口化操作,以及记录的无序到达。
 
- 提供必要的流处理基元,以及高级流 DSL 和低级处理器 API。
 
2. 核心概念
- Stream: 一个无限的、有序的、可重放的、并且可失败的数据记录序列。在Kafka中,一个流可以看作是一个或多个Kafka主题的消息记录。
 
- Stream Processor: 流处理器是对流数据进行处理的逻辑单元。它可以是一个简单的消息转换(例如,增加数据的时间戳),也可以是一个复杂的,如聚合或连接多个流。
 
- Topologies: 流处理拓扑是构成流处理程序的逻辑流程。一个拓扑是由多个处理器节点(处理器和转换器)和源节点(用于读取流数据)和汇节点(用于输出处理后的数据)组成的。
 
- KStream: 主要代表一种记录流,其中每个数据记录代表一个独立的数据实体。
 
- KTable: 表示一个更新流,每个数据记录表示一个表中的行。在更新流中,具有相同键的数据记录会覆盖先前的记录,类似于传统数据库的更新操作。
 
- Global KTable: 与KTable类似,但在所有应用程序实例中都全局可用,并且是只读的。
 
- State Stores: 本地存储,用于存储中间处理状态。状态存储可以是持久化的也可以是非持久化的。它们使得流处理器可以提供有状态的操作。
 
- Windowing: 用于将无限的数据流分成有限的块进行处理。窗口可以是时间驱动的(如固定时间窗口、滑动时间窗口)或基于数据记录数的。
 
- Processor API: 一个低级别的,允许开发人员定义和连接自定义处理器的API。使用该API,开发人员可以控制数据的流动和事件处理的精细细节。
 
- DSL (Domain Specific Language): 高级流DSL是一个构建流处理拓扑的表达式式的API。它提供了一套简单的操作符用于过滤、映射、聚合等操作。
 
详细介绍请查看官方文档:https://kafka.apache.org/37/documentation/streams/core-concepts
3. 基本用法
本例结合官方文档中的示例,输入文本计算单词,用于处理无限的数据流,统计出单词数量输出。
Demo 仓库地址:https://github.com/Gumengyo/kafka-stream-demo

引入依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
   | <dependency>     <groupId>org.apache.kafka</groupId>     <artifactId>kafka-streams</artifactId> </dependency>
  <dependency>     <groupId>org.springframework.kafka</groupId>     <artifactId>spring-kafka</artifactId> </dependency>
  <dependency>     <groupId>org.projectlombok</groupId>     <artifactId>lombok</artifactId>     <optional>true</optional> </dependency>
 
  | 
 
创建Topic:
1 2 3
   | ./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-plaintext-input --replication-factor 1 --partitions 1
  ./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-wordcount-output --replication-factor 1 --partitions 1
 
  | 
 
3.1 结合Spring框架构建Kafka Streams
- 配置文件
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
   | server:   port: 9991 spring:   application:     name: kafka-demo   kafka:     bootstrap-servers: localhost:9092     producer:       retries: 10       key-serializer: org.apache.kafka.common.serialization.StringSerializer       value-serializer: org.apache.kafka.common.serialization.StringSerializer       compression-type: lz4     consumer:       group-id: ${spring.application.name}-test       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  kafka:   hosts: localhost:9092   group: ${spring.application.name}
 
 
  | 
 
- 配置 Kafka Streams
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | @Setter @Getter @Configuration @EnableKafkaStreams @ConfigurationProperties(prefix="kafka") public class KafkaStreamConfig {     private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;     private String hosts;     private String group;     @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)     public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {         Map<String, Object> props = new HashMap<>();         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);         props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");         props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");         props.put(StreamsConfig.RETRIES_CONFIG, 10);         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());         return new KafkaStreamsConfiguration(props);     } }
 
 
  | 
 
- 常量
 
1 2 3 4 5 6
   | public class KafkaConstants {     public static final String BOOTSTRAP_SERVERS = "localhost:9092";     public static final String INPUT_TOPIC = "streams-plaintext-input";     public static final String OUTPUT_TOPIC = "streams-wordcount-output"; }
 
 
  | 
 
- 创建 KStream
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
   | @Configuration @Slf4j public class KafkaStreamHelloListener {
      @Bean     public KStream<String,String> kStream(StreamsBuilder streamsBuilder){                  KStream<String, String> stream = streamsBuilder.stream(KafkaConstants.INPUT_TOPIC);         stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {             @Override             public Iterable<String> apply(String value) {                 return Arrays.asList(value.split(" "));             }         })                                  .groupBy((key,value)->value)                                  .windowedBy(TimeWindows.of(Duration.ofSeconds(1)))                                  .count()                 .toStream()                                  .map((key,value)->{                     System.out.println("key:"+key+",value:"+value);                     return new KeyValue<>(key.key().toString(),value.toString());                 })                                  .to(KafkaConstants.OUTPUT_TOPIC);         return stream;     } }
 
  | 
 
3.2 自定义配置构建 Kafka Streams
将Demo中 KafkaStreamConfig.java 和 KafkaStreamHelloListener.java 注释掉,
在 SpringBootTest 添加下面代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
   | @SpringBootTest class KafkaStreamDemoApplicationTests {
      @Value("${kafka.hosts}")     private String hosts;     @Value("${kafka.group}")     private String group;
           @Test     void testCreateKStream() throws InterruptedException {         Properties props = new Properties();         props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.group + "_stream_aid");         props.put(StreamsConfig.CLIENT_ID_CONFIG, this.group + "_stream_cid");         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);         props.put(StreamsConfig.RETRIES_CONFIG, 10);         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); 
                   StreamsBuilder streamsBuilder = new StreamsBuilder();         KStream<String, String> stream = streamsBuilder.stream(KafkaConstants.INPUT_TOPIC);                  stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split(" ")))                                  .groupBy((key,value)->value)                                  .windowedBy(TimeWindows.of(Duration.ofSeconds(1)))                                  .count()                 .toStream()                                  .map((key,value)->{                     System.out.println("key:"+key+",value:"+value);                     return new KeyValue<>(key.key().toString(),value.toString());                 })                                  .to(KafkaConstants.OUTPUT_TOPIC);         new CountDownLatch(1).await();     } }
 
 
  | 
 
3.3 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
   | public class ProducerQuickStart {
      public static void main(String[] args) {
                   Properties prop = new Properties();                  prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);                  prop.put(ProducerConfig.RETRIES_CONFIG, 5);                  prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");         
 
 
          prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");                  prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                   KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);                  ProducerRecord<String, String> producerRecord1 = new ProducerRecord<String, String>(KafkaConstants.INPUT_TOPIC, "key_001", "hello kafka");         ProducerRecord<String, String> producerRecord2 = new ProducerRecord<String, String>(KafkaConstants.INPUT_TOPIC, "key_002", "hello world");                  producer.send(producerRecord1);         producer.send(producerRecord2);
                   producer.close();
      } }
 
  | 
 
执行上面main方法测试发送消息:
hello kafka
hello world
查看kafka 内消息:


可以看到已经正确统计单词结果,输出到topic 为 streams-wordcount-output 中
参考