首先說(shuō)一點(diǎn),企業(yè)中最常用的實(shí)際上既不是RocketMQ,也不是Kafka,而是RabbitMQ。
RocketMQ很強(qiáng)大,但主要是阿里推廣自己的云產(chǎn)品而開(kāi)源出來(lái)的一款消息隊(duì)列,其實(shí)中小企業(yè)用RocketMQ的沒(méi)有想象中那么多。
深層次的原因在于兔寶在中小企業(yè)普及更早,經(jīng)受的考驗(yàn)也更久,很容易產(chǎn)生「回頭客」,當(dāng)初隨RabbitMQ成長(zhǎng)的一批人才如今大部分都已成為企業(yè)中的中堅(jiān)骨干,技術(shù)選型親睞RabbitMQ的幾率就更高。
至于Kafka,主要還是用在大數(shù)據(jù)和日志采集方面,除了一些公司有特定的需求會(huì)使用外,對(duì)消息收發(fā)準(zhǔn)確率要求較高的公司依然是以RabbitMQ作為企業(yè)級(jí)消息隊(duì)列的首選。
工作這么多年我自身的感受是,RabbitMQ經(jīng)久不衰,除非后續(xù)其他消息中間件有與眾不同的使用體驗(yàn),否則依然是RabbitMQ的占有率更高。
所以準(zhǔn)備進(jìn)入軟件行業(yè)的小伙伴,我建議有必要系統(tǒng)的先把RabbitMQ學(xué)好,然后再學(xué)習(xí)其他消息中間件擴(kuò)展視野,他們的原理大同小異,是可以觸類(lèi)旁通的。
RabbitMQ避免消息丟失的方法主要是利用消息確認(rèn)機(jī)制和手動(dòng)簽收機(jī)制,所以有必要把這兩個(gè)概念搞清楚。
主要是生產(chǎn)者使用的機(jī)制,用來(lái)確認(rèn)消息是否被成功消費(fèi)。
配置如下:
這樣,當(dāng)你實(shí)現(xiàn)RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback這兩個(gè)接口的方法后,就可以針對(duì)性地進(jìn)行消息確認(rèn)的日志記錄,之后做進(jìn)一步的消息發(fā)送補(bǔ)償,以達(dá)到接近100%投遞的目的。
偽代碼如下:
RabbitMQ的消息是自動(dòng)簽收的,你可以理解為快遞簽收了,那么這個(gè)快遞的狀態(tài)就從發(fā)送變?yōu)橐押炇眨ㄒ坏膮^(qū)別是快遞公司會(huì)對(duì)物流軌跡有記錄,而MQ簽收后就從隊(duì)列中刪除了。
企業(yè)級(jí)開(kāi)發(fā)中,RabbitMQ我們基本都開(kāi)啟手動(dòng)簽收方式,這樣可以有效避免消息的丟失。
前文中已經(jīng)在生產(chǎn)者開(kāi)啟了手動(dòng)簽收機(jī)制,那么作為消費(fèi)方,也要設(shè)置手動(dòng)簽收。
配置如下:
消費(fèi)監(jiān)聽(tīng)時(shí),手動(dòng)簽收就一行代碼,偽代碼如下:
兩個(gè)概念搞清楚后,就可以來(lái)學(xué)習(xí)消息丟失的問(wèn)題和處理方案了。
消息丟失的原因無(wú)非有三種:
1)、消息發(fā)出后,中途網(wǎng)絡(luò)故障,服務(wù)器沒(méi)收到;
2)、消息發(fā)出后,服務(wù)器收到了,還沒(méi)持久化,服務(wù)器宕機(jī);
3)、消息發(fā)出后,服務(wù)器收到了,消費(fèi)方還未處理業(yè)務(wù)邏輯,服務(wù)卻掛掉了,而消息也自動(dòng)簽收,等于啥也沒(méi)干。
這三種情況,(1) 和 (2)是由于生產(chǎn)方未開(kāi)啟消息確認(rèn)機(jī)制導(dǎo)致,(3)是由于消費(fèi)方未開(kāi)啟手動(dòng)簽收機(jī)制導(dǎo)致。
1)、生產(chǎn)方發(fā)送消息時(shí),要try...catch,在catch中捕獲異常,并將MQ發(fā)送的關(guān)鍵內(nèi)容記錄到日志表中,日志表中要有消息發(fā)送狀態(tài),若發(fā)送失敗,由定時(shí)任務(wù)定期掃描重發(fā)并更新?tīng)顟B(tài);
2)、生產(chǎn)方publisher必須要加入確認(rèn)回調(diào)機(jī)制,確認(rèn)成功發(fā)送并簽收的消息,如果進(jìn)入失敗回調(diào)方法,就修改數(shù)據(jù)庫(kù)消息的狀態(tài),等待定時(shí)任務(wù)重發(fā);
3)、消費(fèi)方要開(kāi)啟手動(dòng)簽收ACK機(jī)制,消費(fèi)成功才將消息移除,失敗或因異常情況而尚未處理,就重新入隊(duì)。
其實(shí)這就是前面闡述兩個(gè)概念時(shí)已經(jīng)講過(guò)的內(nèi)容,也是接近100%消息投遞的企業(yè)級(jí)方案之一,主要目的就是為了解決消息丟失的問(wèn)題。
消息重復(fù)大體上有兩種情況會(huì)出現(xiàn):
1)、消息消費(fèi)成功,事務(wù)已提交,簽收時(shí)結(jié)果服務(wù)器宕機(jī)或網(wǎng)絡(luò)原因?qū)е潞炇帐。顟B(tài)會(huì)由unack轉(zhuǎn)變?yōu)閞eady,重新發(fā)送給其他消費(fèi)方;
2)、消息消費(fèi)失敗,由于retry重試機(jī)制,重新入隊(duì)又將消息發(fā)送出去。
網(wǎng)上大體上能搜羅到的方法有三種:
1)、消費(fèi)方業(yè)務(wù)接口做好冪等;
2)、消息日志表保存MQ發(fā)送時(shí)的唯一消息ID,消費(fèi)方可以根據(jù)這個(gè)唯一ID進(jìn)行判斷避免消息重復(fù);
3)、消費(fèi)方的Message對(duì)象有個(gè)getRedelivered()方法返回Boolean,為T(mén)RUE就表示重復(fù)發(fā)送過(guò)來(lái)的。
我這里只推薦第一種,業(yè)務(wù)方法冪等這是最直接有效的方式,(2)還要和數(shù)據(jù)庫(kù)產(chǎn)生交互,(3)有可能導(dǎo)致第一次消費(fèi)失敗但第二次消費(fèi)成功的情況被砍掉。
消息積壓出現(xiàn)的場(chǎng)景一般有兩種:
1)、消費(fèi)方的服務(wù)掛掉,導(dǎo)致一直無(wú)法消費(fèi)消息;
2)、消費(fèi)方的服務(wù)節(jié)點(diǎn)太少,導(dǎo)致消費(fèi)能力不足,從而出現(xiàn)積壓,這種情況極可能就是生產(chǎn)方的流量過(guò)大導(dǎo)致。
1)、既然消費(fèi)能力不足,那就擴(kuò)展更多消費(fèi)節(jié)點(diǎn),提升消費(fèi)能力;
2)、建立專(zhuān)門(mén)的隊(duì)列消費(fèi)服務(wù),將消息批量取出并持久化,之后再慢慢消費(fèi)。
(1)就是最直接的方式,也是消息積壓最常用的解決方案,但有些企業(yè)考慮到服務(wù)器成本壓力,會(huì)選擇第(2)種方案進(jìn)行迂回,先通過(guò)一個(gè)獨(dú)立服務(wù)把要消費(fèi)的消息存起來(lái),比如存到數(shù)據(jù)庫(kù),之后再慢慢處理這些消息即可。
這里單獨(dú)講一下本人在工作中使用RabbitMQ的一些心得,希望能有所幫助。
1)、消息丟失、消息重復(fù)、消息積壓三個(gè)問(wèn)題中,實(shí)際上主要解決的還是消息丟失,因?yàn)榇蟛糠止居霾坏较⒎e壓的場(chǎng)景,而稍微有水準(zhǔn)的公司核心業(yè)務(wù)都會(huì)解決冪等問(wèn)題,所以幾乎不存在消息重復(fù)的可能;
2)、消息丟失的最常見(jiàn)企業(yè)級(jí)方案之一就是定時(shí)任務(wù)補(bǔ)償,因?yàn)椴徽撌荢OA還是微服務(wù)的架構(gòu),必然會(huì)有分布式任務(wù)調(diào)度的存在,自然也就成為MQ最直接的補(bǔ)償方式,如果MQ一定要實(shí)現(xiàn)100%投遞,這種是最普遍的方案。但我實(shí)際上不推薦中小企業(yè)使用該方案,因?yàn)閼{空增加維護(hù)成本,而且沒(méi)有一定規(guī)模的項(xiàng)目完全沒(méi)必要,大家都小看了RabbitMQ本身的性能,比如我們公司,支撐一個(gè)三甲醫(yī)院,也就是三臺(tái)8核16G服務(wù)器的集群,上線至今3年毫無(wú)壓力;
3)、不要迷信網(wǎng)上和培訓(xùn)機(jī)構(gòu)講解的生產(chǎn)者消息確認(rèn)機(jī)制,也就是前面兩個(gè)概念中講到的ConfirmCallback和ReturnCallback,這種機(jī)制十分降低MQ性能,我們團(tuán)隊(duì)曾遇到過(guò)一次流量高峰期帶來(lái)的MQ傳輸及消費(fèi)性能大幅降低的情況,后來(lái)發(fā)現(xiàn)是消息確認(rèn)機(jī)制導(dǎo)致,關(guān)閉后立馬恢復(fù)正常,從此以后都不再使用這種機(jī)制,MQ運(yùn)行十分順暢。同時(shí)我們會(huì)建立后臺(tái)管理實(shí)現(xiàn)人工補(bǔ)償,通過(guò)識(shí)別業(yè)務(wù)狀態(tài)判斷消費(fèi)方是否處理了業(yè)務(wù)邏輯,畢竟這種情況都是少數(shù),性能和運(yùn)維成本,在這一塊我們選擇了性能;
4)、我工作這些年使用RabbitMQ沒(méi)見(jiàn)過(guò)自動(dòng)簽收方式,一定是開(kāi)啟手動(dòng)簽收;
5)、手動(dòng)簽收方式你在網(wǎng)上看到的教程幾乎都是處理完業(yè)務(wù)邏輯之后再手動(dòng)簽收,但實(shí)際上這種用法是不科學(xué)的,在分布式的架構(gòu)中,MQ用來(lái)解耦和轉(zhuǎn)發(fā)是非常常見(jiàn)的,如果是支付業(yè)務(wù),往往在回調(diào)通知中通過(guò)MQ轉(zhuǎn)發(fā)到其他服務(wù),其他服務(wù)如果業(yè)務(wù)處理不成功,那么手動(dòng)簽收也不執(zhí)行,這個(gè)消息又會(huì)入隊(duì)發(fā)給其他消費(fèi)者,這樣就可能在流量洪峰階段因?yàn)榕既坏臉I(yè)務(wù)處理失敗造成堵塞,甚至標(biāo)題所講的三種問(wèn)題同時(shí)出現(xiàn),這樣就會(huì)得不償失。
不科學(xué)的用法:在處理完業(yè)務(wù)邏輯后再手動(dòng)簽收,否則不簽收,就好比客人進(jìn)店了你得買(mǎi)東西,否則不讓走。
科學(xué)的用法:不論業(yè)務(wù)邏輯是否處理成功,最終都要將消息手動(dòng)簽收,MQ的使命不是保證客人進(jìn)店了必須消費(fèi),不消費(fèi)就不讓走,而是客人能進(jìn)來(lái)就行,哪怕是隨便看看也算任務(wù)完成。
可能有人會(huì)問(wèn)你這樣不是和自動(dòng)簽收沒(méi)區(qū)別嗎,NO,你要知道如果自動(dòng)簽收,出現(xiàn)消息丟失你連記錄日志的可能都沒(méi)有。
另外,為什么一定要這么做,因?yàn)镸Q是中間件,本身就是輔助工具,就是一個(gè)滴滴司機(jī),保證給你送到順便說(shuō)個(gè)再見(jiàn)就行,沒(méi)必要還下車(chē)給你搬東西。
如果強(qiáng)加給MQ過(guò)多壓力,只會(huì)造成本身業(yè)務(wù)的畸形。我們使用MQ的目的就是解耦和轉(zhuǎn)發(fā),不再做多余的事情,保證MQ本身是流暢的、職責(zé)單一的即可。
本篇主要講了RabbitMQ的三種常見(jiàn)問(wèn)題及解決方案,同時(shí)分享了一些作者本人工作中使用的心得,我想網(wǎng)上是很難找到的,如果哪一天用到了,不妨再打開(kāi)看看,也許能避免一些生產(chǎn)環(huán)境可能出現(xiàn)的問(wèn)題。
我總結(jié)下來(lái)就是三點(diǎn):
1)、消息100%投遞會(huì)增加運(yùn)維成本,中小企業(yè)視情況使用,非必要不使用;
2)、消息確認(rèn)機(jī)制影響性能,非必要不使用;
3)、消費(fèi)者先保證消息能簽收,業(yè)務(wù)處理失敗可以人工補(bǔ)償。
工作中怕的永遠(yuǎn)不是一個(gè)技術(shù)不會(huì)使用,而是遇到問(wèn)題不知道有什么解決思路。
END
私信作者回復(fù)「資源」有更多驚喜內(nèi)容哦!
消息隊(duì)列的優(yōu)點(diǎn):
沒(méi)有引入消息隊(duì)列:系統(tǒng)A發(fā)送數(shù)據(jù)給系統(tǒng)B,C;當(dāng)有需求系統(tǒng)B不需要接受系統(tǒng)A的數(shù)據(jù)了,又要改系統(tǒng)A的代碼;或者系統(tǒng)D也新增需求,需要接收系統(tǒng)A的數(shù)據(jù)了,此時(shí)又要去改系統(tǒng)A的代碼,給系統(tǒng)D發(fā)送數(shù)據(jù)。頻繁地改代碼,系統(tǒng)間耦合度高。
引入消息隊(duì)列:系統(tǒng)A直接發(fā)布數(shù)據(jù)到消息隊(duì)列中間件,需要數(shù)據(jù)的系統(tǒng)直接訂閱MQ即可,不需要數(shù)據(jù)的不訂閱。系統(tǒng)之間沒(méi)有任何耦合度。
典型的就是秒殺系統(tǒng),在短時(shí)間內(nèi)大量的訪問(wèn)會(huì)增加系統(tǒng)的壓力,導(dǎo)致系統(tǒng)崩潰。引入MQ后可以在系統(tǒng)之前加入緩沖,減少系統(tǒng)壓力。
用戶注冊(cè)后需要發(fā)送短信和郵件,此時(shí)可以使用MQ的異步消息。
應(yīng)用1
應(yīng)用2
ActiveMQ | RabbitMQ | RocketMQ | Kafka | |
性能(單臺(tái)) | 近萬(wàn)級(jí) | 萬(wàn)級(jí) | 十萬(wàn)級(jí) | 百萬(wàn)級(jí) |
消息持久化 | 支持 | 支持 | 支持 | 支持 |
多語(yǔ)言支持 | 支持 | 支持 | 很少 | 支持 |
社區(qū)活躍度 | 高 | 高 | 中 | 高 |
支持協(xié)議 | 多 (JMS,AMQP...) | 多( AMQP,STOMP,MQTT....) | 少 | 少 |
綜合評(píng)價(jià) | 優(yōu)點(diǎn): 成熟,已經(jīng)在很多公司得到應(yīng)用。較多的文檔。各種協(xié)議支持較好,有多個(gè)語(yǔ)言的成熟客戶端。缺點(diǎn):性能較弱。缺乏大規(guī)模吞吐的場(chǎng)景的應(yīng)用有江河日下之感 | 優(yōu)點(diǎn):性能較好,管理界面較豐富,在互聯(lián)網(wǎng)公司也有較大規(guī)模的應(yīng)用,有多個(gè)語(yǔ)言的成熟客戶端。缺點(diǎn):內(nèi)部機(jī)制很難了解,也意味很難定制和掌控。集群不支持動(dòng)態(tài)擴(kuò)用。 | 優(yōu)點(diǎn):模型簡(jiǎn)單接口易用。在阿里有大規(guī)模應(yīng)用分布式系統(tǒng),性能很好,版本更新很快。缺點(diǎn):文檔少支持的語(yǔ)言較少尚未主流 | 優(yōu)點(diǎn):天生分布式,性能最好,所以常見(jiàn)用于大數(shù)據(jù)領(lǐng)域。缺點(diǎn):運(yùn)維難度大有數(shù)據(jù)混亂的情況,對(duì)ZooKeeeper強(qiáng)依賴。多副本機(jī)制下對(duì)帶寬有定的要求。 |
開(kāi)發(fā)語(yǔ)言:erlang
采用的是AMQP協(xié)議
默認(rèn)端口:15672(http) 5672(amqp)
rabbitMQ的前臺(tái)控制頁(yè)面
交換機(jī)頁(yè)面
rabbitMQ的各個(gè)組成部分:
組成部分
消費(fèi)流程:
生產(chǎn)者綁定到交換機(jī)(簡(jiǎn)單工作隊(duì)列的交換機(jī)是默認(rèn)的交換機(jī))上,然后生產(chǎn)者通過(guò)channel信道發(fā)送消息是將消息發(fā)送到綁定到交換機(jī)上,交換機(jī)本身是不負(fù)責(zé)存儲(chǔ)消息的,它只是根據(jù)routingKey將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,由消費(fèi)者監(jiān)聽(tīng)消費(fèi)
常見(jiàn)的隊(duì)列標(biāo)簽:
AD:自動(dòng)刪除(消費(fèi)者與隊(duì)列斷開(kāi)連接后自動(dòng)刪除)
D:隊(duì)列持久化
I:內(nèi)部隊(duì)列
TTL:過(guò)期隊(duì)列
DLX:死信隊(duì)列
DLK:死信路由key
Lim:設(shè)置隊(duì)列的消息最大數(shù)量
rabbit的管理界面默認(rèn)是五秒刷新一次,可以更改
簡(jiǎn)單模式下也會(huì)有一個(gè)默認(rèn)交換機(jī),發(fā)送消息是交換機(jī)做的事
官方文檔上的圖
創(chuàng)建一個(gè)隊(duì)列
創(chuàng)建一個(gè)隊(duì)列
JAVA代碼演示:
/**
* simple模式生產(chǎn)者
*/
public class Producer {
private final static String HOST="10.211.55.4";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672(http)而代碼中使用的是5672(amqp)
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection();
//一個(gè)連接中可以包含多個(gè)通道。channel是邏輯通道;連接耗時(shí)耗費(fèi)資源
channel=connection.createChannel();
//durable:是否持久化,服務(wù)器重啟隊(duì)列是否還存在,設(shè)置為true,則隊(duì)列會(huì)持久化,但是里面的消息不會(huì)持久化,mq重啟,消息會(huì)丟失
//exclusive:是否排他
//autoDelete:隊(duì)列消費(fèi)完是否自動(dòng)刪除,即對(duì)列中沒(méi)有消息是否要?jiǎng)h除
//arguments:其他參數(shù)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message="helloSimpleQueue";
//第三個(gè)是額外參數(shù)//arguments:其他參數(shù),設(shè)置為MessageProperties.PERSISTENT_TEXT_PLAIN則表示這條消息會(huì)持久化隊(duì)列中
//沒(méi)有交換機(jī)就寫(xiě)空就可以
channel.basicPublish("", message, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("消息發(fā)送成功");
} catch (Exception e) {
e.getMessage();
} finally {
//關(guān)閉連接
...
}
}
}
/**
* simple模式消費(fèi)者
*/
public class Consumer {
//服務(wù)端沒(méi)有隊(duì)列,消費(fèi)會(huì)報(bào)錯(cuò)??!
private final static String HOST="10.211.55.4";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672而代碼中使用的是5672
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection();
channel=connection.createChannel();
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
//消息消費(fèi)成功回調(diào)
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
//消息消費(fèi)失敗回調(diào)
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失敗:" + consumerTag);
}
});
} catch (Exception e) {
e.getMessage();
} finally {
//關(guān)閉連接
...
}
}
}
輪詢:每個(gè)隊(duì)列收到的消息數(shù)量一致(99/3),輪詢模式下,消息會(huì)被均分給每個(gè)消費(fèi)者,不論消費(fèi)者處理時(shí)間快慢
公平分發(fā):根據(jù)隊(duì)列的能力分發(fā),誰(shuí)快給誰(shuí)
官方文檔上的圖
這種模式下需要將隊(duì)列綁定到交換機(jī)上,但是無(wú)routingkey,我們可以自己在頁(yè)面上創(chuàng)建一個(gè)指定模式的交換機(jī)
官方文檔上的圖
創(chuàng)建交換機(jī):
創(chuàng)建交換機(jī)
將隊(duì)列綁定交換機(jī):
將隊(duì)列綁定交換機(jī)
JAVA代碼演示:
/**
* fanout模式生產(chǎn)者
*/
public class Producer {
private final static String HOST="10.211.55.7";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672(http)而代碼中使用的是5672(amqp)
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
private final static String EXCHANGE_NAME="fanout-Test";
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection("helloProducerConnection");
//一個(gè)連接中可以包含多個(gè)通道。channel是邏輯通道;連接耗時(shí)耗費(fèi)資源
channel=connection.createChannel();
//durable:是否持久化,服務(wù)器重啟是否還存在
//exclusive:是否排他
//autoDelete:消費(fèi)完是否自動(dòng)刪除
//arguments:其他參數(shù)
//可以聲明多個(gè)隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueDeclare(QUEUE_NAME + "1", false, false, false, null);
channel.queueDeclare(QUEUE_NAME + "2", false, false, false, null);
//聲明交換機(jī)fanout模式,routingKey必須為空字符串而不是null
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//將隊(duì)列綁定到交換機(jī)上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME + "1", EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME + "2", EXCHANGE_NAME, "");
String message="777";
//發(fā)布消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("消息發(fā)送成功");
} catch (Exception e) {
e.getStackTrace();
} finally {
}
}
}
/**
* fanout模式消費(fèi)者
*/
public class Consumer {
//服務(wù)端沒(méi)有隊(duì)列,消費(fèi)會(huì)報(bào)錯(cuò)!!
private final static String HOST="10.211.55.7";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672而代碼中使用的是5672
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
private final static String EXCHANGE_NAME="fanout-Test";
public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection("helloProducerConnection");
channel=connection.createChannel();
//將信道與交換機(jī)綁定
channel.exchangeBind(EXCHANGE_NAME, EXCHANGE_NAME, "");
//指定隊(duì)列接收消息
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失?。?#34; + consumerTag);
}
});
channel.basicConsume(QUEUE_NAME + 1, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失?。?#34; + consumerTag);
}
});
channel.basicConsume(QUEUE_NAME + 2, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失?。?#34; + consumerTag);
}
});
System.in.read();
} catch (Exception e) {
e.getMessage();
} finally {
}
}
}
既要綁定到交換機(jī),又要設(shè)置routingkey,會(huì)根據(jù)routingkey分發(fā)到不同的隊(duì)列中
官方文檔上的圖
綁定交換機(jī),與路由模式不同的是,routingkey支持模糊匹配(特殊字符 * 與 # ,用于做模糊匹配,其中 * 用于匹配一個(gè)單詞, #用于匹配多個(gè)單詞(可以是零個(gè)))
官方文檔上的圖
JAVA代碼演示:
/**
* topic模式生產(chǎn)者
*/
public class Producer {
private final static String HOST="10.211.55.7";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672(http)而代碼中使用的是5672(amqp)
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
private final static String EXCHANGE_NAME="topic-Test";
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection("helloProducerConnection");
//一個(gè)連接中可以包含多個(gè)通道。channel是邏輯通道;連接耗時(shí)耗費(fèi)資源
channel=connection.createChannel();
//durable:是否持久化,服務(wù)器重啟是否還存在
//exclusive:是否排他
//autoDelete:消費(fèi)完是否自動(dòng)刪除
//arguments:其他參數(shù)
//可以聲明多個(gè)隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueDeclare(QUEUE_NAME + "1", false, false, false, null);
channel.queueDeclare(QUEUE_NAME + "2", false, false, false, null);
//聲明交換機(jī)direct模式
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.*");
channel.queueBind(QUEUE_NAME + "1", EXCHANGE_NAME, "topic.#");
channel.queueBind(QUEUE_NAME + "2", EXCHANGE_NAME, "*.111");
String message="topic is good";
//只給routingKey=directs的發(fā)布消息
channel.basicPublish(EXCHANGE_NAME, "topic.111.222", null, message.getBytes());
System.out.println("消息發(fā)送成功");
} catch (Exception e) {
e.getStackTrace();
} finally {
}
}
}
/**
* topic模式消費(fèi)者
*/
public class Consumer {
//服務(wù)端沒(méi)有隊(duì)列,消費(fèi)會(huì)報(bào)錯(cuò)??!
private final static String HOST="10.211.55.7";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672而代碼中使用的是5672
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
private final static String EXCHANGE_NAME="direct-Test";
public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection("helloProducerConnection");
channel=connection.createChannel();
//交換機(jī)發(fā)的消息的routingKey是啥這里就寫(xiě)啥,這里寫(xiě)不寫(xiě)都無(wú)所謂
//channel.exchangeBind(EXCHANGE_NAME,EXCHANGE_NAME,"topic.111.222");
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失?。?#34; + consumerTag);
}
});
channel.basicConsume(QUEUE_NAME + 1, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失?。?#34; + consumerTag);
}
});
channel.basicConsume(QUEUE_NAME + 2, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失?。?#34; + consumerTag);
}
});
} catch (Exception e) {
e.getMessage();
} finally {
}
}
}
<!-- spring整合rabbitmq依賴-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
//定義一個(gè)配置類(lèi)用于讀取yml中的自定義配置
@ConfigurationProperties(prefix="spring.rabbitmq")
public class ConnectionFactoryConfig {
private String username;
private String password;
private String host;
private Integer port;
private String virtualHost;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username=username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password=password;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host=host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port=port;
}
public String getVirtualHost() {
return virtualHost;
}
public void setVirtualHost(String virtualHost) {
this.virtualHost=virtualHost;
}
@Override
public String toString() {
return "ConnectionFactoryConfig{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
", host='" + host + '\'' +
", port=" + port +
", virtualHost='" + virtualHost + '\'' +
'}';
}
}
//配置mq的連接信息
@Configuration
//這里可以使用此注解將@ConfigurationProperties注解加入IOC容器,也可以直接在@ConfigurationProperties注解修飾的類(lèi)上,加@Component注解加入spring的IOC容器
@EnableConfigurationProperties({ConnectionFactoryConfig.class})
public class RabbitMqConfig {
@Autowired
public ConnectionFactoryConfig config;
@Autowired
public RabbitTemplate rabbitTemplate;
//自己定義了一個(gè)連接工廠學(xué)習(xí),日常中我們可以不定義,直接在yml中設(shè)置值就行——>spring.rabbitmq.username:
@Bean("connectionFactory")
public CachingConnectionFactory createConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
cachingConnectionFactory.setHost(config.getHost());
cachingConnectionFactory.setPort(config.getPort());
cachingConnectionFactory.setUsername(config.getUsername());
cachingConnectionFactory.setPassword(config.getPassword());
cachingConnectionFactory.setVirtualHost(config.getVirtualHost());
//消息到達(dá)交換機(jī)是否確認(rèn)
//或者在配置文件中配置publisher-confirms 為true
cachingConnectionFactory.setPublisherConfirms(true);
//消息到達(dá)隊(duì)列是否確認(rèn)
//或者在配置文件中配置publisher-returns 為true
cachingConnectionFactory.setPublisherReturns(true);
return cachingConnectionFactory;
}
//封裝了對(duì)mq的管理操作
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(createConnectionFactory());
@PostConstruct
public void init() {
//設(shè)置消息未到達(dá)隊(duì)列是否回調(diào)setReturnCallback接口
//使用return-callback時(shí)必須設(shè)置mandatory為true,或者在配置中設(shè)置mandatory-expression的值為true
rabbitTemplate.setMandatory(true);
//消息到達(dá)交換機(jī)是否確認(rèn)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息是否到達(dá)交換機(jī):" + ack);
}
});
//如果exchange到queue成功,則不回調(diào)return;如果exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息是否沒(méi)有到達(dá)隊(duì)列..." + JSON.toJSONString(message));
}
});
}
}
//創(chuàng)建相關(guān)隊(duì)列和交換機(jī)并綁定隊(duì)列
//聲明消費(fèi)者監(jiān)聽(tīng)相關(guān)隊(duì)列
public class RabbitMqTopicByRabbitAdminConfig {
//該類(lèi)封裝了對(duì)mq的管理操作
private RabbitAdmin rabbitAdmin;
@Autowired
public void setRabbitAdmin(RabbitAdmin rabbitAdmin) {
this.rabbitAdmin=rabbitAdmin;
}
@Autowired
private CachingConnectionFactory connectionFactory;
@PostConstruct
public void init() {
rabbitAdmin.declareQueue(new Queue("rabbitAdminQueue", false, false, false));
rabbitAdmin.declareQueue(new Queue("rabbitAdminQueueAnnotation", false, false, false));
rabbitAdmin.declareExchange(new TopicExchange("rabbitAdminExchange", false, false));
rabbitAdmin.declareBinding(new Binding("rabbitAdminQueue", Binding.DestinationType.QUEUE, "rabbitAdminExchange", "*.queue.#", null));
rabbitAdmin.declareBinding(new Binding("rabbitAdminQueueAnnotation", Binding.DestinationType.QUEUE, "rabbitAdminExchange", "*.queueAnnotation.#", null));
}
//使用代碼配置的方式綁定消費(fèi)者,也可以使用@RabbitListener(queues={"rabbitAdminQueue"})
@Bean
public SimpleMessageListenerContainer adminMessageContainer() {
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("rabbitAdminQueue");
container.setMessageListener(SpringUtil.getBean("topicExchangeAdminListener", TopicExchangeAdminListener.class));
//設(shè)置手動(dòng)ack,消費(fèi)者收到消息是否回執(zhí)
//也可以設(shè)置在配置文件中:spring.rabbitmq.listener.simple.acknowledge-mode=manual
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//設(shè)置ack手動(dòng)回執(zhí)之后,保證消費(fèi)者每次只拿到一條消息
container.setPrefetchCount(1);
return container;
}
}
/**
* @Author:wangpeng
* @Description:監(jiān)聽(tīng)相關(guān)隊(duì)列的消費(fèi)者,設(shè)置手動(dòng)回執(zhí)ack后,如何給隊(duì)列回執(zhí)ack
**/
@Component("topicExchangeAdminListener")
//當(dāng)使用ChannelAwareMessageListener時(shí),代表需要手動(dòng)設(shè)置ack回執(zhí),在SimpleMessageListenerContainer中設(shè)置了手動(dòng)回執(zhí)
public class TopicExchangeAdminListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//表明消費(fèi)者確認(rèn)收到當(dāng)前消息,第二個(gè)參數(shù)表示一次是否 ack 多條消息
String s=new String(message.getBody());
if (s.equals("666")) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (s.equals("777")) {
//第二個(gè)參數(shù)表示一次是否拒絕多條消息 第三個(gè)參數(shù)表示是否把當(dāng)前消息從新入隊(duì)(false的話,會(huì)直接丟掉)
//同一個(gè)channel deliveryTag不一致
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else if (s.equals("888")) {
//表明消費(fèi)者拒絕當(dāng)前消息,第二個(gè)參數(shù)表示是否把當(dāng)前消息從新入隊(duì)
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
throw new RuntimeException("不消費(fèi)??!");
}
}
}
以上是關(guān)于rabbitMQ的一些常規(guī)基礎(chǔ)操作,后面還會(huì)有一些高級(jí)操作,像死信隊(duì)列、延遲隊(duì)列、如何保證順序消費(fèi)等