蜜桃爱爱视频_无码天堂亚洲国产av_亚洲精品久久久久久久蜜桃_翘臀后进少妇大白嫩屁股91大神_日本一级片在线观看_亚洲欧洲另类_自拍天堂_中国黄色影院_国精产品999国精产_免费日韩在线视频_爱操成人网_日日躁夜夜躁狠狠躁aⅴ蜜_亚洲精品久久66国产高清_一个色综合久久_国产午夜性春猛交ⅹxxx_99操视频
        • 鄭州
        您的位置: 法制網 > 綜合 > > 詳情

        RabbitMq TTL+死信隊列 延遲消息問題記錄

        來源: 騰訊云 2023-02-22 22:59:11

        延遲隊列存儲的對象是對應的延遲消息,所謂的延遲消息是指當消息被發送以后,并不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費

        利用RabbitMqTTL和死信隊列 來實現延時消費。


        (資料圖片僅供參考)

        如果設置的是隊列統一過期時間放到死信隊列,沒有什么問題。

        如果是延時時間設置到每條消息上的。而不是給隊列的。

        實現方式為消息存活時間為動態用戶頁面可配置的。

        這就導致了一個問題:

        先用一條消息的存活時間是1天。后面又進了一條消息存活時間是1小時。

        結果一小時到了,發現這條消息并沒有被轉發到消費延時過期消息的隊列。

        原因是盡管ttl是設給每條消息的。但是本質上,所有延時消息都還在一個隊列里,對它過期時間的檢測也是從頭部開始的。

        它不會檢測每一條消息是否過期。而是順序檢測。

        如果first in的消息過期時間很長,會導致它阻塞后進的消息。

        不僅無法實現真正的過期時間。還會導致,一個大的過期時間的先進的消息,會堆積一堆后進的過期時間短的消息。

        問題解決

        這個時候可以使用rabbitMq的一個插件:rabbitmq_delayed_message_exchange

        一段時間以來,人們一直在尋找用RabbitMQ實現延遲消息的傳遞方法,到目前為止,公認的解決方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此來實現的,RabbitMQ延遲消息插件新增了一種新的交換器類型,消息通過這種交換器路由就可以實現延遲發送

        插件安裝

        需要根據自己的rabbitMq選擇對應的版本。我rabbitMq的版本是RabbitMQ 3.11.0,對應的插件版本就是:3.11.1

        基于Linux

        --1、cd到rabbitmq默認安裝位置cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins--2、通過ftp工具將插件上傳到此目錄下--3、開啟插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--4、重啟MQ服務systemctl restart rabbitmq-server

        基于Docker

        --1、通過ftp工具將插件上傳到Linux服務器的根目錄下--2、拷貝到docker中rabbitmq插件目錄下,rabbitmq_delayed_message_exchange-3.9.0.ez(下載包的全名)docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins--3、進入容器docker exec -it 容器id /bin/bash--4、查看插件是否存在(確保2中的操作已經將插件拷貝過來了)cd pluginsls |grep delay--5、開啟插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--6、退出容器exit--7、重啟MQ服務docker restart 容器ID

        安裝成功

        web界面新建交換機選擇類型出現紅框標注即表示成功

        image.png

        代碼實現

        1:springBoot配置

        @Configurationpublic class DelayRabbitmqConfig {     /**     * 聲明延遲隊列     * @return     */    @Bean    public Queue delayQueue(){        return new Queue(QueueConstant.DelayQueue,                true,false,false);    }     /**     * 聲明延遲自定義交換機類型     * @return     */    @Bean    public CustomExchange delayCustomExchange(){        HashMap args = new HashMap<>();//        設置 x-delayed-type 為 direct,當然也可以是 topic 等 發送消息時設置消息頭 headers 的 x-delay 屬性,即延遲時間,如果不設置消息將會立即投遞        args.put("x-delayed-type","direct");        return new CustomExchange(ExchangeConstant.DelayCustomerExchange,                "x-delayed-message",true,false,args);    }     /**     * 綁定延遲交換機和隊列     * @return     */    @Bean    public Binding delayQueueAndCustomExchange(){        return BindingBuilder.bind(delayQueue())                .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs();    }}

        springMvc配置

        引入依賴:    xmlns:util="http://www.springframework.org/schema/util"    http://www.springframework.org/schema/util    http://www.springframework.org/schema/util/spring-util-4.0.xsd                                                                                                                

        代碼實現

        //消息發送final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));DisTimingPushDto disTimingPushDto = new DisTimingPushDto();disTimingPushDto.setOrderId(dispense.getOrderId());disTimingPushDto.setPushTime(disDispense.getPushTime());rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);//每條消息時間配置import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * 延遲消息處理器 Processor * @author king * @date 2022年12月28日 11:14 */public class MyMessagePostProcessor implements MessagePostProcessor {    /**     * 消息延遲時間,單位:毫秒     */    private final Integer TTL;    public MyMessagePostProcessor(final Integer ttl) {        this.TTL = ttl;    }    @Override    public Message postProcessMessage(Message message) throws AmqpException {        message.getMessageProperties().setDelay(TTL);        return message;    }}
        標簽: RabbitMQ
        溫馨提示:

        在實際法律問題情景中,個案情況都有所差異,為了高效解決您的問題,保障合法權益,建議您直接向專業律師說明情況,解決您的實際問題。 立即在線咨詢 >

        上一篇
        下一篇
        相關知識推薦
        操作
        分享
        15037178970

        公眾服務

        法制網公眾號

        快速找律師 / 免費咨詢

        查法律知識 / 查看解答 / 隨時追問

        律師服務(工作日8:30-18:00 ,非工作日請QQ留言)

        律師加盟

        律師營銷服務

        在線客服:

        加盟熱線:

        律師營銷診斷

        營銷分析 / 回復咨詢

        案件接洽 / 合作加盟

        法制法律網,中國知名的 法律咨詢網站,能夠為廣大用戶提供在線 免費法律咨詢服務。
        CopyRight@2003-2022 fazhi.net ALL Rights Reservrd 版權所有
        豫ICP備2022016495號-26
        違法和不良信息舉報電話:

        主站蜘蛛池模板: 奇米色婷婷| 免费在线观看视频免费在线| 日本乱码乱码免费高清视频| av黄色免费网站| 日韩三级在线免费| 一级黄色片aaa| 亚洲色在线v中文字幕| 亚洲国产成人久久一区www| 国产精品人成在线播放| 亚州视频在线| 久草在线费播放视频| 亚洲色播永久网址大全| 少妇无码av无码一区| 国产性在线| 亚洲国产欧美日韩在线观看第一区| 色999日韩| 亚洲欧美日韩另类丝袜一区| 无码人妻丰满熟妇区96| 国产中的精品av涩差av| 亚洲精品综合网| 四虎视频国产精品免费| 亚洲国产精品无码aaa片| 久久久久久毛片免费播放| 国产综合在线播放| 亚洲女优一区| 日本一级理论片在线大全| 国产在线精品99一区不卡| 欧美乱妇高清无乱码免费| 国产欧美一区二区不卡| 亚洲 欧美 制服 另类 日韩 | 毛片视频在线免费观看| 91亚洲免费视频| 人人澡人| 最新精品露脸国产在线| 国产又色又爽无遮挡免费| 久久久成人综合亚洲欧洲精品| 欧美999| 丁香五月激情综合色婷婷| 国产精品国产三级国产专区50| 老色鬼永久精品网站| 欧美高清不卡|