10.4. Apache Pulsar 支持

Apache Pulsar通过提供Spring for Apache Pulsar项目的自动配置来支持。

org.springframework.pulsar:spring-pulsar位于类路径上时,Spring Boot 将自动配置并注册经典(强制)Spring for Apache Pulsar 组件。当org.springframework.pulsar:spring-pulsar-reactive位于类路径上时,它将对反应式组件执行相同的操作。

spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive“Starters” 分别用于方便地收集命令式和反应式使用的依赖项。

10.4.1. 连接到 Pulsar

当您使用 Pulsar 启动器时,Spring Boot 将自动配置并注册一个PulsarClientbean。

默认情况下,应用程序尝试连接到位于 的本地 Pulsar 实例pulsar://localhost:6650。这可以通过将spring.pulsar.client.service-url属性设置为不同的值来调整。

该值必须是有效的Pulsar 协议URL

您可以通过指定任何带spring.pulsar.client.*前缀的应用程序属性来配置客户端。

如果您需要对配置进行更多控制,请考虑注册一个或多个PulsarClientBuilderCustomizerbean。

验证

要连接到需要身份验证的 Pulsar 集群,您需要通过设置pluginClassName和插件所需的任何参数来指定要使用的身份验证插件。您可以将参数设置为参数名称到参数值的映射。以下示例展示了如何配置AuthenticationOAuth2插件。

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance

您需要确保spring.pulsar.client.authentication.param.*下面定义的名称与您的身份验证插件所期望的名称完全匹配(通常是驼峰式大小写)。Spring Boot 不会尝试对这些条目进行任何类型的宽松绑定。

例如,如果您想为AuthenticationOAuth2auth 插件配置颁发者 url,则必须使用spring.pulsar.client.authentication.param.issuerUrl. 如果您使用其他形式,例如issuerurlissuer-url,该设置将不会应用于插件。

SSL协议

默认情况下,Pulsar 客户端以纯文本方式与 Pulsar 服务进行通信。您可以按照Spring for Apache Pulsar 参考文档中的这些步骤来启用 TLS 加密。

有关客户端和身份验证的完整详细信息,请参阅 Spring for Apache Pulsar参考文档

10.4.2. 反应式连接到 Pulsar

当Reactive自动配置被激活时,Spring Boot将自动配置并注册一个ReactivePulsarClientbean。

ReactivePulsarClient采用了前面描述的PulsarClient实例。因此,请按照上一节PulsarClient配置ReactivePulsarClient.

10.4.3. 连接到 Pulsar 管理

Spring for Apache Pulsar 的PulsarAdministration客户端也是自动配置的。

默认情况下,应用程序尝试连接到位于http://localhost:8080 的本地 Pulsar 实例。这可以通过在表单中将spring.pulsar.admin.service-url属性设置为(http|https)://<host>:<port>不同的值来调整。

如果您需要对配置进行更多控制,请考虑注册一个或多个PulsarAdminBuilderCustomizerbean。

验证

当访问需要身份验证的 Pulsar 集群时,管理客户端需要与常规 Pulsar 客户端相同的安全配置。您可以通过spring.pulsar.admin.authentication替换为 spring.pulsar.client.authentication来使用上述身份验证配置

要在启动时创建主题,请添加类型为PulsarTopic 的 bean 。如果主题已经存在,则忽略该 bean。

10.4.4。发送消息

SpringPulsarTemplate是自动配置的,您可以使用它来发送消息,如以下示例所示:

@Component
public class MyBean {

    private final PulsarTemplate<String> pulsarTemplate;

    public MyBean(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public void someMethod() throws PulsarClientException {
        this.pulsarTemplate.send("someTopic", "Hello");
    }

}

PulsarTemplate依赖于PulsarProducerFactory创建底层的 Pulsar 生产者。Spring Boot 自动配置还提供了这个生产者工厂,默认情况下,它会缓存它创建的生产者。您可以通过指定任何spring.pulsar.producer.*spring.pulsar.producer.cache.*前缀的应用程序属性来配置生产者工厂和缓存设置。

如果您需要对生产者工厂配置进行更多控制,请考虑注册一个或多个ProducerBuilderCustomizerbean。这些定制器适用于所有创建的生产者。也可以在发送消息时传入aProducerBuilderCustomizer,只影响当前的生产者。

如果您需要对发送的消息进行更多控制,可以在发送消息时传入TypedMessageBuilderCustomizer

10.4.5. 反应式发送消息

当Reactive自动配置被激活时,SpringReactivePulsarTemplate会自动配置,你可以使用它来发送消息,如下例所示:

@Component
public class MyBean {

    private final ReactivePulsarTemplate<String> pulsarTemplate;

    public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public void someMethod() {
        this.pulsarTemplate.send("someTopic", "Hello").subscribe();
    }

}

ReactivePulsarTemplate依赖于ReactivePulsarSenderFactory 来实际创建底层发送者。Spring Boot 自动配置还提供了这个发送者工厂,默认情况下,它会缓存它创建的生产者。您可以通过指定任何spring.pulsar.producer.*spring.pulsar.producer.cache.*前缀应用程序属性来配置发件人工厂和缓存设置。

如果您需要对发送者工厂配置进行更多控制,请考虑注册一个或多个ReactiveMessageSenderBuilderCustomizerbean。这些定制器适用于所有创建的发件人。也可以在发送消息时传入 ReactiveMessageSenderBuilderCustomizer ,只影响当前发送者。

如果您需要对发送的消息进行更多控制,可以在发送消息时传入 MessageSpecBuilderCustomizer

10.4.6. 接收消息

当 Apache Pulsar 基础设施存在时,可以对任何 bean 进行注释@PulsarListener以创建侦听器端点。以下组件在someTopic主题上创建侦听器端点:

@Component
public class MyBean {

