Spring Boot & Redisson: implementing a delay queue

Implementing a delay queue with Redisson

Versions used:

  • Spring Boot 2.6.0
  • redisson-spring-boot-starter 3.28.0
1. Add dependencies & configuration
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.28.0</version>
</dependency>

application.properties

spring.application.name=springboot-redis-delayed-queue-demo

spring.redis.database=2
spring.redis.host=localhost
spring.redis.password=123456
spring.redis.port=6379 
2. Adding delayed tasks
package cn.aohan.delayedqueue.provider;

import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.model.TaskData;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/**
 * @author Aohan
 * @date 2024/4/19
 */
@Component
public class DelayedQueueProvider {

    private final RedissonClient redissonClient;

    public DelayedQueueProvider(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * Add a delayed task to the named queue.
     *
     * @param delayedName name of the delay queue
     * @param val         task payload
     * @param delayTime   delay before the task becomes available
     * @param timeUnit    unit of {@code delayTime}
     */
    public void addDelayedTask(String delayedName, TaskData val, long delayTime, TimeUnit timeUnit) {
        final DelayedTaskInfo task = new DelayedTaskInfo();
        task.setCreateAt(System.currentTimeMillis());
        task.setDelayTime(delayTime);
        task.setTimeUnit(timeUnit);
        task.setVal(val);
        task.setDelayedName(delayedName);
        final RDelayedQueue<DelayedTaskInfo> delayedQueue = getDelayedQueue(delayedName);
        delayedQueue.offer(task, delayTime, timeUnit);
    }

    /**
     * Remove a task, whether it is still waiting or has already expired.
     *
     * @param queueName queue name
     * @param taskId    ID of the task to remove
     */
    public void removeTask(String queueName, String taskId) {
        final RBlockingDeque<DelayedTaskInfo> blockingDeque = getBlockingDeque(queueName);
        final Predicate<DelayedTaskInfo> predicate = item -> {
            final TaskData val = item.getVal();
            return Objects.nonNull(val) && Objects.equals(taskId, val.taskId);
        };
        // Check the delayed queue first: tasks only move from the delayed queue to the
        // blocking deque, so this order cannot miss a task that expires between the two calls.
        final RDelayedQueue<DelayedTaskInfo> delayedQueue = getDelayedQueue(blockingDeque);
        delayedQueue.removeIf(predicate);
        blockingDeque.removeIf(predicate);
    }

    /**
     * Get the blocking deque that receives expired tasks
     *
     * @param queueName queue name
     * @return {@link RBlockingDeque}<{@link DelayedTaskInfo}>
     */
    public RBlockingDeque<DelayedTaskInfo> getBlockingDeque(String queueName) {
        return redissonClient.getBlockingDeque(queueName, JsonJacksonCodec.INSTANCE);
    }

    /**
     * Get delay queue
     *
     * @param queueName queue name
     * @return {@link RDelayedQueue}<{@link DelayedTaskInfo}>
     */
    private RDelayedQueue<DelayedTaskInfo> getDelayedQueue(String queueName) {
        return redissonClient.getDelayedQueue(getBlockingDeque(queueName));
    }

    /**
     * Get delay queue
     *
     * @param blockingDeque destination blocking deque
     * @return {@link RDelayedQueue}<{@link DelayedTaskInfo}>
     */
    private RDelayedQueue<DelayedTaskInfo> getDelayedQueue(RBlockingDeque<DelayedTaskInfo> blockingDeque) {
        return redissonClient.getDelayedQueue(blockingDeque);
    }

} 
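
The provider above relies on two model classes, DelayedTaskInfo and TaskData, that are not listed in this article. The following is only a minimal sketch of what they might look like, inferred from the setters and the val.taskId access used above. The field types and the expiresAtMillis helper are assumptions for illustration, not the project's actual code.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical payload; only the taskId field is implied by the provider code. */
class TaskData {
    public String taskId;
}

/** Hypothetical wrapper stored in the queue; shape inferred from the setters used above. */
class DelayedTaskInfo {
    private long createAt;       // creation time in epoch milliseconds
    private long delayTime;      // delay amount, expressed in timeUnit
    private TimeUnit timeUnit;
    private TaskData val;        // business payload
    private String delayedName;  // name of the queue the task belongs to

    public long getCreateAt() { return createAt; }
    public void setCreateAt(long createAt) { this.createAt = createAt; }

    public long getDelayTime() { return delayTime; }
    public void setDelayTime(long delayTime) { this.delayTime = delayTime; }

    public TimeUnit getTimeUnit() { return timeUnit; }
    public void setTimeUnit(TimeUnit timeUnit) { this.timeUnit = timeUnit; }

    public TaskData getVal() { return val; }
    public void setVal(TaskData val) { this.val = val; }

    public String getDelayedName() { return delayedName; }
    public void setDelayedName(String delayedName) { this.delayedName = delayedName; }

    /** Illustrative helper (an assumption): the time at which the task is expected to expire. */
    public long expiresAtMillis() {
        return createAt + timeUnit.toMillis(delayTime);
    }
}
```

A no-argument constructor is kept (implicitly) because JSON codecs generally need one to rebuild the object when it is read back from Redis.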
3. Listening for expired delayed tasks

Queue name constant

/**
 * @author Aohan
 * @date 2024/4/19
 */
public class QueueConstant {

    /**
     * Test delayed task queue name
     */
    public static final String TEST_DELAYED_TASK_QUEUE = "test_delayed_task_queue";

} 

The listener

package cn.aohan.delayedqueue.listener;

import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * Delay task listener
 *
 * @author Aohan
 * @date 2024/4/19
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class DelayedTaskListener implements ApplicationRunner {

    private final DelayedQueueProvider delayedQueueProvider;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        delayedTaskHandle(QueueConstant.TEST_DELAYED_TASK_QUEUE);
    }

    public void delayedTaskHandle(String delayedQueueName) {
        final Thread thread = new Thread(() -> {
            final RBlockingDeque<DelayedTaskInfo> blockingDeque = delayedQueueProvider.getBlockingDeque(delayedQueueName);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Block for up to 2 minutes waiting for an expired task; poll returns null on timeout
                    final DelayedTaskInfo delayedTaskInfo = blockingDeque.poll(2, TimeUnit.MINUTES);
                    if (Objects.isNull(delayedTaskInfo)) {
                        continue;
                    }
                    log.info("DelayedTask task :[{}]", delayedTaskInfo);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop listening instead of swallowing the interrupt
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("DelayedTaskListener#delayedTaskHandle error delayedQueueName:[{}]", delayedQueueName, e);
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }
} 
4. Adding a test delayed task
package cn.aohan.delayedqueue.controller;

import cn.aohan.common.dto.Result;
import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.dto.TestDelayedDTO;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author Aohan
 * @date 2024/4/19
 */
@AllArgsConstructor
@RestController
@RequestMapping("/api/test/delayed")
public class DelayedQueueTestController {

    private final DelayedQueueProvider delayedQueueProvider;

    /**
     * Add a delayed task to the test queue
     *
     * @param delayedTask the delayed task to add
     * @return {@link Result}<{@link Void}>
     */
    @PostMapping
    public Result<Void> addDelayedTask(@RequestBody TestDelayedDTO delayedTask) {
        delayedQueueProvider.addDelayedTask(
                QueueConstant.TEST_DELAYED_TASK_QUEUE,
                delayedTask.getVal(),
                delayedTask.getDelayTime(),
                delayedTask.getTimeUnit()
        );
        return Result.success();
    }

} 
5. How it works

When a delay queue object is first created, its constructor also creates a QueueTransferTask:

org.redisson.RedissonDelayedQueue#RedissonDelayedQueue


channelName = prefixName("redisson_delay_queue_channel", getRawName());

  • The task subscribes to the delay queue's channel and schedules the transfer work (mainly using Netty's time wheel).
  • pushTaskAsync runs a Lua script that removes expired elements from the delay queue's LIST and ZSET in Redis and pushes them onto the target queue.

When an element is offered, it is inserted at the position determined by its delay time. This is done mainly by a Lua script in the org.redisson.RedissonDelayedQueue#offerAsync method:

local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);
redis.call('zadd', KEYS[2], ARGV[1], value);
redis.call('rpush', KEYS[3], value);
local v = redis.call('zrange', KEYS[2], 0, 0); 
if v[1] == value then 
   redis.call('publish', KEYS[4], ARGV[1]); 
end; 

To summarize, this script:

  • Packs two strings (ARGV[2] and ARGV[3]) into a single binary value.
  • Adds the packed value to a sorted set (ZSET) with a score of current time + delay time.
  • Appends the same value to the end of a list.
  • If the added element is now the first element in the sorted set, publishes its score to the channel subscribed to above.
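
The four steps of the script can be mirrored in plain Java to make the ordering logic easier to follow. This is only an in-memory sketch, not Redisson code: the class OfferScriptSketch and its fields are hypothetical stand-ins for the ZSET, the LIST and the channel, and ties between equal scores are broken by insertion order here, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** In-memory stand-in for the offer script; names are hypothetical. */
class OfferScriptSketch {

    /** A value together with its score (the expiry timestamp). */
    static final class Entry {
        final long score;
        final long seq;      // insertion order, used only to break ties
        final String value;

        Entry(long score, long seq, String value) {
            this.score = score;
            this.seq = seq;
            this.value = value;
        }
    }

    // Stand-in for the ZSET: ordered by score, lowest (earliest expiry) first.
    private final TreeSet<Entry> timeoutSet = new TreeSet<>(
            Comparator.comparingLong((Entry e) -> e.score).thenComparingLong(e -> e.seq));
    // Stand-in for the LIST that keeps insertion order.
    private final List<String> orderList = new ArrayList<>();
    // Stand-in for the channel: records every score that would be published.
    final List<Long> published = new ArrayList<>();
    private long nextSeq = 0;

    void offer(String value, long delayMillis, long nowMillis) {
        long expiresAt = nowMillis + delayMillis;   // score = current time + delay
        Entry entry = new Entry(expiresAt, nextSeq++, value);
        timeoutSet.add(entry);                      // corresponds to zadd
        orderList.add(value);                       // corresponds to rpush
        if (timeoutSet.first() == entry) {          // new element is now the earliest
            published.add(expiresAt);               // corresponds to publish
        }
    }
}
```

Offering elements with delays of 100, 200 and 50 (all at time 0) publishes only for the first and third: the second is not the earliest entry, so the existing schedule does not need to change.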

The consumer then uses the blocking BLPOP command to fetch elements from the target LIST.

In short, Redisson's delay queue works as follows. When an element is offered, it is stored in both the LIST and the ZSET that belong to the delay queue. A scheduled task takes the elements whose delay has expired out of those structures and puts them into the blocking queue. The client blocks on that queue with the BLPOP command, and once it receives an element it can run its business logic.
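
The transfer step in this flow can also be sketched in plain Java. Again, this is an in-memory illustration under stated assumptions rather than Redisson's implementation: TransferSketch, offer and transferExpired are hypothetical names, a PriorityQueue stands in for the ZSET, and a LinkedBlockingDeque stands in for the blocking queue that the client reads with BLPOP.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/** In-memory stand-in for the transfer task; names are hypothetical. */
class TransferSketch {

    /** A value waiting for its expiry time. */
    static final class Pending {
        final long expiresAt;
        final String value;

        Pending(long expiresAt, String value) {
            this.expiresAt = expiresAt;
            this.value = value;
        }
    }

    // Waiting elements, earliest expiry first (stand-in for the ZSET).
    private final PriorityQueue<Pending> pending =
            new PriorityQueue<>(Comparator.comparingLong((Pending p) -> p.expiresAt));
    // Expired elements ready for consumers (stand-in for the blocking queue).
    final BlockingDeque<String> ready = new LinkedBlockingDeque<>();

    void offer(String value, long expiresAt) {
        pending.add(new Pending(expiresAt, value));
    }

    /**
     * Moves every element whose expiry time has passed into the ready queue.
     *
     * @return the expiry time of the next waiting element, or null if none is left;
     *         a scheduler would use this value to decide when to run again
     */
    Long transferExpired(long now) {
        while (!pending.isEmpty() && pending.peek().expiresAt <= now) {
            ready.addLast(pending.poll().value);
        }
        return pending.isEmpty() ? null : pending.peek().expiresAt;
    }
}
```

A consumer thread would then call ready.poll(timeout, unit) in a loop, in the same way the listener in section 3 polls the RBlockingDeque.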

Project source code