# 10.4. Apache Pulsar 支持

[Apache Pulsar通过提供](https://pulsar.apache.org/)[Spring for Apache Pulsar](https://docs.spring.io/spring-pulsar/docs/1.0.0/reference/)项目的自动配置来支持。

当`org.springframework.pulsar:spring-pulsar`位于类路径上时，Spring Boot 将自动配置并注册经典（强制）Spring for Apache Pulsar 组件。当`org.springframework.pulsar:spring-pulsar-reactive`位于类路径上时，它将对反应式组件执行相同的操作。

有`spring-boot-starter-pulsar`和`spring-boot-starter-pulsar-reactive`“Starters” 分别用于方便地收集命令式和反应式使用的依赖项。

**10.4.1. 连接到 Pulsar**

当您使用 Pulsar 启动器时，Spring Boot 将自动配置并注册一个`PulsarClient`bean。

默认情况下，应用程序尝试连接到位于 的本地 Pulsar 实例`pulsar://localhost:6650`。这可以通过将`spring.pulsar.client.service-url`属性设置为不同的值来调整。

> 该值必须是有效的[Pulsar 协议](https://pulsar.apache.org/docs/client-libraries-java/#connection-urls)URL

您可以通过指定任何带`spring.pulsar.client.*`前缀的应用程序属性来配置客户端。

如果您需要对配置进行更多控制，请考虑注册一个或多个`PulsarClientBuilderCustomizer`bean。

**验证**

要连接到需要身份验证的 Pulsar 集群，您需要通过设置`pluginClassName`和插件所需的任何参数来指定要使用的身份验证插件。您可以将参数设置为参数名称到参数值的映射。以下示例展示了如何配置`AuthenticationOAuth2`插件。

```
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
```

> 您需要确保`spring.pulsar.client.authentication.param.*`下面定义的名称与您的身份验证插件所期望的名称完全匹配（通常是驼峰式大小写）。Spring Boot 不会尝试对这些条目进行任何类型的宽松绑定。
>
> 例如，如果您想为`AuthenticationOAuth2`auth 插件配置颁发者 url，则必须使用`spring.pulsar.client.authentication.param.issuerUrl`. 如果您使用其他形式，例如`issuerurl`或`issuer-url`，该设置将不会应用于插件。

**SSL协议**

默认情况下，Pulsar 客户端以纯文本方式与 Pulsar 服务进行通信。您可以按照Spring for Apache Pulsar 参考文档中的[这些步骤](https://docs.spring.io/spring-pulsar/docs/1.0.0/reference/reference/pulsar.html#tls-encryption)来启用 TLS 加密。

有关客户端和身份验证的完整详细信息，请参阅 Spring for Apache Pulsar[参考文档](https://docs.spring.io/spring-pulsar/docs/1.0.0/reference/reference/pulsar.html#pulsar-client)。

**10.4.2. 反应式连接到 Pulsar**

当Reactive自动配置被激活时，Spring Boot将自动配置并注册一个`ReactivePulsarClient`bean。

`ReactivePulsarClient`采用了前面描述的`PulsarClient`实例。因此，请按照上一节`PulsarClient`配置`ReactivePulsarClient`.

**10.4.3. 连接到 Pulsar 管理**

Spring for Apache Pulsar 的`PulsarAdministration`客户端也是自动配置的。

默认情况下，应用程序尝试连接到位于`http://localhost:8080` 的本地 Pulsar 实例。这可以通过在表单中将`spring.pulsar.admin.service-url`属性设置为`(http|https)://<host>:<port>`不同的值来调整。

如果您需要对配置进行更多控制，请考虑注册一个或多个`PulsarAdminBuilderCustomizer`bean。

**验证**

当访问需要身份验证的 Pulsar 集群时，管理客户端需要与常规 Pulsar 客户端相同的安全配置。您可以通过`spring.pulsar.admin.authentication`替换为 `spring.pulsar.client.authentication`来使用上述[身份验证配置](https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#messaging.pulsar.connecting.auth)。

> 要在启动时创建主题，请添加类型为`PulsarTopic` 的 bean 。如果主题已经存在，则忽略该 bean。

**10.4.4。发送消息**

Spring`PulsarTemplate`是自动配置的，您可以使用它来发送消息，如以下示例所示：

```
@Component
public class MyBean {
​
    private final PulsarTemplate<String> pulsarTemplate;
​
    public MyBean(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }
​
    public void someMethod() throws PulsarClientException {
        this.pulsarTemplate.send("someTopic", "Hello");
    }
​
}
```

`PulsarTemplate`依赖于`PulsarProducerFactory`创建底层的 Pulsar 生产者。Spring Boot 自动配置还提供了这个生产者工厂，默认情况下，它会缓存它创建的生产者。您可以通过指定任何`spring.pulsar.producer.*`和`spring.pulsar.producer.cache.*`前缀的应用程序属性来配置生产者工厂和缓存设置。

如果您需要对生产者工厂配置进行更多控制，请考虑注册一个或多个`ProducerBuilderCustomizer`bean。这些定制器适用于所有创建的生产者。也可以在发送消息时传入a`ProducerBuilderCustomizer`，只影响当前的生产者。

如果您需要对发送的消息进行更多控制，可以在发送消息时传入`TypedMessageBuilderCustomizer` 。

**10.4.5. 反应式发送消息**

当Reactive自动配置被激活时，Spring`ReactivePulsarTemplate`会自动配置，你可以使用它来发送消息，如下例所示：

```
@Component
public class MyBean {
​
    private final ReactivePulsarTemplate<String> pulsarTemplate;
​
    public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }
​
    public void someMethod() {
        this.pulsarTemplate.send("someTopic", "Hello").subscribe();
    }
​
}
```

`ReactivePulsarTemplate`依赖于`ReactivePulsarSenderFactory` 来实际创建底层发送者。Spring Boot 自动配置还提供了这个发送者工厂，默认情况下，它会缓存它创建的生产者。您可以通过指定任何`spring.pulsar.producer.*`和`spring.pulsar.producer.cache.*`前缀应用程序属性来配置发件人工厂和缓存设置。

如果您需要对发送者工厂配置进行更多控制，请考虑注册一个或多个`ReactiveMessageSenderBuilderCustomizer`bean。这些定制器适用于所有创建的发件人。也可以在发送消息时传入 `ReactiveMessageSenderBuilderCustomizer` ，只影响当前发送者。

如果您需要对发送的消息进行更多控制，可以在发送消息时传入 `MessageSpecBuilderCustomizer` 。

**10.4.6. 接收消息**

当 Apache Pulsar 基础设施存在时，可以对任何 bean 进行注释`@PulsarListener`以创建侦听器端点。以下组件在`someTopic`主题上创建侦听器端点：

```
@Component
public class MyBean {
​
    @PulsarListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }
​
}
```

Spring Boot 自动配置提供了`PulsarListener` 所需的所有组件，例如`PulsarListenerContainerFactory`用来构造底层 Pulsar 消费者的消费者工厂。您可以通过指定任何`spring.pulsar.listener.*`和`spring.pulsar.consumer.*`前缀的应用程序属性来配置这些组件。

如果您需要对消费者工厂配置进行更多控制，请考虑注册一个或多个`ConsumerBuilderCustomizer`bean。这些定制器适用于工厂创建的所有使用者，因此适用于所有`@PulsarListener`实例。您还可以通过设置`@PulsarListener`注解的`consumerCustomizer`属性来自定义单个监听器。

**10.4.7. 反应性地接收消息**

当 Apache Pulsar 基础设施存在并且响应式自动配置被激活时，可以对任何 bean 进行注释以`@ReactivePulsarListener`创建响应式侦听器端点。以下组件在该`someTopic`主题上创建一个反应式侦听器端点：

```
@Component
public class MyBean {
​
    @ReactivePulsarListener(topics = "someTopic")
    public Mono<Void> processMessage(String content) {
        // ...
        return Mono.empty();
    }
​
}
```

Spring Boot 自动配置提供了 所需的所有组件`ReactivePulsarListener`，例如`ReactivePulsarListenerContainerFactory`它用来构造底层反应式 Pulsar 消费者的消费者工厂。您可以通过指定任&#x4F55;**`spring.pulsar.listener.`**&#x548C;**`spring.pulsar.consumer.`**&#x524D;缀的应用程序属性来配置这些组件。

如果您需要对消费者工厂配置进行更多控制，请考虑注册一个或多个`ReactiveMessageConsumerBuilderCustomizer`bean。这些定制器适用于工厂创建的所有使用者，因此适用于所有`@ReactivePulsarListener`实例。您还可以通过设置`@ReactivePulsarListener`注解的`consumerCustomizer`属性来自定义单个监听器。

**10.4.8. 阅读消息**

Pulsar 阅读器界面使应用程序能够手动管理光标。当您使用阅读器连接到主题时，您需要指定阅读器在连接到主题时从哪条消息开始阅读。

当 Apache Pulsar 基础设施存在时，任何 Bean 都可以通过阅读器进行`@PulsarReader`注释以使用消息。以下组件创建一个读取器端点，该端点从`someTopic`主题的开头开始读取消息：

```
@Component
public class MyBean {
​
    @PulsarReader(topics = "someTopic", startMessageId = "earliest")
    public void processMessage(String content) {
        // ...
    }
​
}
```

`@PulsarReader`依赖于`PulsarReaderFactory`创建底层 Pulsar 阅读器。Spring Boot 自动配置提供了这个读取器工厂，可以通过设置任何带`spring.pulsar.reader.*`前缀的应用程序属性来自定义它。

如果您需要对读取器工厂配置进行更多控制，请考虑注册一个或多个`ReaderBuilderCustomizer`bean。这些定制器适用于工厂创建的所有读取器，因此适用于所有`@PulsarReader`实例。您还可以通过设置注解`@PulsarReader`的`readerCustomizer`属性来自定义单个监听器。

**10.4.9. 反应式地阅读消息**

当 Apache Pulsar 基础设施存在并且响应式自动配置被激活时，将提供 Spring `ReactivePulsarReaderFactory`，您可以使用它创建一个阅读器，以便以响应式方式读取消息。以下组件使用提供的工厂创建一个阅读器，并从`someTopic`主题读取 5 分钟前的一条消息：

```
@Component
public class MyBean {
​
    private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
​
    public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
        this.pulsarReaderFactory = pulsarReaderFactory;
    }
​
    public void someMethod() {
        ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
            .topic("someTopic")
            .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
        Mono<Message<String>> message = this.pulsarReaderFactory
            .createReader(Schema.STRING, List.of(readerBuilderCustomizer))
            .readOne();
        // ...
    }
​
}
```

Spring Boot 自动配置提供了这个读取器工厂，可以通过设置任何带`spring.pulsar.reader.*`前缀的应用程序属性来自定义它。

如果您需要对读取器工厂配置进行更多控制，请考虑在使用工厂创建读取器时传递一个或多个`ReactiveMessageReaderBuilderCustomizer`实例。

如果您需要对读取器工厂配置进行更多控制，请考虑注册一个或多个`ReactiveMessageReaderBuilderCustomizer`bean。这些定制器适用于所有创建的阅读器。您还可以在创建阅读器时传递一个或多个`ReactiveMessageReaderBuilderCustomizer`，以仅将自定义应用到创建的阅读器。

> 有关上述任何组件的更多详细信息以及发现其他可用功能，请参阅 Spring for Apache Pulsar[参考文档](https://docs.spring.io/spring-pulsar/docs/1.0.0/reference/)。

**10.4.10. 额外的脉冲星特性**

自动配置支持的属性显示在附录的[“集成属性”部分中。](https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#appendix.application-properties.integration)请注意，在大多数情况下，这些属性（连字符或驼峰命名法）直接映射到 Apache Pulsar 配置属性。有关详细信息，请参阅 Apache Pulsar 文档。

只有 Pulsar 支持的属性的子集可以直接通过`PulsarProperties`类获得。如果您希望使用不直接支持的其他属性来调整自动配置的组件，则可以使用上述每个组件支持的定制器。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://doc.shiker.tech/spring-boot-can-kao-wen-dang/10.-xiao-xi/10.4.-apache-pulsar-zhi-chi.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
