操屁眼的视频在线免费看,日本在线综合一区二区,久久在线观看免费视频,欧美日韩精品久久综

新聞資訊

    、前言

    首先說(shuō)一點(diǎn),企業(yè)中最常用的實(shí)際上既不是RocketMQ,也不是Kafka,而是RabbitMQ。

    RocketMQ很強(qiáng)大,但主要是阿里推廣自己的云產(chǎn)品而開(kāi)源出來(lái)的一款消息隊(duì)列,其實(shí)中小企業(yè)用RocketMQ的沒(méi)有想象中那么多。

    深層次的原因在于兔寶在中小企業(yè)普及更早,經(jīng)受的考驗(yàn)也更久,很容易產(chǎn)生「回頭客」,當(dāng)初隨RabbitMQ成長(zhǎng)的一批人才如今大部分都已成為企業(yè)中的中堅(jiān)骨干,技術(shù)選型親睞RabbitMQ的幾率就更高。

    至于Kafka,主要還是用在大數(shù)據(jù)和日志采集方面,除了一些公司有特定的需求會(huì)使用外,對(duì)消息收發(fā)準(zhǔn)確率要求較高的公司依然是以RabbitMQ作為企業(yè)級(jí)消息隊(duì)列的首選。

    工作這么多年我自身的感受是,RabbitMQ經(jīng)久不衰,除非后續(xù)其他消息中間件有與眾不同的使用體驗(yàn),否則依然是RabbitMQ的占有率更高。

    所以準(zhǔn)備進(jìn)入軟件行業(yè)的小伙伴,我建議有必要系統(tǒng)的先把RabbitMQ學(xué)好,然后再學(xué)習(xí)其他消息中間件擴(kuò)展視野,他們的原理大同小異,是可以觸類(lèi)旁通的。


    二、兩個(gè)概念

    RabbitMQ避免消息丟失的方法主要是利用消息確認(rèn)機(jī)制和手動(dòng)簽收機(jī)制,所以有必要把這兩個(gè)概念搞清楚。

    1、消息確認(rèn)機(jī)制

    主要是生產(chǎn)者使用的機(jī)制,用來(lái)確認(rèn)消息是否被成功消費(fèi)。

    配置如下:

    這樣,當(dāng)你實(shí)現(xiàn)RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback這兩個(gè)接口的方法后,就可以針對(duì)性地進(jìn)行消息確認(rèn)的日志記錄,之后做進(jìn)一步的消息發(fā)送補(bǔ)償,以達(dá)到接近100%投遞的目的。

    偽代碼如下:


    2、消息簽收機(jī)制

    RabbitMQ的消息是自動(dòng)簽收的,你可以理解為快遞簽收了,那么這個(gè)快遞的狀態(tài)就從發(fā)送變?yōu)橐押炇眨ㄒ坏膮^(qū)別是快遞公司會(huì)對(duì)物流軌跡有記錄,而MQ簽收后就從隊(duì)列中刪除了。

    企業(yè)級(jí)開(kāi)發(fā)中,RabbitMQ我們基本都開(kāi)啟手動(dòng)簽收方式,這樣可以有效避免消息的丟失。

    前文中已經(jīng)在生產(chǎn)者開(kāi)啟了手動(dòng)簽收機(jī)制,那么作為消費(fèi)方,也要設(shè)置手動(dòng)簽收。

    配置如下:

    消費(fèi)監(jiān)聽(tīng)時(shí),手動(dòng)簽收就一行代碼,偽代碼如下:


    三、消息丟失

    兩個(gè)概念搞清楚后,就可以來(lái)學(xué)習(xí)消息丟失的問(wèn)題和處理方案了。

    1、出現(xiàn)原因

    消息丟失的原因無(wú)非有三種:

    1)、消息發(fā)出后,中途網(wǎng)絡(luò)故障,服務(wù)器沒(méi)收到;

    2)、消息發(fā)出后,服務(wù)器收到了,還沒(méi)持久化,服務(wù)器宕機(jī);

    3)、消息發(fā)出后,服務(wù)器收到了,消費(fèi)方還未處理業(yè)務(wù)邏輯,服務(wù)卻掛掉了,而消息也自動(dòng)簽收,等于啥也沒(méi)干。

    這三種情況,(1) 和 (2)是由于生產(chǎn)方未開(kāi)啟消息確認(rèn)機(jī)制導(dǎo)致,(3)是由于消費(fèi)方未開(kāi)啟手動(dòng)簽收機(jī)制導(dǎo)致。

    2、解決方案

    1)、生產(chǎn)方發(fā)送消息時(shí),要try...catch,在catch中捕獲異常,并將MQ發(fā)送的關(guān)鍵內(nèi)容記錄到日志表中,日志表中要有消息發(fā)送狀態(tài),若發(fā)送失敗,由定時(shí)任務(wù)定期掃描重發(fā)并更新?tīng)顟B(tài);

    2)、生產(chǎn)方publisher必須要加入確認(rèn)回調(diào)機(jī)制,確認(rèn)成功發(fā)送并簽收的消息,如果進(jìn)入失敗回調(diào)方法,就修改數(shù)據(jù)庫(kù)消息的狀態(tài),等待定時(shí)任務(wù)重發(fā);

    3)、消費(fèi)方要開(kāi)啟手動(dòng)簽收ACK機(jī)制,消費(fèi)成功才將消息移除,失敗或因異常情況而尚未處理,就重新入隊(duì)。

    其實(shí)這就是前面闡述兩個(gè)概念時(shí)已經(jīng)講過(guò)的內(nèi)容,也是接近100%消息投遞的企業(yè)級(jí)方案之一,主要目的就是為了解決消息丟失的問(wèn)題。


    四、消息重復(fù)

    1、出現(xiàn)原因

    消息重復(fù)大體上有兩種情況會(huì)出現(xiàn):

    1)、消息消費(fèi)成功,事務(wù)已提交,簽收時(shí)結(jié)果服務(wù)器宕機(jī)或網(wǎng)絡(luò)原因?qū)е潞炇帐。顟B(tài)會(huì)由unack轉(zhuǎn)變?yōu)閞eady,重新發(fā)送給其他消費(fèi)方;

    2)、消息消費(fèi)失敗,由于retry重試機(jī)制,重新入隊(duì)又將消息發(fā)送出去。

    2、解決方案

    網(wǎng)上大體上能搜羅到的方法有三種:

    1)、消費(fèi)方業(yè)務(wù)接口做好冪等;

    2)、消息日志表保存MQ發(fā)送時(shí)的唯一消息ID,消費(fèi)方可以根據(jù)這個(gè)唯一ID進(jìn)行判斷避免消息重復(fù);

    3)、消費(fèi)方的Message對(duì)象有個(gè)getRedelivered()方法返回Boolean,為T(mén)RUE就表示重復(fù)發(fā)送過(guò)來(lái)的。

    我這里只推薦第一種,業(yè)務(wù)方法冪等這是最直接有效的方式,(2)還要和數(shù)據(jù)庫(kù)產(chǎn)生交互,(3)有可能導(dǎo)致第一次消費(fèi)失敗但第二次消費(fèi)成功的情況被砍掉。


    五、消息積壓

    1、出現(xiàn)原因

    消息積壓出現(xiàn)的場(chǎng)景一般有兩種:

    1)、消費(fèi)方的服務(wù)掛掉,導(dǎo)致一直無(wú)法消費(fèi)消息;

    2)、消費(fèi)方的服務(wù)節(jié)點(diǎn)太少,導(dǎo)致消費(fèi)能力不足,從而出現(xiàn)積壓,這種情況極可能就是生產(chǎn)方的流量過(guò)大導(dǎo)致。

    2、解決方案

    1)、既然消費(fèi)能力不足,那就擴(kuò)展更多消費(fèi)節(jié)點(diǎn),提升消費(fèi)能力;

    2)、建立專(zhuān)門(mén)的隊(duì)列消費(fèi)服務(wù),將消息批量取出并持久化,之后再慢慢消費(fèi)。

    (1)就是最直接的方式,也是消息積壓最常用的解決方案,但有些企業(yè)考慮到服務(wù)器成本壓力,會(huì)選擇第(2)種方案進(jìn)行迂回,先通過(guò)一個(gè)獨(dú)立服務(wù)把要消費(fèi)的消息存起來(lái),比如存到數(shù)據(jù)庫(kù),之后再慢慢處理這些消息即可。


    六、使用心得

    這里單獨(dú)講一下本人在工作中使用RabbitMQ的一些心得,希望能有所幫助。

    1)、消息丟失、消息重復(fù)、消息積壓三個(gè)問(wèn)題中,實(shí)際上主要解決的還是消息丟失,因?yàn)榇蟛糠止居霾坏较⒎e壓的場(chǎng)景,而稍微有水準(zhǔn)的公司核心業(yè)務(wù)都會(huì)解決冪等問(wèn)題,所以幾乎不存在消息重復(fù)的可能;

    2)、消息丟失的最常見(jiàn)企業(yè)級(jí)方案之一就是定時(shí)任務(wù)補(bǔ)償,因?yàn)椴徽撌荢OA還是微服務(wù)的架構(gòu),必然會(huì)有分布式任務(wù)調(diào)度的存在,自然也就成為MQ最直接的補(bǔ)償方式,如果MQ一定要實(shí)現(xiàn)100%投遞,這種是最普遍的方案。但我實(shí)際上不推薦中小企業(yè)使用該方案,因?yàn)閼{空增加維護(hù)成本,而且沒(méi)有一定規(guī)模的項(xiàng)目完全沒(méi)必要,大家都小看了RabbitMQ本身的性能,比如我們公司,支撐一個(gè)三甲醫(yī)院,也就是三臺(tái)8核16G服務(wù)器的集群,上線至今3年毫無(wú)壓力;

    3)、不要迷信網(wǎng)上和培訓(xùn)機(jī)構(gòu)講解的生產(chǎn)者消息確認(rèn)機(jī)制,也就是前面兩個(gè)概念中講到的ConfirmCallback和ReturnCallback,這種機(jī)制十分降低MQ性能,我們團(tuán)隊(duì)曾遇到過(guò)一次流量高峰期帶來(lái)的MQ傳輸及消費(fèi)性能大幅降低的情況,后來(lái)發(fā)現(xiàn)是消息確認(rèn)機(jī)制導(dǎo)致,關(guān)閉后立馬恢復(fù)正常,從此以后都不再使用這種機(jī)制,MQ運(yùn)行十分順暢。同時(shí)我們會(huì)建立后臺(tái)管理實(shí)現(xiàn)人工補(bǔ)償,通過(guò)識(shí)別業(yè)務(wù)狀態(tài)判斷消費(fèi)方是否處理了業(yè)務(wù)邏輯,畢竟這種情況都是少數(shù),性能和運(yùn)維成本,在這一塊我們選擇了性能;

    4)、我工作這些年使用RabbitMQ沒(méi)見(jiàn)過(guò)自動(dòng)簽收方式,一定是開(kāi)啟手動(dòng)簽收;

    5)、手動(dòng)簽收方式你在網(wǎng)上看到的教程幾乎都是處理完業(yè)務(wù)邏輯之后再手動(dòng)簽收,但實(shí)際上這種用法是不科學(xué)的,在分布式的架構(gòu)中,MQ用來(lái)解耦和轉(zhuǎn)發(fā)是非常常見(jiàn)的,如果是支付業(yè)務(wù),往往在回調(diào)通知中通過(guò)MQ轉(zhuǎn)發(fā)到其他服務(wù),其他服務(wù)如果業(yè)務(wù)處理不成功,那么手動(dòng)簽收也不執(zhí)行,這個(gè)消息又會(huì)入隊(duì)發(fā)給其他消費(fèi)者,這樣就可能在流量洪峰階段因?yàn)榕既坏臉I(yè)務(wù)處理失敗造成堵塞,甚至標(biāo)題所講的三種問(wèn)題同時(shí)出現(xiàn),這樣就會(huì)得不償失。

    不科學(xué)的用法:在處理完業(yè)務(wù)邏輯后再手動(dòng)簽收,否則不簽收,就好比客人進(jìn)店了你得買(mǎi)東西,否則不讓走。

    科學(xué)的用法:不論業(yè)務(wù)邏輯是否處理成功,最終都要將消息手動(dòng)簽收,MQ的使命不是保證客人進(jìn)店了必須消費(fèi),不消費(fèi)就不讓走,而是客人能進(jìn)來(lái)就行,哪怕是隨便看看也算任務(wù)完成。

    可能有人會(huì)問(wèn)你這樣不是和自動(dòng)簽收沒(méi)區(qū)別嗎,NO,你要知道如果自動(dòng)簽收,出現(xiàn)消息丟失你連記錄日志的可能都沒(méi)有。

    另外,為什么一定要這么做,因?yàn)镸Q是中間件,本身就是輔助工具,就是一個(gè)滴滴司機(jī),保證給你送到順便說(shuō)個(gè)再見(jiàn)就行,沒(méi)必要還下車(chē)給你搬東西。

    如果強(qiáng)加給MQ過(guò)多壓力,只會(huì)造成本身業(yè)務(wù)的畸形。我們使用MQ的目的就是解耦和轉(zhuǎn)發(fā),不再做多余的事情,保證MQ本身是流暢的、職責(zé)單一的即可。


    七、總結(jié)

    本篇主要講了RabbitMQ的三種常見(jiàn)問(wèn)題及解決方案,同時(shí)分享了一些作者本人工作中使用的心得,我想網(wǎng)上是很難找到的,如果哪一天用到了,不妨再打開(kāi)看看,也許能避免一些生產(chǎn)環(huán)境可能出現(xiàn)的問(wèn)題。

    我總結(jié)下來(lái)就是三點(diǎn):

    1)、消息100%投遞會(huì)增加運(yùn)維成本,中小企業(yè)視情況使用,非必要不使用;

    2)、消息確認(rèn)機(jī)制影響性能,非必要不使用;

    3)、消費(fèi)者先保證消息能簽收,業(yè)務(wù)處理失敗可以人工補(bǔ)償。

    工作中怕的永遠(yuǎn)不是一個(gè)技術(shù)不會(huì)使用,而是遇到問(wèn)題不知道有什么解決思路。


    END


    私信作者回復(fù)「資源」有更多驚喜內(nèi)容哦!

    消息隊(duì)列的優(yōu)缺點(diǎn)

    消息隊(duì)列的優(yōu)點(diǎn):

    • 解耦合:

    沒(méi)有引入消息隊(duì)列:系統(tǒng)A發(fā)送數(shù)據(jù)給系統(tǒng)B,C;當(dāng)有需求系統(tǒng)B不需要接受系統(tǒng)A的數(shù)據(jù)了,又要改系統(tǒng)A的代碼;或者系統(tǒng)D也新增需求,需要接收系統(tǒng)A的數(shù)據(jù)了,此時(shí)又要去改系統(tǒng)A的代碼,給系統(tǒng)D發(fā)送數(shù)據(jù)。頻繁地改代碼,系統(tǒng)間耦合度高。

    引入消息隊(duì)列:系統(tǒng)A直接發(fā)布數(shù)據(jù)到消息隊(duì)列中間件,需要數(shù)據(jù)的系統(tǒng)直接訂閱MQ即可,不需要數(shù)據(jù)的不訂閱。系統(tǒng)之間沒(méi)有任何耦合度。

    • 流量削峰:

    典型的就是秒殺系統(tǒng),在短時(shí)間內(nèi)大量的訪問(wèn)會(huì)增加系統(tǒng)的壓力,導(dǎo)致系統(tǒng)崩潰。引入MQ后可以在系統(tǒng)之前加入緩沖,減少系統(tǒng)壓力。


    • 異步:

    用戶注冊(cè)后需要發(fā)送短信和郵件,此時(shí)可以使用MQ的異步消息。

    應(yīng)用1


    應(yīng)用2

    市面上常見(jiàn)的消息隊(duì)列


    ActiveMQ

    RabbitMQ

    RocketMQ

    Kafka

    性能(單臺(tái))

    近萬(wàn)級(jí)

    萬(wàn)級(jí)

    十萬(wàn)級(jí)

    百萬(wàn)級(jí)

    消息持久化

    支持

    支持

    支持

    支持

    多語(yǔ)言支持

    支持

    支持

    很少

    支持

    社區(qū)活躍度

    支持協(xié)議

    多 (JMS,AMQP...)

    多( AMQP,STOMP,MQTT....)

    綜合評(píng)價(jià)

    優(yōu)點(diǎn): 成熟,已經(jīng)在很多公司得到應(yīng)用。較多的文檔。各種協(xié)議支持較好,有多個(gè)語(yǔ)言的成熟客戶端。缺點(diǎn):性能較弱。缺乏大規(guī)模吞吐的場(chǎng)景的應(yīng)用有江河日下之感

    優(yōu)點(diǎn):性能較好,管理界面較豐富,在互聯(lián)網(wǎng)公司也有較大規(guī)模的應(yīng)用,有多個(gè)語(yǔ)言的成熟客戶端。缺點(diǎn):內(nèi)部機(jī)制很難了解,也意味很難定制和掌控。集群不支持動(dòng)態(tài)擴(kuò)用。

    優(yōu)點(diǎn):模型簡(jiǎn)單接口易用。在阿里有大規(guī)模應(yīng)用分布式系統(tǒng),性能很好,版本更新很快。缺點(diǎn):文檔少支持的語(yǔ)言較少尚未主流

    優(yōu)點(diǎn):天生分布式,性能最好,所以常見(jiàn)用于大數(shù)據(jù)領(lǐng)域。缺點(diǎn):運(yùn)維難度大有數(shù)據(jù)混亂的情況,對(duì)ZooKeeeper強(qiáng)依賴。多副本機(jī)制下對(duì)帶寬有定的要求。

    RabbitMQ簡(jiǎn)介

    開(kāi)發(fā)語(yǔ)言:erlang

    采用的是AMQP協(xié)議

    默認(rèn)端口:15672(http) 5672(amqp)


    rabbitMQ的前臺(tái)控制頁(yè)面


    交換機(jī)頁(yè)面

    rabbitMQ的各個(gè)組成部分:

    組成部分

    • broker:rabbitmq服務(wù)本身,一個(gè)服務(wù)就是一個(gè)broker,集群模式下就是多個(gè)broker。接收客戶端連接實(shí)現(xiàn)AMQP實(shí)體服務(wù)。
    • connection:連接。應(yīng)用程序與broker之間的網(wǎng)絡(luò)連接TCP/IP三四握手/四次揮手。
    • channel:信道。幾乎所有的操作都在信道中進(jìn)行。channel是進(jìn)行讀寫(xiě)的通道,每一個(gè)channel代表一個(gè)會(huì)話,一個(gè)連接可以有多個(gè)信道。
    • message:應(yīng)用程序與服務(wù)之間傳送的數(shù)據(jù)。由properties和body組成,properties可以對(duì)消息進(jìn)行修飾,此、比如優(yōu)先級(jí)和延遲等高級(jí)特性,body就是實(shí)際內(nèi)容。
    • virtualhost:虛擬地址。類(lèi)似數(shù)據(jù)庫(kù)中的不同用戶。用于邏輯隔離,一個(gè)虛擬主機(jī)可以有若干個(gè)exchange和queue。同一個(gè)虛擬主機(jī)中不能有相同的exchange。
    • exchange:交換機(jī)。根據(jù)routingkey將消息路由到綁定的對(duì)列中,本身不具備儲(chǔ)存消息的能力(rabbitmq隊(duì)列一定綁定交換機(jī),如果沒(méi)綁定,綁定的就是默認(rèn)交換機(jī))。
    • bindings:exchange和queue之間的虛擬連接
    • routingkey:路由規(guī)則,交換機(jī)可以根據(jù)它確認(rèn)如何路由一個(gè)特定消息
    • queue:隊(duì)列。保存消息并轉(zhuǎn)發(fā)給消費(fèi)者。

    消費(fèi)流程:

    生產(chǎn)者綁定到交換機(jī)(簡(jiǎn)單工作隊(duì)列的交換機(jī)是默認(rèn)的交換機(jī))上,然后生產(chǎn)者通過(guò)channel信道發(fā)送消息是將消息發(fā)送到綁定到交換機(jī)上,交換機(jī)本身是不負(fù)責(zé)存儲(chǔ)消息的,它只是根據(jù)routingKey將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,由消費(fèi)者監(jiān)聽(tīng)消費(fèi)

    常見(jiàn)的隊(duì)列標(biāo)簽:

    AD:自動(dòng)刪除(消費(fèi)者與隊(duì)列斷開(kāi)連接后自動(dòng)刪除)

    D:隊(duì)列持久化

    I:內(nèi)部隊(duì)列

    TTL:過(guò)期隊(duì)列

    DLX:死信隊(duì)列

    DLK:死信路由key

    Lim:設(shè)置隊(duì)列的消息最大數(shù)量

    RabbitMQ主要的各個(gè)模式以及JAVA代碼演示

    rabbit的管理界面默認(rèn)是五秒刷新一次,可以更改


    • 簡(jiǎn)單模式:

    簡(jiǎn)單模式下也會(huì)有一個(gè)默認(rèn)交換機(jī),發(fā)送消息是交換機(jī)做的事

    官方文檔上的圖

    創(chuàng)建一個(gè)隊(duì)列

    創(chuàng)建一個(gè)隊(duì)列

    JAVA代碼演示:

    /**
     * simple模式生產(chǎn)者
     */
    public class Producer {
        private final static String HOST="10.211.55.4";
        private final static String USERNAME="wangpeng";
        private final static String PASSWORD="wangpeng";
        //注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672(http)而代碼中使用的是5672(amqp)
        private final static Integer PORT=5672;
        private final static String QUEUE_NAME="helloQueue";
        public static void main(String[] args) {
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            connectionFactory.setVirtualHost("/");
            Connection connection=null;
            Channel channel=null;
            try {
                connection=connectionFactory.newConnection();
                //一個(gè)連接中可以包含多個(gè)通道。channel是邏輯通道;連接耗時(shí)耗費(fèi)資源
                channel=connection.createChannel();
                //durable:是否持久化,服務(wù)器重啟隊(duì)列是否還存在,設(shè)置為true,則隊(duì)列會(huì)持久化,但是里面的消息不會(huì)持久化,mq重啟,消息會(huì)丟失
                //exclusive:是否排他
                //autoDelete:隊(duì)列消費(fèi)完是否自動(dòng)刪除,即對(duì)列中沒(méi)有消息是否要?jiǎng)h除
                //arguments:其他參數(shù)
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message="helloSimpleQueue";
                //第三個(gè)是額外參數(shù)//arguments:其他參數(shù),設(shè)置為MessageProperties.PERSISTENT_TEXT_PLAIN則表示這條消息會(huì)持久化隊(duì)列中
                //沒(méi)有交換機(jī)就寫(xiě)空就可以
              	channel.basicPublish("", message, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println("消息發(fā)送成功");
            } catch (Exception e) {
                e.getMessage();
            } finally {
                //關(guān)閉連接
              	...
            }
        }
    }
    /**
     * simple模式消費(fèi)者
     */
    public class Consumer {
        //服務(wù)端沒(méi)有隊(duì)列,消費(fèi)會(huì)報(bào)錯(cuò)??!
        private final static String HOST="10.211.55.4";
        private final static String USERNAME="wangpeng";
        private final static String PASSWORD="wangpeng";
        //注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672而代碼中使用的是5672
        private final static Integer PORT=5672;
        private final static String QUEUE_NAME="helloQueue";
    
        public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            connectionFactory.setVirtualHost("/");
            Connection connection=null;
            Channel channel=null;
            try {
                connection=connectionFactory.newConnection();
                channel=connection.createChannel();
                channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
                    @Override
                    //消息消費(fèi)成功回調(diào)
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("接收消息成功:" + new String(message.getBody()));
                    }
                }, new CancelCallback() {
                    @Override
                    //消息消費(fèi)失敗回調(diào)
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接收失敗:" + consumerTag);
                    }
                });
            } catch (Exception e) {
                e.getMessage();
            } finally {
                //關(guān)閉連接
              	...
            }
    
        }
    }
    • 工作模式:

    輪詢:每個(gè)隊(duì)列收到的消息數(shù)量一致(99/3),輪詢模式下,消息會(huì)被均分給每個(gè)消費(fèi)者,不論消費(fèi)者處理時(shí)間快慢

    公平分發(fā):根據(jù)隊(duì)列的能力分發(fā),誰(shuí)快給誰(shuí)

    官方文檔上的圖

    • 發(fā)布訂閱模式:

    這種模式下需要將隊(duì)列綁定到交換機(jī)上,但是無(wú)routingkey,我們可以自己在頁(yè)面上創(chuàng)建一個(gè)指定模式的交換機(jī)

    官方文檔上的圖

    創(chuàng)建交換機(jī):

    創(chuàng)建交換機(jī)

    將隊(duì)列綁定交換機(jī):

    將隊(duì)列綁定交換機(jī)

    JAVA代碼演示:

    /**
     * fanout模式生產(chǎn)者
     */
    public class Producer {
        private final static String HOST="10.211.55.7";
        private final static String USERNAME="wangpeng";
        private final static String PASSWORD="wangpeng";
        //注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672(http)而代碼中使用的是5672(amqp)
        private final static Integer PORT=5672;
        private final static String QUEUE_NAME="helloQueue";
        private final static String EXCHANGE_NAME="fanout-Test";
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            connectionFactory.setVirtualHost("/");
            Connection connection=null;
            Channel channel=null;
            try {
                connection=connectionFactory.newConnection("helloProducerConnection");
                //一個(gè)連接中可以包含多個(gè)通道。channel是邏輯通道;連接耗時(shí)耗費(fèi)資源
                channel=connection.createChannel();
                //durable:是否持久化,服務(wù)器重啟是否還存在
                //exclusive:是否排他
                //autoDelete:消費(fèi)完是否自動(dòng)刪除
                //arguments:其他參數(shù)
                //可以聲明多個(gè)隊(duì)列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                channel.queueDeclare(QUEUE_NAME + "1", false, false, false, null);
                channel.queueDeclare(QUEUE_NAME + "2", false, false, false, null);
                //聲明交換機(jī)fanout模式,routingKey必須為空字符串而不是null
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                //將隊(duì)列綁定到交換機(jī)上
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
                channel.queueBind(QUEUE_NAME + "1", EXCHANGE_NAME, "");
                channel.queueBind(QUEUE_NAME + "2", EXCHANGE_NAME, "");
                String message="777";
                //發(fā)布消息
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                System.out.println("消息發(fā)送成功");
            } catch (Exception e) {
                e.getStackTrace();
            } finally {
                
            }
        }
    }
    /**
     * fanout模式消費(fèi)者
     */
    public class Consumer {
        //服務(wù)端沒(méi)有隊(duì)列,消費(fèi)會(huì)報(bào)錯(cuò)!!
        private final static String HOST="10.211.55.7";
        private final static String USERNAME="wangpeng";
        private final static String PASSWORD="wangpeng";
        //注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672而代碼中使用的是5672
        private final static Integer PORT=5672;
        private final static String QUEUE_NAME="helloQueue";
        private final static String EXCHANGE_NAME="fanout-Test";
    
        public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
    
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            connectionFactory.setVirtualHost("/");
            Connection connection=null;
            Channel channel=null;
            try {
                connection=connectionFactory.newConnection("helloProducerConnection");
                channel=connection.createChannel();
                //將信道與交換機(jī)綁定
                channel.exchangeBind(EXCHANGE_NAME, EXCHANGE_NAME, "");
                //指定隊(duì)列接收消息
                channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("接收消息成功:" + new String(message.getBody()));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接收失?。?#34; + consumerTag);
                    }
                });
                channel.basicConsume(QUEUE_NAME + 1, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("接收消息成功:" + new String(message.getBody()));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接收失?。?#34; + consumerTag);
                    }
                });
                channel.basicConsume(QUEUE_NAME + 2, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("接收消息成功:" + new String(message.getBody()));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接收失?。?#34; + consumerTag);
                    }
                });
                System.in.read();
            } catch (Exception e) {
                e.getMessage();
            } finally {
                
            }
    
        }
    }
    • 路由模式:

    既要綁定到交換機(jī),又要設(shè)置routingkey,會(huì)根據(jù)routingkey分發(fā)到不同的隊(duì)列中

    官方文檔上的圖

    • 主題模式:

    綁定交換機(jī),與路由模式不同的是,routingkey支持模糊匹配(特殊字符 * 與 # ,用于做模糊匹配,其中 * 用于匹配一個(gè)單詞, #用于匹配多個(gè)單詞(可以是零個(gè))

    官方文檔上的圖

    JAVA代碼演示:

    /**
     * topic模式生產(chǎn)者
     */
    public class Producer {
        private final static String HOST="10.211.55.7";
        private final static String USERNAME="wangpeng";
        private final static String PASSWORD="wangpeng";
        //注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672(http)而代碼中使用的是5672(amqp)
        private final static Integer PORT=5672;
        private final static String QUEUE_NAME="helloQueue";
        private final static String EXCHANGE_NAME="topic-Test";
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            connectionFactory.setVirtualHost("/");
            Connection connection=null;
            Channel channel=null;
            try {
                connection=connectionFactory.newConnection("helloProducerConnection");
                //一個(gè)連接中可以包含多個(gè)通道。channel是邏輯通道;連接耗時(shí)耗費(fèi)資源
                channel=connection.createChannel();
                //durable:是否持久化,服務(wù)器重啟是否還存在
                //exclusive:是否排他
                //autoDelete:消費(fèi)完是否自動(dòng)刪除
                //arguments:其他參數(shù)
                //可以聲明多個(gè)隊(duì)列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                channel.queueDeclare(QUEUE_NAME + "1", false, false, false, null);
                channel.queueDeclare(QUEUE_NAME + "2", false, false, false, null);
                //聲明交換機(jī)direct模式
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.*");
                channel.queueBind(QUEUE_NAME + "1", EXCHANGE_NAME, "topic.#");
                channel.queueBind(QUEUE_NAME + "2", EXCHANGE_NAME, "*.111");
                String message="topic is good";
                //只給routingKey=directs的發(fā)布消息
                channel.basicPublish(EXCHANGE_NAME, "topic.111.222", null, message.getBytes());
                System.out.println("消息發(fā)送成功");
            } catch (Exception e) {
                e.getStackTrace();
            } finally {
                
            }
        }
    }
    /**
     * topic模式消費(fèi)者
     */
    public class Consumer {
        //服務(wù)端沒(méi)有隊(duì)列,消費(fèi)會(huì)報(bào)錯(cuò)??!
    
        private final static String HOST="10.211.55.7";
        private final static String USERNAME="wangpeng";
        private final static String PASSWORD="wangpeng";
        //注意:使用docker啟動(dòng)mq時(shí),需要開(kāi)放15672和5672兩個(gè)端口,頁(yè)面上使用的是15672而代碼中使用的是5672
        private final static Integer PORT=5672;
        private final static String QUEUE_NAME="helloQueue";
        private final static String EXCHANGE_NAME="direct-Test";
    
        public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
    
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            connectionFactory.setVirtualHost("/");
            Connection connection=null;
            Channel channel=null;
            try {
                connection=connectionFactory.newConnection("helloProducerConnection");
                channel=connection.createChannel();
                //交換機(jī)發(fā)的消息的routingKey是啥這里就寫(xiě)啥,這里寫(xiě)不寫(xiě)都無(wú)所謂
                //channel.exchangeBind(EXCHANGE_NAME,EXCHANGE_NAME,"topic.111.222");
                channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("接收消息成功:" + new String(message.getBody()));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接收失?。?#34; + consumerTag);
                    }
                });
                channel.basicConsume(QUEUE_NAME + 1, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("接收消息成功:" + new String(message.getBody()));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接收失?。?#34; + consumerTag);
                    }
                });
                channel.basicConsume(QUEUE_NAME + 2, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("接收消息成功:" + new String(message.getBody()));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("消息接收失?。?#34; + consumerTag);
                    }
                });
            } catch (Exception e) {
                e.getMessage();
            } finally {
                
            }
    
        }
    }

    SpringBoot集成RabbitMQ

    <!--        spring整合rabbitmq依賴-->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>
    //定義一個(gè)配置類(lèi)用于讀取yml中的自定義配置
    @ConfigurationProperties(prefix="spring.rabbitmq")
    public class ConnectionFactoryConfig {
        private String username;
        private String password;
        private String host;
        private Integer port;
        private String virtualHost;
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username=username;
        }
    
        public String getPassword() {
            return password;
        }
    
        public void setPassword(String password) {
            this.password=password;
        }
    
        public String getHost() {
            return host;
        }
    
        public void setHost(String host) {
            this.host=host;
        }
    
        public Integer getPort() {
            return port;
        }
    
        public void setPort(Integer port) {
            this.port=port;
        }
    
        public String getVirtualHost() {
            return virtualHost;
        }
    
        public void setVirtualHost(String virtualHost) {
            this.virtualHost=virtualHost;
        }
    
        @Override
        public String toString() {
            return "ConnectionFactoryConfig{" +
                    "username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    ", host='" + host + '\'' +
                    ", port=" + port +
                    ", virtualHost='" + virtualHost + '\'' +
                    '}';
        }
    }
    //配置mq的連接信息
    @Configuration
    //這里可以使用此注解將@ConfigurationProperties注解加入IOC容器,也可以直接在@ConfigurationProperties注解修飾的類(lèi)上,加@Component注解加入spring的IOC容器
    @EnableConfigurationProperties({ConnectionFactoryConfig.class})
    public class RabbitMqConfig {
        @Autowired
        public ConnectionFactoryConfig config;
        @Autowired
        public RabbitTemplate rabbitTemplate;
        
        //自己定義了一個(gè)連接工廠學(xué)習(xí),日常中我們可以不定義,直接在yml中設(shè)置值就行——>spring.rabbitmq.username:
        @Bean("connectionFactory")
        public CachingConnectionFactory createConnectionFactory() {
            CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
            cachingConnectionFactory.setHost(config.getHost());
            cachingConnectionFactory.setPort(config.getPort());
            cachingConnectionFactory.setUsername(config.getUsername());
            cachingConnectionFactory.setPassword(config.getPassword());
            cachingConnectionFactory.setVirtualHost(config.getVirtualHost());
            //消息到達(dá)交換機(jī)是否確認(rèn)
            //或者在配置文件中配置publisher-confirms 為true
            cachingConnectionFactory.setPublisherConfirms(true);
            //消息到達(dá)隊(duì)列是否確認(rèn)
            //或者在配置文件中配置publisher-returns 為true
            cachingConnectionFactory.setPublisherReturns(true);
            return cachingConnectionFactory;
        }
      	//封裝了對(duì)mq的管理操作
      	@Bean
        public RabbitAdmin rabbitAdmin() {
            return new RabbitAdmin(createConnectionFactory());
        
        @PostConstruct
        public void init() {
            //設(shè)置消息未到達(dá)隊(duì)列是否回調(diào)setReturnCallback接口
            //使用return-callback時(shí)必須設(shè)置mandatory為true,或者在配置中設(shè)置mandatory-expression的值為true
            rabbitTemplate.setMandatory(true);
            //消息到達(dá)交換機(jī)是否確認(rèn)
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("消息是否到達(dá)交換機(jī):" + ack);
                }
            });
            //如果exchange到queue成功,則不回調(diào)return;如果exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("消息是否沒(méi)有到達(dá)隊(duì)列..." + JSON.toJSONString(message));
                }
            });
        }
    }
    //創(chuàng)建相關(guān)隊(duì)列和交換機(jī)并綁定隊(duì)列
    //聲明消費(fèi)者監(jiān)聽(tīng)相關(guān)隊(duì)列
    public class RabbitMqTopicByRabbitAdminConfig {
        //該類(lèi)封裝了對(duì)mq的管理操作
        private RabbitAdmin rabbitAdmin;
        @Autowired
        public void setRabbitAdmin(RabbitAdmin rabbitAdmin) {
            this.rabbitAdmin=rabbitAdmin;
        }
        @Autowired
        private CachingConnectionFactory connectionFactory;
        @PostConstruct
        public void init() {
            rabbitAdmin.declareQueue(new Queue("rabbitAdminQueue", false, false, false));
            rabbitAdmin.declareQueue(new Queue("rabbitAdminQueueAnnotation", false, false, false));
            rabbitAdmin.declareExchange(new TopicExchange("rabbitAdminExchange", false, false));
            rabbitAdmin.declareBinding(new Binding("rabbitAdminQueue", Binding.DestinationType.QUEUE, "rabbitAdminExchange", "*.queue.#", null));
            rabbitAdmin.declareBinding(new Binding("rabbitAdminQueueAnnotation", Binding.DestinationType.QUEUE, "rabbitAdminExchange", "*.queueAnnotation.#", null));
        }
    		//使用代碼配置的方式綁定消費(fèi)者,也可以使用@RabbitListener(queues={"rabbitAdminQueue"})
        @Bean
        public SimpleMessageListenerContainer adminMessageContainer() {
            SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames("rabbitAdminQueue");
            container.setMessageListener(SpringUtil.getBean("topicExchangeAdminListener", TopicExchangeAdminListener.class));
            //設(shè)置手動(dòng)ack,消費(fèi)者收到消息是否回執(zhí)
            //也可以設(shè)置在配置文件中:spring.rabbitmq.listener.simple.acknowledge-mode=manual
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            //設(shè)置ack手動(dòng)回執(zhí)之后,保證消費(fèi)者每次只拿到一條消息
            container.setPrefetchCount(1);
            return container;
        }
    }
    
    /**
     * @Author:wangpeng
     * @Description:監(jiān)聽(tīng)相關(guān)隊(duì)列的消費(fèi)者,設(shè)置手動(dòng)回執(zhí)ack后,如何給隊(duì)列回執(zhí)ack
     **/
    @Component("topicExchangeAdminListener")
    //當(dāng)使用ChannelAwareMessageListener時(shí),代表需要手動(dòng)設(shè)置ack回執(zhí),在SimpleMessageListenerContainer中設(shè)置了手動(dòng)回執(zhí)
    public class TopicExchangeAdminListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            //表明消費(fèi)者確認(rèn)收到當(dāng)前消息,第二個(gè)參數(shù)表示一次是否 ack 多條消息
            String s=new String(message.getBody());
            if (s.equals("666")) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (s.equals("777")) {
                //第二個(gè)參數(shù)表示一次是否拒絕多條消息 第三個(gè)參數(shù)表示是否把當(dāng)前消息從新入隊(duì)(false的話,會(huì)直接丟掉)
                //同一個(gè)channel deliveryTag不一致
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } else if (s.equals("888")) {
                //表明消費(fèi)者拒絕當(dāng)前消息,第二個(gè)參數(shù)表示是否把當(dāng)前消息從新入隊(duì)
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                throw new RuntimeException("不消費(fèi)??!");
            }
        }
    }

    小結(jié)

    以上是關(guān)于rabbitMQ的一些常規(guī)基礎(chǔ)操作,后面還會(huì)有一些高級(jí)操作,像死信隊(duì)列、延遲隊(duì)列、如何保證順序消費(fèi)等

網(wǎng)站首頁(yè)   |    關(guān)于我們   |    公司新聞   |    產(chǎn)品方案   |    用戶案例   |    售后服務(wù)   |    合作伙伴   |    人才招聘   |   

友情鏈接: 餐飲加盟

地址:北京市海淀區(qū)    電話:010-     郵箱:@126.com

備案號(hào):冀ICP備2024067069號(hào)-3 北京科技有限公司版權(quán)所有