🍊
翻译橙
🍊返回主站🤖参与贡献
  • hello,这里是翻译橙
  • spring boot参考文档
    • 1. 法律
    • 2. 寻求帮助
    • 3. 文档概述
    • 4. 入门
    • 5. 升级Spring Boot
    • 6. 使用 Spring Boot 进行开发
      • 6.1. 构建系统
      • 6.2. 构建你的代码
      • 6.3. 配置类
      • 6.4. 自动配置
      • 6.5. Spring Bean 和依赖注入
      • 6.6. 使用@SpringBootApplication注解
      • 6.7. 运行您的应用程序
      • 6.8. 开发者工具
      • 6.9. 打包您的生产应用程序
      • 6.10. 接下来读什么
    • 7.核心特性
      • 7.1. SpringApplication
      • 7.2. 外部化配置
      • 7.3.Profile配置
      • 7.4.日志记录
      • 7.5.国际化
      • 7.6 面向切面的编程
      • 7.7. JSON
      • 7.8. 任务执行与调度
      • 7.9. 单元测试
        • 7.9.1. 测试范围依赖
        • 7.9.2. 测试 Spring 应用程序
        • 7.9.3. 测试 Spring Boot 应用程序
        • 7.9.4. 测试容器
        • 7.9.5. 测试工具
      • 7.10. Docker Compose 支持
      • 7.11. 测试容器支持
      • 7.12. 创建您自己的自动配置
      • 7.13. Kotlin 支持
      • 7.14 SSL
      • 7.15.接下来要读什么
    • 8. 网络
      • 8.1. Servlet Web 应用程序
        • 8.1.1. “Spring Web MVC 框架”
        • 8.1.2. JAX-RS 和Jersey
        • 8.1.3. 嵌入式 Servlet 容器支持
      • 8.2 反应式网络应用程序
        • 8.2.1. “Spring WebFlux 框架”
        • 8.2.2. 嵌入式反应式服务器支持
        • 8.2.3. 反应式服务器资源配置
      • 8.3. 优雅关机
      • 8.4. spring安全
        • 8.4.1. MVC安全
        • 8.4.2. WebFlux 安全
        • 8.4.3. OAuth2
        • 8.4.4. SAML 2.0
      • 8.5. spring 会话
      • 8.6.GraphQL
      • 8.7. Spring HATEOAS
      • 8.8.接下来读什么
    • 9. 数据
      • 9.1. SQL数据库
      • 9.2. 使用 NoSQL 技术
      • 9.3. 接下来读什么
    • 10. 消息
      • 10.1. JMS
      • 10.2. AMQP
      • 10.3. Apache Kafka 支持
      • 10.4. Apache Pulsar 支持
      • 10.5. RSocket
      • 10.6. Spring Integration
      • 10.7. WebSockets
      • 10.8. What to Read Next
    • 11. IO
      • 11.1. 缓存
      • 11.2. Hazelcast
      • 11.3. Quartz 调度程序
      • 11.4. 发送电子邮件
      • 11.5. 验证
      • 11.6. 调用 REST 服务
      • 11.7. web services
      • 11.8. 使用 JTA 进行分布式事务
      • 11.9. 接下来读什么
    • 12. 容器镜像
  • Spring核心功能
    • 1.IOC容器和Bean简介
      • 1.2. 容器概述
      • 1.3. Bean概述
      • 1.4. 依赖项
        • 1.4.1. 依赖注入
        • 1.4.2. 详细的依赖关系和配置
        • 1.4.3. 使用depends-on
        • 1.4.4. 延迟初始化的 Bean
        • 1.4.5. 自动装配协作者
        • 1.4.6. 方法注入
    • 2. Resources
      • 2.1. 介绍
      • 2.2. Resource接口
      • 2.3. 内置Resource实现
      • 2.4. ResourceLoader接口
      • 2.5. ResourcePatternResolver接口
      • 2.6. ResourceLoaderAware接口
      • 2.7. 资源作为依赖
      • 2.8. 应用程序上下文和资源路径
    • 3. 验证、数据绑定和类型转换
      • 3.1. 使用 Spring 的 Validator 接口进行验证
      • 3.2. 将代码解析为错误消息
      • 3.3. Bean 操作和BeanWrapper
      • 3.4. spring类型转换
      • 3.5. spring字段格式
      • 3.6. 配置全局日期和时间格式
      • 3.7. Java Bean 验证
    • 4. SpEL表达式
    • 5. Spring 面向切面编程
      • 5.1. AOP 概念
      • 5.2. Spring AOP 的能力和目标
      • 5.3. AOP 代理
      • 5.4. @AspectJ 支持
        • 5.4.1. 启用@AspectJ 支持
        • 5.4.2. 声明一个切面
        • 5.4.3. 声明切入点
        • 5.4.4. 声明切点
        • 5.4.5. 切面说明
        • 5.4.6. 切面实例化模型
        • 5.4.7. AOP 示例
      • 5.5. 基于模式的 AOP 支持
      • 5.6. 选择要使用的 AOP 声明样式
      • 5.7. 混合切面类型
      • 5.8. 代理机制
      • 5.9. @AspectJ 代理的程序化创建
      • 5.10. 在 Spring 应用程序中使用 AspectJ
      • 5.11.更多资源
    • 6. Spring AOP API
      • 6.1. Spring中的切入点API
      • 6.2. Spring 中的 Advice API
      • 6.3. Spring 中的 Advisor API
      • 6.4. 使用ProxyFactoryBean创建 AOP 代理
      • 6.5. 简洁的代理定义
      • 6.6. 以编程方式创建 AOP 代理ProxyFactory
      • 6.7. 操作切面对象
      • 6.8. 使用“自动代理”工具
      • 6.9. 使用TargetSource实现
      • 6.10. 定义新的切面类型
    • 7. 空指针安全
    • 8. 数据缓冲器和编解码器
    • 9. 日志
    • 10. 附录
      • 10.1. XML 模式
      • 10.2. 自定义XML Schema
        • 10.2.1. 创作 Schema
        • 10.2.2. 编码一个NamespaceHandler
        • 10.2.3. 使用BeanDefinitionParser
        • 10.2.4. 注册处理程序和模式
        • 10.2.5. 在 Spring XML 配置中使用自定义扩展
        • 10.2.6. 更详细的例子
      • 10.3. 应用程序启动步骤
  • 使用redis实现分布式锁
  • Java 安全标准算法名称
  • JDK 9 JEP
  • JDK 10 JEP
  • 人件
    • 《人件》
    • 第一部分 管理人力资源
      • 01 此时此刻,一个项目正在走向失败
      • 02 干酪汉堡,做一个,卖一个
      • 03 维也纳在等你
      • 04 质量——如果时间允许
      • 05 再谈帕金森定律
      • 06 苦杏素
    • 第二部分 办公环境
      • 07 家具警察
      • 08 “朝九晚五在这里啥也完成不了。”
      • 09 在空间上省钱
      • 间奏曲:生产效率度量和不明飞行物
      • 10 大脑时问与身体时间
      • 11 电话
      • 12 门的回归
      • 13 采取保护步骤
    • 第三部分 正确的人
      • 14 霍恩布洛尔因素
      • 15 谈谈领导力
      • 16 雇一名杂耍演员
      • 17 与他人良好合作
      • 18 童年的终结
      • 19 在这儿很开心
      • 20 人力资本
    • 第四部分 高效团队养成
      • 21 整体大于部分之和
      • 22 黑衣团队
      • 23 团队自毁
      • 24 再谈团队自毁
      • 25 竞争
      • 26 一顿意面晚餐
      • 27 敞开和服
      • 28 团队形成的化学反应
    • 第五部分 沃土
      • 29 自我愈复系统
      • 30 与风险共舞
      • 3l 会议、独白和交流
      • 32 终极管理罪恶得主是……
      • 33 “邪恶”电邮
      • 34 让改变成为可能
      • 35 组织型学习
      • 36 构建社区
    • 第六部分 快乐地工作
      • 37 混乱与秩序
      • 38 自由电子
      • 39 霍尔加·丹斯克
