RabbitMQ 的消息持久化與 Spring AMQP 的實現(xiàn)詳解
前言
要從奔潰的 RabbitMQ 中恢復(fù)的消息,我們需要做消息持久化。如果消息要從 RabbitMQ 奔潰中恢復(fù),那么必須滿足三點,且三者缺一不可。
- 交換器必須是持久化。
- 隊列必須是持久化的。
- 消息必須是持久化的。
原生的實現(xiàn)方式
原生的 RabbitMQ 客戶端需要完成三個步驟。
第一步,交換器的持久化。
// 參數(shù)1 exchange :交換器名 // 參數(shù)2 type :交換器類型 // 參數(shù)3 durable :是否持久化 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
第二步,隊列的持久化。
// 參數(shù)1 queue :隊列名 // 參數(shù)2 durable :是否持久化 // 參數(shù)3 exclusive :僅創(chuàng)建者可以使用的私有隊列,斷開后自動刪除 // 參數(shù)4 autoDelete : 當(dāng)所有消費客戶端連接斷開后,是否自動刪除隊列 // 參數(shù)5 arguments channel.queueDeclare(QUEUE_NAME, true, false, false, null);
第三步,消息的持久化。
// 參數(shù)1 exchange :交換器
// 參數(shù)2 routingKey : 路由鍵
// 參數(shù)3 props : 消息的其他參數(shù),其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
// 參數(shù)4 body : 消息體
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Spring AMQP 的實現(xiàn)方式
Spring AMQP 是對原生的 RabbitMQ 客戶端的封裝。一般情況下,我們只需要定義交換器的持久化和隊列的持久化。
其中,交換器的持久化配置如下。
// 參數(shù)1 name :交互器名 // 參數(shù)2 durable :是否持久化 // 參數(shù)3 autoDelete :當(dāng)所有消費客戶端連接斷開后,是否自動刪除隊列 new TopicExchange(name, durable, autoDelete)
此外,還需要再配置隊列的持久化。
// 參數(shù)1 name :隊列名 // 參數(shù)2 durable :是否持久化 // 參數(shù)3 exclusive :僅創(chuàng)建者可以使用的私有隊列,斷開后自動刪除 // 參數(shù)4 autoDelete : 當(dāng)所有消費客戶端連接斷開后,是否自動刪除隊列 new Queue(name, durable, exclusive, autoDelete);
至此,RabbitMQ 的消息持久化配置完畢。
那么,消息的持久化難道不需要配置么?確實如此,我們來看下源碼。
一般情況下,我們會通過這種方式發(fā)送消息。
rabbitTemplate.convertAndSend(exchange, routeKey, message);
其中,調(diào)用了 convertAndSend(String exchange, String routingKey, final Object object) 方法。
@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}
接著,用調(diào)用了 convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) 方法。
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}
此時,最關(guān)鍵的方法出現(xiàn)了,它是 convertMessageIfNecessary(final Object object)。
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}
其中,關(guān)鍵的是 MessageProperties 類,它持久化的策略是 MessageDeliveryMode.PERSISTENT,因此它會初始化時默認(rèn)消息是持久化的。
public class MessageProperties implements Serializable {
public MessageProperties() {
this.deliveryMode = DEFAULT_DELIVERY_MODE;
this.priority = DEFAULT_PRIORITY;
}
static {
DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
DEFAULT_PRIORITY = Integer.valueOf(0);
}
}
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Mybatis注解實現(xiàn)多數(shù)據(jù)源讀寫分離詳解
這篇文章主要給大家介紹了關(guān)于Mybatis注解實現(xiàn)多數(shù)據(jù)源讀寫分離的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Mybatis具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
vue3使用vue-diff工具來比較數(shù)據(jù)差異
這篇文章主要為大家詳細(xì)介紹了vue3如何使用vue-diff工具來比較數(shù)據(jù)差異,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-11-11

