10.3. Apache Kafka 支持
Apache Kafka通过提供spring-kafka
项目的自动配置来支持。
Kafka 配置为spring.kafka.*
. 例如,您可以在 application.properties
中声明以下部分:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
要在启动时创建topic,请添加类型为
NewTopic
的bean 。如果主题已经存在,则忽略该 bean。
请参阅KafkaProperties
参考资料 了解更多支持的选项。
10.3.1. 发送消息
SpringKafkaTemplate
是自动配置的,您可以直接在自己的 bean 中自动装配它,如以下示例所示:
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
如果定义了spring.kafka.producer.transaction-id-prefix
属性,则会自动配置 KafkaTransactionManager
。此外,如果定义了一个RecordMessageConverter
bean,它会自动关联到自动配置的KafkaTemplate
.
10.3.2. 接收消息
当 Apache Kafka 基础设施存在时,可以对任何 bean 进行@KafkaListener
注释以创建侦听器端点。如果未定义KafkaListenerContainerFactory
,则会使用 spring.kafka.listener.*
中定义的键自动配置默认值。
以下组件在someTopic
主题上创建侦听器端点:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
如果KafkaTransactionManager
定义了一个bean,它会自动关联到容器工厂。类似地,如果定义了RecordFilterStrategy
、CommonErrorHandler
、AfterRollbackProcessor
或ConsumerAwareRebalanceListener
bean,它会自动关联到默认工厂。
根据侦听器类型,RecordMessageConverter
或BatchMessageConverter
bean 会与默认工厂关联。如果批处理侦听器仅存在一个 RecordMessageConverter
bean,则它将包装在BatchMessageConverter
.
自定义的
ChainedKafkaTransactionManager
必须标记@Primary
,因为它通常引用自动配置的KafkaTransactionManager
bean。
10.3.3. 卡夫卡流
Spring for Apache Kafka 提供了一个工厂 bean 来创建StreamsBuilder
对象并管理其流的生命周期。只要所需的beankafka-streams
位于类路径中,并且通过@EnableKafkaStreams
注释启用了 Kafka Streams,Spring Boot 就会自动配置所需的KafkaStreamsConfiguration
bean。
启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 进行配置,如果不设置则spring.kafka.streams.application-id
默认为spring.application.name
。后者可以全局设置或仅针对流专门覆盖。
使用专用属性可以使用几个附加属性;可以使用命名空间spring.kafka.streams.properties
设置其他任意 Kafka 属性。另请参阅其他 Kafka 属性以获取更多信息。
要使用工厂 bean,请按以下示例所示连接StreamsBuilder
到您的@Bean
:
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase());
}
}
默认情况下,对象管理的流StreamBuilder
会自动启动。您可以使用spring.kafka.streams.auto-startup
属性自定义此行为。
10.3.4. 其他 Kafka 属性
自动配置支持的属性显示在附录的“集成属性”部分中。请注意,在大多数情况下,这些属性(连字符或驼峰命名法)直接映射到 Apache Kafka 点分属性。有关详细信息,请参阅 Apache Kafka 文档。
名称中不包含客户端类型(producer
、consumer
、admin
或streams
)的属性被视为通用属性并适用于所有客户端。如果需要,可以为一种或多种客户端类型覆盖大多数常见属性。
Apache Kafka 将属性的重要性指定为“高”、“中”或“低”。Spring Boot 自动配置支持所有高重要性属性、一些选定的中和低属性以及任何没有默认值的属性。
只有 Kafka 支持的属性的子集可以直接通过KafkaProperties
类获得。如果您希望使用不直接支持的其他属性来配置各个客户端类型,请使用以下属性:
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
这将公共prop.one
Kafka 属性设置为first
(适用于生产者、消费者、管理员和流),将prop.two
管理属性设置为second
,将prop.three
消费者属性设置为third
,将prop.four
生产者属性设置为fourth
,将prop.five
流属性设置为fifth
。
您还可以按如下方式配置Spring Kafka JsonDeserializer
:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
同样,您可以禁用在标头中发送类型信息的默认JsonSerializer
行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
以这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。
10.3.5. 使用嵌入式 Kafka 进行测试
Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。要使用此功能,请使用spring-kafka-test
模块@EmbeddedKafka
注释测试类。有关更多信息,请参阅 Spring for Apache Kafka参考手册。
要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理配合使用,您需要将嵌入式代理地址的系统属性(由 填充)重新映射EmbeddedKafkaBroker
到 Apache Kafka 的 Spring Boot 配置属性。有几种方法可以做到这一点:
提供一个系统属性以将嵌入式代理地址映射到
spring.kafka.bootstrap-servers
测试类中:static { System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers"); }
在注释上配置属性名称
@EmbeddedKafka
:@SpringBootTest @EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers") class MyTest { // ... }
在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
最后更新于
这有帮助吗?