# 10.3. Apache Kafka 支持

[Apache Kafka](https://kafka.apache.org/)通过提供`spring-kafka`项目的自动配置来支持。

Kafka 配置为`spring.kafka.*`. 例如，您可以在 `application.properties`中声明以下部分：

```
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
```

> 要在启动时创建topic，请添加类型为 `NewTopic`的bean 。如果主题已经存在，则忽略该 bean。

请参阅[`KafkaProperties`](https://github.com/spring-projects/spring-boot/tree/v3.2.0/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java)参考资料 了解更多支持的选项。

**10.3.1. 发送消息**

Spring`KafkaTemplate`是自动配置的，您可以直接在自己的 bean 中自动装配它，如以下示例所示：

```
@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}
```

如果定义了`spring.kafka.producer.transaction-id-prefix`属性，则会自动配置 `KafkaTransactionManager`。此外，如果定义了一个`RecordMessageConverter` bean，它会自动关联到自动配置的`KafkaTemplate`.

**10.3.2. 接收消息**

当 Apache Kafka 基础设施存在时，可以对任何 bean 进行`@KafkaListener`注释以创建侦听器端点。如果未定义`KafkaListenerContainerFactory`，则会使用 `spring.kafka.listener.*`中定义的键自动配置默认值。

以下组件在`someTopic`主题上创建侦听器端点：

```
@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}
```

如果`KafkaTransactionManager`定义了一个bean，它会自动关联到容器工厂。类似地，如果定义了`RecordFilterStrategy`、`CommonErrorHandler`、`AfterRollbackProcessor`或`ConsumerAwareRebalanceListener`bean，它会自动关联到默认工厂。

根据侦听器类型，`RecordMessageConverter`或`BatchMessageConverter`bean 会与默认工厂关联。如果批处理侦听器仅存在一个 `RecordMessageConverter` bean，则它将包装在`BatchMessageConverter`.

> 自定义的 `ChainedKafkaTransactionManager`必须标记 `@Primary`，因为它通常引用自动配置的`KafkaTransactionManager`bean。

**10.3.3. 卡夫卡流**

Spring for Apache Kafka 提供了一个工厂 bean 来创建`StreamsBuilder`对象并管理其流的生命周期。只要所需的bean`kafka-streams`位于类路径中，并且通过`@EnableKafkaStreams`注释启用了 Kafka Streams，Spring Boot 就会自动配置所需的`KafkaStreamsConfiguration` bean。

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 进行配置，如果不设置则`spring.kafka.streams.application-id`默认为`spring.application.name`。后者可以全局设置或仅针对流专门覆盖。

使用专用属性可以使用几个附加属性；可以使用命名空间`spring.kafka.streams.properties`设置其他任意 Kafka 属性。另请参阅[其他 Kafka 属性](https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#messaging.kafka.additional-properties)以获取更多信息。

要使用工厂 bean，请按以下示例所示连接`StreamsBuilder`到您的`@Bean` ：

```
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}
```

默认情况下，对象管理的流`StreamBuilder`会自动启动。您可以使用`spring.kafka.streams.auto-startup`属性自定义此行为。

**10.3.4. 其他 Kafka 属性**

自动配置支持的属性显示在附录的[“集成属性”部分中。](https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#appendix.application-properties.integration)请注意，在大多数情况下，这些属性（连字符或驼峰命名法）直接映射到 Apache Kafka 点分属性。有关详细信息，请参阅 Apache Kafka 文档。

名称中不包含客户端类型（`producer`、`consumer`、`admin`或`streams`）的属性被视为通用属性并适用于所有客户端。如果需要，可以为一种或多种客户端类型覆盖大多数常见属性。

Apache Kafka 将属性的重要性指定为“高”、“中”或“低”。Spring Boot 自动配置支持所有高重要性属性、一些选定的中和低属性以及任何没有默认值的属性。

只有 Kafka 支持的属性的子集可以直接通过`KafkaProperties`类获得。如果您希望使用不直接支持的其他属性来配置各个客户端类型，请使用以下属性：

```
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
```

这将公共`prop.one`Kafka 属性设置为`first`（适用于生产者、消费者、管理员和流），将`prop.two`管理属性设置为`second`，将`prop.three`消费者属性设置为`third`，将`prop.four`生产者属性设置为`fourth`，将`prop.five`流属性设置为`fifth`。

您还可以按如下方式配置Spring Kafka `JsonDeserializer`：

```
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
```

同样，您可以禁用在标头中发送类型信息的默认`JsonSerializer`行为：

```
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
```

> 以这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。

**10.3.5. 使用嵌入式 Kafka 进行测试**

Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。要使用此功能，请使用`spring-kafka-test`模块`@EmbeddedKafka`注释测试类。有关更多信息，请参阅 Spring for Apache Kafka[参考手册](https://docs.spring.io/spring-kafka/docs/3.1.0/reference/testing.html#ekb)。

要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理配合使用，您需要将嵌入式代理地址的系统属性（由 填充）重新映射`EmbeddedKafkaBroker`到 Apache Kafka 的 Spring Boot 配置属性。有几种方法可以做到这一点：

* 提供一个系统属性以将嵌入式代理地址映射到`spring.kafka.bootstrap-servers`测试类中：

  ```
  static {
      System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
  }
  ```
* 在注释上配置属性名称`@EmbeddedKafka`：

  ```
  @SpringBootTest
  @EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  class MyTest {
  ​
      // ...
  ​
  }
  ```
* 在配置属性中使用占位符：

  ```
  spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
  ```
