# 10.3. Apache Kafka 支持

[Apache Kafka](https://kafka.apache.org/)通过提供`spring-kafka`项目的自动配置来支持。

Kafka 配置为`spring.kafka.*`. 例如，您可以在 `application.properties`中声明以下部分：

```
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
```

> 要在启动时创建topic，请添加类型为 `NewTopic`的bean 。如果主题已经存在，则忽略该 bean。

请参阅[`KafkaProperties`](https://github.com/spring-projects/spring-boot/tree/v3.2.0/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java)参考资料 了解更多支持的选项。

**10.3.1. 发送消息**

Spring`KafkaTemplate`是自动配置的，您可以直接在自己的 bean 中自动装配它，如以下示例所示：

```
@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}
```

如果定义了`spring.kafka.producer.transaction-id-prefix`属性，则会自动配置 `KafkaTransactionManager`。此外，如果定义了一个`RecordMessageConverter` bean，它会自动关联到自动配置的`KafkaTemplate`.

**10.3.2. 接收消息**

当 Apache Kafka 基础设施存在时，可以对任何 bean 进行`@KafkaListener`注释以创建侦听器端点。如果未定义`KafkaListenerContainerFactory`，则会使用 `spring.kafka.listener.*`中定义的键自动配置默认值。

以下组件在`someTopic`主题上创建侦听器端点：

```
@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}
```

如果`KafkaTransactionManager`定义了一个bean，它会自动关联到容器工厂。类似地，如果定义了`RecordFilterStrategy`、`CommonErrorHandler`、`AfterRollbackProcessor`或`ConsumerAwareRebalanceListener`bean，它会自动关联到默认工厂。

根据侦听器类型，`RecordMessageConverter`或`BatchMessageConverter`bean 会与默认工厂关联。如果批处理侦听器仅存在一个 `RecordMessageConverter` bean，则它将包装在`BatchMessageConverter`.

> 自定义的 `ChainedKafkaTransactionManager`必须标记 `@Primary`，因为它通常引用自动配置的`KafkaTransactionManager`bean。

**10.3.3. 卡夫卡流**

Spring for Apache Kafka 提供了一个工厂 bean 来创建`StreamsBuilder`对象并管理其流的生命周期。只要所需的bean`kafka-streams`位于类路径中，并且通过`@EnableKafkaStreams`注释启用了 Kafka Streams，Spring Boot 就会自动配置所需的`KafkaStreamsConfiguration` bean。

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 进行配置，如果不设置则`spring.kafka.streams.application-id`默认为`spring.application.name`。后者可以全局设置或仅针对流专门覆盖。

使用专用属性可以使用几个附加属性；可以使用命名空间`spring.kafka.streams.properties`设置其他任意 Kafka 属性。另请参阅[其他 Kafka 属性](https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#messaging.kafka.additional-properties)以获取更多信息。

要使用工厂 bean，请按以下示例所示连接`StreamsBuilder`到您的`@Bean` ：

```
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}
```

默认情况下，对象管理的流`StreamBuilder`会自动启动。您可以使用`spring.kafka.streams.auto-startup`属性自定义此行为。

**10.3.4. 其他 Kafka 属性**

自动配置支持的属性显示在附录的[“集成属性”部分中。](https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#appendix.application-properties.integration)请注意，在大多数情况下，这些属性（连字符或驼峰命名法）直接映射到 Apache Kafka 点分属性。有关详细信息，请参阅 Apache Kafka 文档。

名称中不包含客户端类型（`producer`、`consumer`、`admin`或`streams`）的属性被视为通用属性并适用于所有客户端。如果需要，可以为一种或多种客户端类型覆盖大多数常见属性。

Apache Kafka 将属性的重要性指定为“高”、“中”或“低”。Spring Boot 自动配置支持所有高重要性属性、一些选定的中和低属性以及任何没有默认值的属性。

只有 Kafka 支持的属性的子集可以直接通过`KafkaProperties`类获得。如果您希望使用不直接支持的其他属性来配置各个客户端类型，请使用以下属性：

```
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
```

这将公共`prop.one`Kafka 属性设置为`first`（适用于生产者、消费者、管理员和流），将`prop.two`管理属性设置为`second`，将`prop.three`消费者属性设置为`third`，将`prop.four`生产者属性设置为`fourth`，将`prop.five`流属性设置为`fifth`。

您还可以按如下方式配置Spring Kafka `JsonDeserializer`：

```
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
```

同样，您可以禁用在标头中发送类型信息的默认`JsonSerializer`行为：

```
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
```

> 以这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。

**10.3.5. 使用嵌入式 Kafka 进行测试**

Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。要使用此功能，请使用`spring-kafka-test`模块`@EmbeddedKafka`注释测试类。有关更多信息，请参阅 Spring for Apache Kafka[参考手册](https://docs.spring.io/spring-kafka/docs/3.1.0/reference/testing.html#ekb)。

要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理配合使用，您需要将嵌入式代理地址的系统属性（由 填充）重新映射`EmbeddedKafkaBroker`到 Apache Kafka 的 Spring Boot 配置属性。有几种方法可以做到这一点：

* 提供一个系统属性以将嵌入式代理地址映射到`spring.kafka.bootstrap-servers`测试类中：

  ```
  static {
      System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
  }
  ```
* 在注释上配置属性名称`@EmbeddedKafka`：

  ```
  @SpringBootTest
  @EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  class MyTest {
  ​
      // ...
  ​
  }
  ```
* 在配置属性中使用占位符：

  ```
  spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
  ```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://doc.shiker.tech/spring-boot-can-kao-wen-dang/10.-xiao-xi/10.3.-apache-kafka-zhi-chi.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