由 GitBook 提供支持
在本页

这有帮助吗?

在GitHub上编辑
  1. spring boot参考文档
  2. 10. 消息

10.3. Apache Kafka 支持

上一页10.2. AMQP下一页10.4. Apache Pulsar 支持

最后更新于1年前

这有帮助吗?

通过提供spring-kafka项目的自动配置来支持。

Kafka 配置为spring.kafka.*. 例如,您可以在 application.properties中声明以下部分:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

要在启动时创建topic,请添加类型为 NewTopic的bean 。如果主题已经存在,则忽略该 bean。

请参阅参考资料 了解更多支持的选项。

10.3.1. 发送消息

SpringKafkaTemplate是自动配置的,您可以直接在自己的 bean 中自动装配它,如以下示例所示:

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}

如果定义了spring.kafka.producer.transaction-id-prefix属性,则会自动配置 KafkaTransactionManager。此外,如果定义了一个RecordMessageConverter bean,它会自动关联到自动配置的KafkaTemplate.

10.3.2. 接收消息

当 Apache Kafka 基础设施存在时,可以对任何 bean 进行@KafkaListener注释以创建侦听器端点。如果未定义KafkaListenerContainerFactory,则会使用 spring.kafka.listener.*中定义的键自动配置默认值。

以下组件在someTopic主题上创建侦听器端点:

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

