Redisson 延时队列 原理 详解

花了一天研究了下Redisson 的延时队列,RBlockingQueue ,RDelayedQueue 。 网上没一个说清楚的,而且都是说 轮询redis的zset,都是错误的 ! 让我来纠正,如果我有错的也可指出。

Demo用法

public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
	Config config = new Config();
	config.useSingleServer().setAddress("redis://172.29.2.10:7000");
	RedissonClient redisson = Redisson.create(config);
	RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");
	RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
	new Thread() {
		public void run() {
			while(true) {
				try {
                                        //阻塞队列有数据就返回,否则wait
					System.err.println( blockingQueue.take());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};
	}.start();
	
	for(int i=1;i<=5;i++) {
                // 向阻塞队列放入数据
		delayedQueue.offer("fffffffff"+i, 13, TimeUnit.SECONDS);
	}
}

上面构造了Redisson 阻塞延时队列,然后向里面塞了5条数据,都是13秒后到期。 我们先不启动程序 ,先打开redis执行:

[root@localhost redis-cluster]# redis-cli -c -p 7000 -h 172.29.2.10 --raw
172.29.2.10:7000> monitor
OK

monitor 命令可以监控redis执行了哪些命令, 注意线上不要乱搞,耗性能的 。然后我们启动程序,观察redis执行命令情况,这里分为三个阶段:

第一介段: 客户端程序启动,offer方法执行之前 ,redis服务会收到如下redis命令:

1610452446.652126 [0 172.29.2.194:65025] "SUBSCRIBE" "redisson_delay_queue_channel:{dest_queue1}"
1610452446.672009 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452442403" "limit" "0" "2"
1610452446.672018 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452446.673896 [0 172.29.2.194:65034] "BLPOP" "dest_queue1" "0" 

SUBSCRIBE

这里订阅了一个固定的队列 redisson_delay_queue_channel:{dest_queue1}, 就是为了开启进程里面的延时任务,很重要,redisson延时取数据都靠它了。后面会说。

zrangebyscore

zrangebyscore用法扫盲
>> zrangebyscore key min max [WITHSCORES] [LIMIT offset count]
分页获取指定区间内(min - max)带有分数值(可选)的有序集成员的列表

redisson_delay_queue_timeout:{dest_queue1} 是一个zset,当有延时数据存入Redisson队列时,就会在此队列中插入 数据,排序分数为延时的时间戳。

zrangebyscore就是取出前2条(源码是100条,如下图)过了当前时间的数据。如果取的是0的话就执行下面的zrange, 这里程序刚启动肯定是0(除非是之前的队列数据没有取完)。之所以在刚启动时 这样取数据就是为了把上次进程宕机后没发完的数据发完。

zrange

取出第一个数,也就是判断上面的还有不有下一页。

BLPOP

移出并获取 dest_queue1 列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止 , 这里显然没有元素 ,就会一直阻塞。


第二介段: 执行offer向Redisson队列设置值

这个阶段我们发现redis干了下面事情:

1610452446.684465 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455407" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452446.684480 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452446.684492 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.684498 [0 lua] "publish" "redisson_delay_queue_channel:{dest_queue1}" "1610452455407"
1610452446.687922 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455422" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452446.687943 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452446.687958 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.690478 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455424" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452446.690492 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452446.690502 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.692661 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455427" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4"
1610452446.692674 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4"
1610452446.692683 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.696054 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455429" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452446.696081 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452446.696098 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"

我们客户端是设置了5条数据。上面也可以看出来。

zadd

往我们zset里面设置 数据截止的时间戳(当前执行的时间戳+延时的时间毫秒值),内容为我们的ffffff1 ,不过特殊编码了,加了点什么,不用管。

rpush

同步一份数据到list队列,这里也不知道干嘛的,先放到这里。

zrange+publish

取出排序好的第一个数据,也就是最临近要触发的数据,然后发送通知 (之前订阅了的客户端,可能是微服务就有多个客户端),内容为将要触发的时间。客户端收到通知后,就在自己进程里面开启延时任务( HashedWheelTimer ),到时间后就可以从redis取数据发送。

后面又是我们的5条循环的设置数据 zadd...


第三介段:到延时时间取redis数据

1610452459.680953 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455416" "limit" "0" "2"
1610452459.680967 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff1"
1610452459.680976 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452459.680984 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452459.680991 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES" // 判断是否有值
1610452459.745813 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455480" "limit" "0" "2"
1610452459.745829 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff2"
1610452459.745837 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452459.745845 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff3"
1610452459.745848 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452459.745855 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452459.745864 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452459.756909 [0 172.29.2.194:65026] "BLPOP" "dest_queue1" "0"
1610452459.758092 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455493" "limit" "0" "2"
1610452459.758108 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff4"
1610452459.758114 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4"
1610452459.758121 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff5"
1610452459.758124 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452459.758133 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452459.758143 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452459.759030 [0 172.29.2.194:65037] "BLPOP" "dest_queue1" "0"
1610452459.760933 [0 172.29.2.194:65036] "BLPOP" "dest_queue1" "0"
1610452459.763913 [0 172.29.2.194:65038] "BLPOP" "dest_queue1" "0"
1610452459.765999 [0 172.29.2.194:65039] "BLPOP" "dest_queue1" "0"

这个阶段是由客户端进程里面的延时任务执行的,延时任务是在第二阶段构造的,已经说了(通过redis的订阅/发布实现)。

zrangebyscore

取出前2条到时间的数据,第一阶段已说。

rpush

将上面取到的数据push到阻塞队列,注意我们第一阶段已经监听了这个阻塞队列

"BLPOP" "dest_queue1" "0" 

所以这里就会通知客户端取数据。

lrem + zrem

将取完的数据删掉。

zrange

取zset第一个数据,有的话继续上面逻辑取数据,否则进入下面。

BLPOP

继续监听这个阻塞队列。以便下次用。

小结一下

  • 客户端启动,redisson先订阅一个key,同时 BLPOP key 0 无限监听一个阻塞队列(等里面有数据了就返回)。
  • 当有数据put时,redisson先把数据放到一个zset集合(按延时到期时间的时间戳为分数排序),同时发布上面订阅的key,发布内容为数据到期的timeout,此时客户端进程开启一个延时任务,延时时间为发布的timeout。
  • 客户端进程的延时任务到了时间执行,从zset分页取出过了当前时间的数据,然后将数据rpush到第一步的阻塞队列里。然后将当前数据从zset移除,取完之后,又执行 BLPOP key 0 无限监听一个阻塞队列。
  • 上一步客户端监听的阻塞队列返回取到数据,回调到 RBlockingQueue 的 take方法。于是,我们就收到了数据。

大致原理就是这样,redisson不是通过轮询zset的,将延时任务执行放到进程里面实现,只有到时间才会取redis zset。

redisson里面还有很多异常,重试机制 没讲。毕竟时间就一天,没法全部吃透。有了这些原理,我相信你也能实现一个属于自己的redisson延时队列了。


redisson延时队列优化

由于我在线上使用了redisson延时队列,在数据量小的时候表现很佳也很稳定,但我们瞬时流量特别大, 发生了到了延时时间了还给我延时十几分钟的情况 ,这个是我万万没想到的。

这个是我测试的情况

当我设置了14511条数据到redisson延时队列时,取出来的时间在本身的延时时间上还延时了198636多毫秒,而且时间随着数据增加而增加。 我们线上是微信自动发消息业务,这样会导致你跟你女朋友推晚安消息,结果她第二天早上收到,然后她就认为你勾搭上了美国的妞,和你分手,并暗暗自喜到老娘早就有外遇了,就盼着这天了。


为了你的性福,于是我想到了一个优化的法子,构建cluster模式。需要自己定制开发。

下面是我优化后的测试结果:

10万多条数据,真实延时时间最大33399毫秒,已经表现很好了,毕竟我开发环境redis特别垃圾。

由于开发不易,可以在这个上面下载我定制后的源码,不用担心代码性能,这个是我们线上已经用了一年多了。 你也可以压测下。

https://pic1.zhimg.com/v2-2a853b787f9b76e3651d67b7a4481630_180x120.jpg



兄弟(妹子)不要以为就这样江湖再见了,我还要烦你一点时间。

上面介绍了进程里面的延时任务都是一笔带过,下面来讲讲下它的原理。redisson使用的是netty里面的延时任务 io . netty . util .HashedWheelTimer

eclipse-javadoc:☂=redisson/F:\/mvn-res\/res\/io\/netty\/netty-common\/4.1.56.Final\/netty-common-4.1.56.Final.jar=/maven.pomderived=/true=/=/maven.pomderived=/true=/=/maven.groupId=/io.netty=/=/maven.artifactId=/netty-common=/=/maven.version=/4.1.56.Final=/=/maven.scope=/compile=/

eclipse-javadoc:☂=redisson/F:\/mvn-res\/res\/io\/netty\/netty-common\/4.1.56.Final\/netty-common-4.1.56.Final.jar=/maven.pomderived=/true=/=/maven.pomderived=/true=/=/maven.groupId=/io.netty=/=/maven.artifactId=/netty-common=/=/maven.version=/4.1.56.Final=/=/maven.scope=/compile=/

eclipse-javadoc:☂=redisson/F:\/mvn-res\/res\/io\/netty\/netty-common\/4.1.56.Final\/netty-common-4.1.56.Final.jar=/maven.pomderived=/true=/=/maven.pomderived=/true=/=/maven.groupId=/io.netty=/=/maven.artifactId=/netty-common=/=/maven.version=/4.1.56.Final=/=/maven.scope=/compile=/


HashedWheelTimer 实现原理

HashedWheelTimer本质是一种类似延迟任务队列的实现,适用于对时效性不高的,可快速执行的,大量这样的“小”任务,能够做到高性能,低消耗

redisson是在这里用的 org . redisson . connection .MasterSlaveConnectionManager

eclipse-javadoc:☂=redisson/src\/main\/java=/optional=/true=/=/maven.pomderived=/true=/

eclipse-javadoc:☂=redisson/src\/main\/java=/optional=/true=/=/maven.pomderived=/true=/

eclipse-javadoc:☂=redisson/src\/main\/java=/optional=/true=/=/maven.pomderived=/true=/

   // 初始化  timer 
   protected void initTimer(MasterSlaveServersConfig config) {
        int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};
        Arrays.sort(timeouts);
        int minTimeout = timeouts[0];
        if (minTimeout % 100 != 0) {
            minTimeout = (minTimeout % 100) / 2;
        } else if (minTimeout == 100) {
            minTimeout = 50;
        } else {
            minTimeout = 100;
        }
        // minTimeout 为100
        timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);
        
        connectionWatcher = new IdleConnectionWatcher(this, config);
        subscribeService = new PublishSubscribeService(this, config);
    }


    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        try {
        	System.err.println(time + " HadLuo ======================newTimeout==================" + task + " , "+ delay+"ms");
            return timer.newTimeout(task, delay, unit);
        } catch (IllegalStateException e) {
            if (isShuttingDown()) {
                return DUMMY_TIMEOUT;
            }
            
            throw e;
        }
    }

抽象出来就是这样

HashedWheelTimer timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), 100,
		TimeUnit.MILLISECONDS, 1024, false);
// 构建一个延时任务
timer.newTimeout((time) -> {
	System.err.println("到了12s后了,该娶媳妇了~");
}, 12, TimeUnit.SECONDS);

算了,下篇文章讲下原理吧,篇幅还有点长,请移致下面文章。

罗政:HashedWheelTimer 源码解析


更多redis文章

Redis

强烈推荐一个 进阶 JAVA架构师 的博客

https://pic2.zhimg.com/v2-1e8deea0c94dab83067a8eca4007734d_ipico.jpg

支付宝打赏 微信打赏

如果文章对您有帮助,您可以鼓励一下作者