    @PulsarListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

Spring Boot 自动配置提供了PulsarListener 所需的所有组件,例如PulsarListenerContainerFactory用来构造底层 Pulsar 消费者的消费者工厂。您可以通过指定任何spring.pulsar.listener.*spring.pulsar.consumer.*前缀的应用程序属性来配置这些组件。

如果您需要对消费者工厂配置进行更多控制,请考虑注册一个或多个ConsumerBuilderCustomizerbean。这些定制器适用于工厂创建的所有使用者,因此适用于所有@PulsarListener实例。您还可以通过设置@PulsarListener注解的consumerCustomizer属性来自定义单个监听器。

10.4.7. 反应性地接收消息

当 Apache Pulsar 基础设施存在并且响应式自动配置被激活时,可以对任何 bean 进行注释以@ReactivePulsarListener创建响应式侦听器端点。以下组件在该someTopic主题上创建一个反应式侦听器端点:

@Component
public class MyBean {

    @ReactivePulsarListener(topics = "someTopic")
    public Mono<Void> processMessage(String content) {
        // ...
        return Mono.empty();
    }

}

Spring Boot 自动配置提供了 所需的所有组件ReactivePulsarListener,例如ReactivePulsarListenerContainerFactory它用来构造底层反应式 Pulsar 消费者的消费者工厂。您可以通过指定任何spring.pulsar.listener.spring.pulsar.consumer.前缀的应用程序属性来配置这些组件。

如果您需要对消费者工厂配置进行更多控制,请考虑注册一个或多个ReactiveMessageConsumerBuilderCustomizerbean。这些定制器适用于工厂创建的所有使用者,因此适用于所有@ReactivePulsarListener实例。您还可以通过设置@ReactivePulsarListener注解的consumerCustomizer属性来自定义单个监听器。

10.4.8. 阅读消息

Pulsar 阅读器界面使应用程序能够手动管理光标。当您使用阅读器连接到主题时,您需要指定阅读器在连接到主题时从哪条消息开始阅读。

当 Apache Pulsar 基础设施存在时,任何 Bean 都可以通过阅读器进行@PulsarReader注释以使用消息。以下组件创建一个读取器端点,该端点从someTopic主题的开头开始读取消息:

@Component
public class MyBean {

    @PulsarReader(topics = "someTopic", startMessageId = "earliest")
    public void processMessage(String content) {
        // ...
    }

}

@PulsarReader依赖于PulsarReaderFactory创建底层 Pulsar 阅读器。Spring Boot 自动配置提供了这个读取器工厂,可以通过设置任何带spring.pulsar.reader.*前缀的应用程序属性来自定义它。

如果您需要对读取器工厂配置进行更多控制,请考虑注册一个或多个ReaderBuilderCustomizerbean。这些定制器适用于工厂创建的所有读取器,因此适用于所有@PulsarReader实例。您还可以通过设置注解@PulsarReaderreaderCustomizer属性来自定义单个监听器。

10.4.9. 反应式地阅读消息

当 Apache Pulsar 基础设施存在并且响应式自动配置被激活时,将提供 Spring ReactivePulsarReaderFactory,您可以使用它创建一个阅读器,以便以响应式方式读取消息。以下组件使用提供的工厂创建一个阅读器,并从someTopic主题读取 5 分钟前的一条消息:

@Component
public class MyBean {

    private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

    public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
        this.pulsarReaderFactory = pulsarReaderFactory;
    }

    public void someMethod() {
        ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
            .topic("someTopic")
            .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
        Mono<Message<String>> message = this.pulsarReaderFactory
            .createReader(Schema.STRING, List.of(readerBuilderCustomizer))
            .readOne();
        // ...
    }

}

Spring Boot 自动配置提供了这个读取器工厂,可以通过设置任何带spring.pulsar.reader.*前缀的应用程序属性来自定义它。

如果您需要对读取器工厂配置进行更多控制,请考虑在使用工厂创建读取器时传递一个或多个ReactiveMessageReaderBuilderCustomizer实例。

如果您需要对读取器工厂配置进行更多控制,请考虑注册一个或多个ReactiveMessageReaderBuilderCustomizerbean。这些定制器适用于所有创建的阅读器。您还可以在创建阅读器时传递一个或多个ReactiveMessageReaderBuilderCustomizer,以仅将自定义应用到创建的阅读器。

有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar参考文档

10.4.10. 额外的脉冲星特性

自动配置支持的属性显示在附录的“集成属性”部分中。请注意,在大多数情况下,这些属性(连字符或驼峰命名法)直接映射到 Apache Pulsar 配置属性。有关详细信息,请参阅 Apache Pulsar 文档。

只有 Pulsar 支持的属性的子集可以直接通过PulsarProperties类获得。如果您希望使用不直接支持的其他属性来调整自动配置的组件,则可以使用上述每个组件支持的定制器。

最后更新于