如果KafkaTransactionManager定义了一个bean,它会自动关联到容器工厂。类似地,如果定义了RecordFilterStrategy、CommonErrorHandler、AfterRollbackProcessor或ConsumerAwareRebalanceListenerbean,它会自动关联到默认工厂。

根据侦听器类型,RecordMessageConverter或BatchMessageConverterbean 会与默认工厂关联。如果批处理侦听器仅存在一个 RecordMessageConverter bean,则它将包装在BatchMessageConverter.

自定义的 ChainedKafkaTransactionManager必须标记 @Primary,因为它通常引用自动配置的KafkaTransactionManagerbean。

10.3.3. 卡夫卡流

Spring for Apache Kafka 提供了一个工厂 bean 来创建StreamsBuilder对象并管理其流的生命周期。只要所需的beankafka-streams位于类路径中,并且通过@EnableKafkaStreams注释启用了 Kafka Streams,Spring Boot 就会自动配置所需的KafkaStreamsConfiguration bean。

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 进行配置,如果不设置则spring.kafka.streams.application-id默认为spring.application.name。后者可以全局设置或仅针对流专门覆盖。

要使用工厂 bean,请按以下示例所示连接StreamsBuilder到您的@Bean :

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}

默认情况下,对象管理的流StreamBuilder会自动启动。您可以使用spring.kafka.streams.auto-startup属性自定义此行为。

10.3.4. 其他 Kafka 属性

名称中不包含客户端类型(producer、consumer、admin或streams)的属性被视为通用属性并适用于所有客户端。如果需要,可以为一种或多种客户端类型覆盖大多数常见属性。

Apache Kafka 将属性的重要性指定为“高”、“中”或“低”。Spring Boot 自动配置支持所有高重要性属性、一些选定的中和低属性以及任何没有默认值的属性。

只有 Kafka 支持的属性的子集可以直接通过KafkaProperties类获得。如果您希望使用不直接支持的其他属性来配置各个客户端类型,请使用以下属性:

spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth

这将公共prop.oneKafka 属性设置为first(适用于生产者、消费者、管理员和流),将prop.two管理属性设置为second,将prop.three消费者属性设置为third,将prop.four生产者属性设置为fourth,将prop.five流属性设置为fifth。

您还可以按如下方式配置Spring Kafka JsonDeserializer:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another

同样,您可以禁用在标头中发送类型信息的默认JsonSerializer行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false

以这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。

10.3.5. 使用嵌入式 Kafka 进行测试

要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理配合使用,您需要将嵌入式代理地址的系统属性(由 填充)重新映射EmbeddedKafkaBroker到 Apache Kafka 的 Spring Boot 配置属性。有几种方法可以做到这一点:

  • 提供一个系统属性以将嵌入式代理地址映射到spring.kafka.bootstrap-servers测试类中:

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
    }
  • 在注释上配置属性名称@EmbeddedKafka:

    @SpringBootTest
    @EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    class MyTest {
    ​
        // ...
    ​
    }
  • 在配置属性中使用占位符:

    spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

使用专用属性可以使用几个附加属性;可以使用命名空间spring.kafka.streams.properties设置其他任意 Kafka 属性。另请参阅以获取更多信息。

自动配置支持的属性显示在附录的请注意,在大多数情况下,这些属性(连字符或驼峰命名法)直接映射到 Apache Kafka 点分属性。有关详细信息,请参阅 Apache Kafka 文档。

Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。要使用此功能,请使用spring-kafka-test模块@EmbeddedKafka注释测试类。有关更多信息,请参阅 Spring for Apache Kafka。

Apache Kafka
KafkaProperties
其他 Kafka 属性
“集成属性”部分中。
参考手册