10.2. AMQP

高级消息队列协议 (AMQP) 是面向消息中间件的平台中立、线路级协议。Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括spring-boot-starter-amqp“Starter”。

10.2.1. RabbitMQ 支持

RabbitMQ是一个基于 AMQP 协议的轻量级、可靠、可扩展、可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。

RabbitMQ 配置为spring.rabbitmq.*. 例如,您可以在application.properties 中声明以下部分:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret

或者,您可以使用以下addresses属性配置相同的连接:

spring.rabbitmq.addresses=amqp://admin:secret@localhost

当以这种方式指定地址时,hostport属性将被忽略。如果地址使用该amqps协议,则会自动启用 SSL 支持。

请参阅RabbitProperties 参考资料了解更多受支持的基于属性的配置选项。要配置Spring AMQP 使用的RabbitMQ ConnectionFactory的较低级别详细信息,请定义一个ConnectionFactoryCustomizerbean。

如果上下文中存在ConnectionNameStrategy bean,它将自动用于命名由自动配置的CachingConnectionFactory创建的连接.

要对RabbitTemplate 进行应用程序范围的附加定制,请使用RabbitTemplateCustomizerbean。

有关更多详细信息 , 请参阅了解 RabbitMQ 使用的协议 AMQP 。

10.2.2. 发送消息

Spring 的AmqpTemplateAmqpAdmin是自动配置的,您可以将它们直接自动装配到您自己的 bean 中,如以下示例所示:

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

}

RabbitMessagingTemplate可以用类似的方式注入。如果MessageConverter定义了一个 bean,它会自动关联到自动配置的AmqpTemplate.

如有必要,任何定义为 bean 的 org.springframework.amqp.core.Queue 都会自动用于在 RabbitMQ 实例上声明相应的队列。

要重试操作,您可以启用AmqpTemplate的重试(例如,在代理连接丢失的情况下):

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s

默认情况下禁用重试。您还可以通过声明RabbitRetryTemplateCustomizerbean 以编程方式自定义RetryTemplate

如果您需要创建更多RabbitTemplate实例或者想要覆盖默认值,Spring Boot 提供了一个RabbitTemplateConfigurerbean,您可以使用该 bean 来初始化RabbitTemplate与自动配置所使用的工厂相同的设置。

10.2.3. 向流发送消息

要将消息发送到特定流,请指定流的名称,如以下示例所示:

spring.rabbitmq.stream.name=my-stream

如果定义了MessageConverterStreamMessageConverterProducerCustomizerbean,它会自动关联到自动配置的RabbitStreamTemplate.

如果您需要创建更多RabbitStreamTemplate实例或者想要覆盖默认值,Spring Boot 提供了一个RabbitStreamTemplateConfigurerbean,您可以使用该 bean 来初始化RabbitStreamTemplate与自动配置所使用的工厂相同的设置。

10.2.4. 接收消息

当 Rabbit 基础设施存在时,任何 bean 都可以被注释@RabbitListener以创建侦听器端点。如果RabbitListenerContainerFactory未定义,则会自动配置默认值SimpleRabbitListenerContainerFactory,您可以使用spring.rabbitmq.listener.type属性切换到直接容器。如果定义了一个MessageConverter或一个MessageRecovererbean,它会自动与默认工厂关联。

以下示例组件在someQueue队列上创建侦听器端点:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}

有关详细信息, 请参阅@EnableRabbitJavadoc

如果您需要创建更多RabbitListenerContainerFactory实例或者想要覆盖默认值,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,您可以使用它们来初始化 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory,其设置与自动配置所使用的工厂相同。

您选择哪种容器类型并不重要。这两个 bean 通过自动配置公开。

例如,以下配置类公开了另一个使用特定MessageConverter的工厂:

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}

然后你可以在任何带@RabbitListener注释的方法中使用工厂,如下所示:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}

您可以启用重试来处理侦听器引发异常的情况。默认情况下,使用RejectAndDontRequeueRecoverer ,但您可以定义自己的MessageRecoverer。当重试次数用尽时,消息将被拒绝并被丢弃或路由到死信交换(如果代理配置为这样做)。默认情况下,重试被禁用。您还可以通过声明RabbitRetryTemplateCustomizerbean 以编程方式自定义RetryTemplate

默认情况下,如果禁用重试并且侦听器引发异常,则将无限期地重试传递。您可以通过两种方式修改此行为:将defaultRequeueRejected属性设置为false以便尝试零次重新传递,或者抛出AmqpRejectAndDontRequeueException来表示应拒绝消息。后者是启用重试并且达到最大传递尝试次数时使用的机制。

最后更新于