上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【Spring连载】使用Spring访问 Apache Kafka(十九)----Apache Kafka Streams支持

guduadmin501月前

【Spring连载】使用Spring访问 Apache Kafka(十九)----Apache Kafka Streams支持

  • 一、基础Basics
  • 二、Spring管理
  • 三、KafkaStreams Micrometer支持
  • 四、Streams JSON 序列化和反序列化
  • 五、使用Kafka流分支器KafkaStreamBrancher
  • 六、配置Configuration
  • 七、Header Enricher
  • 八、MessagingProcessor
  • 九、从反序列化异常中恢复Recovery from Deserialization Exceptions
  • 十、Kafka Streams示例

    从1.1.4版本开始,Spring for Apache Kafka为Kafka Streams提供了非常好的支持。要从Spring应用程序中使用它,kafka-streams jar必须存在于类路径中。它是Spring for Apache Kafka项目的一个可选依赖项,不会自动下载。

    一、基础Basics

    参考Apache Kafka Streams文档建议使用该API的方法如下:

    // Use the builders to define the actual processing topology, e.g. to specify
    // from which input topics to read, which stream operations (filter, map, etc.)
    // should be called, and so on.
    StreamsBuilder builder = ...;  // when using the Kafka Streams DSL
    // Use the configuration to tell your application where the Kafka cluster is,
    // which serializers/deserializers to use by default, to specify security settings,
    // and so on.
    StreamsConfig config = ...;
    KafkaStreams streams = new KafkaStreams(builder, config);
    // Start the Kafka Streams instance
    streams.start();
    // Stop the Kafka Streams instance
    streams.close();
    

    因此,我们有两个主要组件:

    • StreamsBuilder:使用API构建KStream(或KTable)实例。
    • KafkaStreams:管理这些实例的生命周期。

      被单个StreamsBuilder暴露给KafkaStreams实例的所有KStream实例都会同时启动和停止,即使它们具有不同的逻辑。换句话说,StreamsBuilder定义的所有流都与单个生命周期控件绑定。一旦一个KafkaStreams实例被streams.close()关闭,它就无法重新启动。因此,必须创建一个新的KafkaStreams实例来重新启动流处理。

      二、Spring管理

      为了从Spring应用程序上下文的角度简化使用Kafka Streams,并使用容器进行生命周期管理,Spring for Apache Kafka引入了StreamsBuilderFactoryBean。这是一个AbstractFactoryBean实现,用于将单例StreamsBuilder公开为bean。以下示例创建了这样一个bean:

      @Bean
      public FactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
          return new StreamsBuilderFactoryBean(streamsConfig);
      }
      

      从版本2.2开始,流配置现在作为KafkaStreamsConfiguration对象而不是StreamsConfig提供。

      StreamsBuilderFactoryBean还实现了SmartLifecycle来管理内部KafkaStreams实例的生命周期。与Kafka Streams API类似,在启动KafkaStreams之前,必须定义KStream实例。这也适用于Kafka Streams的Spring API。因此,当在StreamsBuilderFactoryBean上使用默认的autoStartup = true时,必须在刷新应用程序上下文之前在StreamsBuilder上声明KStream实例。例如,KStream可以是一个常规的bean定义,而Kafka Streams API的使用也没有任何影响。以下示例显示了如何执行此操作:

      @Bean
      public KStream kStream(StreamsBuilder kStreamBuilder) {
          KStream stream = kStreamBuilder.stream(STREAMING_TOPIC1);
          // Fluent KStream API
          return stream;
      }
      

      如果你想手动控制生命周期(例如,根据某些条件停止和启动),可以使用工厂bean 加(&)前缀直接引用StreamsBuilderFactoryBean bean。由于StreamsBuilderFactoryBean使用其内部KafkaStreams实例,因此可以安全地停止并重新启动它。每个start()都会创建一个新的KafkaStreams。如果你想单独控制KStream实例的生命周期,也可以考虑使用不同的StreamsBuilderFactoryBean实例。

      你还可以在StreamsBuilderFactoryBean上指定“KafkaStreams.StateListener”、“Thread.UncoughtException Handler”和“StateRestoreListener”选项,这些选项被委托给内部KafkaStream实例。此外,除了在StreamsBuilderFactoryBean上间接设置这些选项外,从2.1.5版本开始,你还可以使用KafkaStreamsCustomizer回调接口来配置内部KafkaStreams实例。请注意,KafkaStreamsCustomizer会覆盖StreamsBuilderFactoryBean提供的选项。如果你需要直接执行一些KafkaStreams操作,你可以使用StreamsBuilderFactoryBean.getKafkaStreams()访问该内部KafkaStreams实例。你可以按类型自动装配StreamsBuilderFactoryBean bean,但应确保在bean定义中使用完整类型,如下例所示:

      @Bean
      public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
          return new StreamsBuilderFactoryBean(streamsConfig);
      }
      ...
      @Autowired
      private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
      

      或者,如果使用接口bean定义,可以为按名称注入添加@Qualifier。下面的示例展示了如何这样做:

      @Bean
      public FactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
          return new StreamsBuilderFactoryBean(streamsConfig);
      }
      ...
      @Autowired
      @Qualifier("&myKStreamBuilder")
      private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
      

      工厂bean有一个infrastructureCustomizer属性,类型为KafkaStreamsInfrastructureCustomizer;这允许在创建流之前自定义Topology 和 StreamsBuilder(例如添加状态存储)。

      public interface KafkaStreamsInfrastructureCustomizer {
      	void configureBuilder(StreamsBuilder builder);
      	void configureTopology(Topology topology);
      }
      

      默认情况下,框架提供了无任何操作实现,以避免在其中一个方法不需要的情况下必须同时实现这两种方法。

      框架提供了CompositeKafkaStreamsInfrastructureCustomizer,用于需要应用多个自定义程序。

      三、KafkaStreams Micrometer支持

      你可以配置一个KafkaStreamsMicrometerListener来自动注册由factory bean管理的KafkaStreams 对象的micrometer meters:

      streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
              Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
      

      四、Streams JSON 序列化和反序列化

      为了在读取或写入JSON格式的主题或状态存储时序列化和反序列化数据,Spring For Apache Kafka提供了一个使用JSON的JsonSerde实现,将其委托给序列化、反序列化和消息转换中描述的JsonSerializer和JsonDeserializer。JsonSerde实现通过其构造函数(目标类型或ObjectMapper)提供相同的配置选项。在下面的例子中,我们使用JsonSerde来序列化和反序列化Kafka流的Cat payload(JsonSerde 可以以类似的方式用于任何需要实例的地方):

      stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
      

      当以编程方式构造用于生产者/消费者工厂的序列化器/反序列化器时,您可以使用fluent API,这简化了配置。

      stream.through(new JsonSerde<>(MyKeyType.class)
              .forKeys()
              .noTypeInfo(),
          new JsonSerde<>(MyValueType.class)
              .noTypeInfo(),
          "myTypes");
      

      五、使用Kafka流分支器KafkaStreamBrancher

      KafkaStreamBrancher类引入了一种更方便的方法来在KStream之上构建条件分支。先看下面这个不使用KafkaStreamBrancher的例子:

      KStream[] branches = builder.stream("source").branch(
            (key, value) -> value.contains("A"),
            (key, value) -> value.contains("B"),
            (key, value) -> true
           );
      branches[0].to("A");
      branches[1].to("B");
      branches[2].to("C");
      

      以下例子使用了KafkaStreamBrancher:

      new KafkaStreamBrancher()
         .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
         .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
         //default branch should not necessarily be defined in the end of the chain!
         .defaultBranch(ks -> ks.to("C"))
         .onTopOf(builder.stream("source"));
         //onTopOf method returns the provided stream so we can continue with method chaining
      

      六、配置Configuration

      要配置Kafka Streams环境,StreamsBuilderFactoryBean需要一个KafkaStreamsConfiguration实例。有关所有可能的选项,请参阅Apache Kafka文档。

      流配置现在作为KafkaStreamsConfiguration 对象提供,而不是作为StreamsConfig提供。

      为了避免在大多数情况下使用样板代码,特别是在开发微服务时,Spring for Apache Kafka提供了@EnableKafkaStreams注释,你应该将其放在@Configuration类中。你只需要声明一个名为defaultKafkaStreamsConfig的KafkaStreamsConfiguration bean。在应用程序上下文中自动声明一个名为defaultKafkaStreamsBuilder的StreamsBuilderFactoryBean bean。你也可以声明和使用任何额外的StreamsBuilderFactoryBean bean。您可以通过提供一个实现StreamsBuilderFactoryBeanConfigurer的bean来执行该bean的额外定制。如果有多个这样的bean,则将根据它们的Ordered.order属性应用它们。

      默认情况下,当工厂bean停止时,会调用KafkaStreams.cleanUp()方法。从2.1.2版本开始,工厂bean有额外的构造函数,使用CleanupConfig对象,该对象具有属性可以控制在start()或stop()期间调用cleanUp()方法,或者两者都不调用。从2.7版本开始,默认情况是从不清理本地状态。

      七、Header Enricher

      3.0版本增加了ContextualProcessor的子类HeaderEnricherProcessor;提供与弃用的实现Transformer接口的HeaderEnricher相同的功能。这可以用于在流处理中添加头;报头值是SpEL表达式;表达式求值的根对象具有3个属性:

      • record ——org.apache.kafka.streams.processor.api.Record(key, value, timestamp, headers)
      • key——当前记录的键
      • value——当前记录的值
      • context——ProcessorContext,允许访问当前记录元数据

        表达式必须返回byte[]或String(将使用UTF-8将其转换为byte[])。

        要在流中使用enricher:

        .process(() -> new HeaderEnricherProcessor(expressions))
        

        processor不改变键或值;它只是添加标头。

        你需要为每条记录创建一个新实例。

        .process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
        

        下面是一个简单的例子,添加一个文字头和一个变量:

        Map headers = new HashMap<>();
        headers.put("header1", new LiteralExpression("value1"));
        SpelExpressionParser parser = new SpelExpressionParser();
        headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
        ProcessorSupplier supplier = () -> new HeaderEnricher enricher = new HeaderEnricher<>(headers);
        KStream stream = builder.stream(INPUT);
        stream
                .process(() -> supplier)
                .to(OUTPUT);
        

        八、MessagingProcessor

        3.0版本增加了ContextualProcessor的子类MessagingProcessor;提供与已弃用的MessagingTransformer相同的功能,后者实现了已弃用的Transformer接口。这允许Kafka Streams topology 与Spring Messaging组件交互,例如Spring Integration flow。转换器需要MessagingFunction的实现。

        @FunctionalInterface
        public interface MessagingFunction {
        	Message exchange(Message message);
        }
        

        Spring Integration使用其GatewayProxyFactoryBean自动提供实现。它还需要一个MessagingMessageConverter来将键、值和元数据(包括头)转换为Spring Messaging Message。请参阅以下示例[从KStream调用Spring Integration Flow]:

        @Bean
        public KStream kStream(StreamsBuilder kStreamBuilder,
                MessagingTransformer transformer)  transformer) {
            KStream stream = kStreamBuilder.stream(STREAMING_TOPIC1);
            stream.mapValues((ValueMapper) String::toUpperCase)
                    ...
                    .transform(() -> transformer)
                    .to(streamingTopic2);
            stream.print(Printed.toSysOut());
            return stream;
        }
        @Bean
        @DependsOn("flow")
        public MessagingTransformer transformer(
                MessagingFunction function) {
            MessagingMessageConverter converter = new MessagingMessageConverter();
            converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
            return new MessagingTransformer<>(function, converter);
        }
        @Bean
        public IntegrationFlow flow() {
            return IntegrationFlow.from(MessagingFunction.class)
                ...
                .get();
        }
        

        九、从反序列化异常中恢复Recovery from Deserialization Exceptions

        版本2.3引入了RecoveringDeserializationExceptionHandler,它可以在发生反序列化异常时采取一些操作。请参阅有关DeserializationExceptionHandler的Kafka文档,RecoveringDeserializationExceptionHandler就是其中的一个实现。RecoveringDeserializationExceptionHandler是使用ConsumerRecordRecoverer实现配置的。该框架提供了DeadLetterPublishingRecoverer,它将失败的记录发送到死信主题。有关此恢复器的详细信息,请参阅发布死信记录。

        要配置恢复器,请将以下属性添加到流配置中:

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map props = new HashMap<>();
            ...
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    RecoveringDeserializationExceptionHandler.class);
            props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
            ...
            return new KafkaStreamsConfiguration(props);
        }
        @Bean
        public DeadLetterPublishingRecoverer recoverer() {
            return new DeadLetterPublishingRecoverer(kafkaTemplate(),
                    (record, ex) -> new TopicPartition("recovererDLQ", -1));
        }
        

        当然,recoverer() bean可以是你自己的ConsumerRecordRecoverer实现。

        十、Kafka Streams示例

        下面的例子结合了本文所涉及的所有主题:

        @Configuration
        @EnableKafka
        @EnableKafkaStreams
        public static class KafkaStreamsConfig {
            @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
            public KafkaStreamsConfiguration kStreamsConfigs() {
                Map props = new HashMap<>();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
                props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
                return new KafkaStreamsConfiguration(props);
            }
            @Bean
            public StreamsBuilderFactoryBeanConfigurer configurer() {
                return fb -> fb.setStateListener((newState, oldState) -> {
                    System.out.println("State transition from " + oldState + " to " + newState);
                });
            }
            @Bean
            public KStream kStream(StreamsBuilder kStreamBuilder) {
                KStream stream = kStreamBuilder.stream("streamingTopic1");
                stream
                        .mapValues((ValueMapper) String::toUpperCase)
                        .groupByKey()
                        .windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
                        .reduce((String value1, String value2) -> value1 + value2,
                        		Named.as("windowStore"))
                        .toStream()
                        .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                        .filter((i, s) -> s.length() > 40)
                        .to("streamingTopic2");
                stream.print(Printed.toSysOut());
                return stream;
            }
        }
        

网友评论

搜索
最新文章
热门文章
热门标签
 
 梦见拉屎在裤子里  易经八卦六十四卦算命  如何三步看出家里有鬼