Redisson implements delay queue
Version Notes:
- spring boot 2.6.0
- redisson-spring-boot-starter 3.28.0
1. Add dependencies & configuration
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.28.0</version>
</dependency>
application.properties
spring.application.name=springboot-redis-delayed-queue-demo
spring.redis.database=2
spring.redis.host=localhost
spring.redis.password=123456
spring.redis.port=6379
2. Adding delayed tasks
package cn.aohan.delayedqueue.provider;
import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.model.TaskData;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
/**
* @author Aohan
* @date 2024/4/19
*/
@Component
public class DelayedQueueProvider {
private final RedissonClient redissonClient;
public DelayedQueueProvider(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* Add delayed tasks
*
* @param delayedName Delay name
* @param val value
* @param delayTime delay
* @param timeUnit time unit
*/
public void addDelayedTask(String delayedName, TaskData val, long delayTime, TimeUnit timeUnit) {
final DelayedTaskInfo task = new DelayedTaskInfo();
task.setCreateAt(System.currentTimeMillis());
task.setDelayTime(delayTime);
task.setTimeUnit(timeUnit);
task.setVal(val);
task.setDelayedName(delayedName);
final RDelayedQueue<DelayedTaskInfo> delayedQueue = getDelayedQueue(delayedName);
delayedQueue.offer(task, delayTime, timeUnit);
}
/**
* Delete task
*
* @param queueName queue name
* @param taskId Taskid
*/
public void removeTask(String queueName, String taskId) {
final RBlockingDeque<DelayedTaskInfo> blockingDeque = getBlockingDeque(queueName);
final Predicate<DelayedTaskInfo> predicate = item -> {
final TaskData val = item.getVal();
return Objects.nonNull(val) && Objects.equals(taskId, val.taskId);
};
blockingDeque.removeIf(predicate);
final RDelayedQueue<DelayedTaskInfo> delayedQueue = getDelayedQueue(getBlockingDeque(queueName));
delayedQueue.removeIf(predicate);
}
/**
* Get blockeddeque
*
* @param queueName queue name
* @return {@link RBlockingDeque}<{@link DelayedTaskInfo}>
*/
public RBlockingDeque<DelayedTaskInfo> getBlockingDeque(String queueName) {
return redissonClient.getBlockingDeque(queueName, JsonJacksonCodec.INSTANCE);
}
/**
* Get delay queue
*
* @param queueName queue name
* @return {@link RDelayedQueue}<{@link DelayedTaskInfo}>
*/
private RDelayedQueue<DelayedTaskInfo> getDelayedQueue(String queueName) {
return redissonClient.getDelayedQueue(getBlockingDeque(queueName));
}
/**
* Get delay queue
*
* @param blockingDeque blockdeque
* @return {@link RDelayedQueue}<{@link DelayedTaskInfo}>
*/
private RDelayedQueue<DelayedTaskInfo> getDelayedQueue(RBlockingDeque<DelayedTaskInfo> blockingDeque) {
return redissonClient.getDelayedQueue(blockingDeque);
}
}
3. Monitoring delay task expiration
Delay queue name constant
/**
* @author Aohan
* @date 2024/4/19
*/
public class QueueConstant {
/**
* Test delayed task queue name
*/
public static final String TEST_DELAYED_TASK_QUEUE = "test_delayed_task_queue";
}
listener listening
package cn.aohan.delayedqueue.listener;
import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* Delay task listener
*
* @author Aohan
* @date 2024/4/19
*/
@RequiredArgsConstructor
@Slf4j
@Component
public class DelayedTaskListener implements ApplicationRunner {
private final DelayedQueueProvider delayedQueueProvider;
@Override
public void run(ApplicationArguments args) throws Exception {
delayedTaskHandle(QueueConstant.TEST_DELAYED_TASK_QUEUE);
}
public void delayedTaskHandle(String delayedQueueName) {
final Thread thread = new Thread(() -> {
final RBlockingDeque<DelayedTaskInfo> blockingDeque = delayedQueueProvider.getBlockingDeque(delayedQueueName);
while (true) {
try {
//Take out expired data,Wait timeout
final DelayedTaskInfo delayedTaskInfo = blockingDeque.poll(2, TimeUnit.MINUTES);
if (Objects.isNull(delayedTaskInfo)) {
continue;
}
log.info("DelayedTask task :[{}]", delayedTaskInfo);
} catch (Exception e) {
log.error("DelayedTaskListener#delayedTaskHandle error delayedQueueName:[{}]", delayedQueueName, e);
}
}
});
thread.setDaemon(true);
thread.start();
}
}
4. Add test delay task
package cn.aohan.delayedqueue.controller;
import cn.aohan.common.dto.Result;
import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.dto.TestDelayedDTO;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author Aohan
* @date 2024/4/19
*/
@AllArgsConstructor
@RestController
@RequestMapping("/api/test/delayed")
public class DelayedQueueTestController {
private final DelayedQueueProvider delayedQueueProvider;
/**
* Add delayed tasks
*
* @param delayedTask Delay tasks
* @return {@link Result}<{@link Void}>
*/
@PostMapping
public Result<Void> addDelayedTask(@RequestBody TestDelayedDTO delayedTask) {
delayedQueueProvider.addDelayedTask(
QueueConstant.TEST_DELAYED_TASK_QUEUE,
delayedTask.getVal(),
delayedTask.getDelayTime(),
delayedTask.getTimeUnit()
);
return Result.success();
}
}
5. General process and principles
When the delay queue is initially created, a QueueTransferTask
will be created.
org.redisson.RedissonDelayedQueue#RedissonDelayedQueue
channelName = prefixName("redisson_delay_queue_channel", getRawName());
- Here, subscribe to the delay queue (channel) to create task scheduling (mainly using the time wheel in netty).
- Use
pushTaskAsync
to operate the Lua script to remove LIST and ZSET elements in redis.
Insert into the appropriate position in the center according to the delay time, mainly
A Lua script in the org.redisson.RedissonDelayedQueue#offerAsync
method
local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);
redis.call('zadd', KEYS[2], ARGV[1], value);
redis.call('rpush', KEYS[3], value);
local v = redis.call('zrange', KEYS[2], 0, 0);
if v[1] == value then
redis.call('publish', KEYS[4], ARGV[1]);
end;
To summarize, what this code does is:
- Pack two strings into a binary value.
- Add the packed value to a sorted set (zset) and assign it a score (current time + delay time).
- Appends the same value to the end of a list.
- If the added element is the first element in the sorted collection, a message is published to the subscribed channel above.
Then use BLPOP blocking to obtain the elements of the LIST
Redisson implements the principle of delay queue. Simply put, when data is inserted into the delay queue, it will be stored in the list and zset structures of the delay queue, and will be stored in the delay queue through task scheduling. The data of the period is taken out and then put into the blocking queue. The client blocks and pulls the data in the blocking queue through the BLPOP command. If the data is pulled, the business logic can be processed.
Project source code