SpringBoot implements Redis Stream queue
Preface
This article shows a simple way to operate a Redis Stream queue in Spring Boot and to listen for the messages in the queue and consume them.
jdk:1.8
springboot-version:2.6.3
redis: 5.0.1 (Stream queue is only available in version 5 or above)
Preparation
1 pom
Add the Lombok and Redis starter dependencies (Spring Boot version 2.6.3)
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2 yml
spring:
  redis:
    database: 0
    host: 127.0.0.1
3 RedisStreamUtil
Utility class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
 * Convenience wrapper around the stream operations of {@link RedisTemplate}.
 * Every method delegates directly to {@code redisTemplate.opsForStream()}
 * (or to {@code redisTemplate.hasKey}) without additional logic.
 */
@Component
public class RedisStreamUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Creates a consumer group on a stream.
     *
     * @param key   stream key
     * @param group consumer group name
     * @return the string returned by the underlying {@code createGroup} call
     */
    public String createGroup(String key, String group) {
        return redisTemplate.opsForStream().createGroup(key, group);
    }

    /**
     * Creates a consumer group on a stream.
     *
     * @param key   stream key
     * @param group consumer group name
     * @return the string returned by the underlying {@code createGroup} call
     * @deprecated the name is a truncation of {@code createGroup}; it is kept only so
     *             existing callers keep compiling. Use {@link #createGroup(String, String)}.
     */
    @Deprecated
    public String oup(String key, String group) {
        return createGroup(key, group);
    }

    /**
     * Gets information about the consumers of a consumer group.
     *
     * @param key   stream key
     * @param group consumer group name
     * @return {@link StreamInfo.XInfoConsumers} for the group
     */
    public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
        return redisTemplate.opsForStream().consumers(key, group);
    }

    /**
     * Gets information about the consumer groups attached to a stream.
     *
     * @param key stream key
     * @return {@link StreamInfo.XInfoGroups} for the stream
     */
    public StreamInfo.XInfoGroups queryGroups(String key) {
        return redisTemplate.opsForStream().groups(key);
    }

    /**
     * Appends a map-backed record to a stream.
     *
     * @param key   stream key
     * @param value field/value pairs that make up the record
     * @return the id of the newly added record, as a string
     */
    public String addMap(String key, Map<String, Object> value) {
        return redisTemplate.opsForStream().add(key, value).getValue();
    }

    /**
     * Reads records from a stream, starting at the beginning of the stream.
     *
     * @param key stream key
     * @return the records that were read
     */
    public List<MapRecord<String, Object, Object>> read(String key) {
        return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
    }

    /**
     * Acknowledges records as consumed on behalf of a consumer group.
     *
     * @param key       stream key
     * @param group     consumer group name
     * @param recordIds ids of the records to acknowledge
     * @return the value returned by the underlying {@code acknowledge} call
     */
    public Long ack(String key, String group, String... recordIds) {
        return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
    }

    /**
     * Deletes records from a stream.
     *
     * @param key       stream key
     * @param recordIds ids of the records to delete
     * @return the value returned by the underlying {@code delete} call
     */
    public Long del(String key, String... recordIds) {
        return redisTemplate.opsForStream().delete(key, recordIds);
    }

    /**
     * Checks whether a key exists.
     *
     * @param key key to look up
     * @return {@code true} only when the template reports {@code Boolean.TRUE};
     *         a {@code null} answer is treated as {@code false}
     */
    public boolean hasKey(String key) {
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }
}
Code
Producer sends message
To send a message from the producer, create an addMessage method in the Service layer that writes the message to the queue.
In the code, the first parameter of the addMap() method is the key and the second parameter is the value. The key must match the one used in the listener configuration later on, so remember it for now.
/**
 * Producer-side service: publishes a sample message to the Redis stream
 * through {@code RedisStreamUtil}.
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class RedisStreamMqServiceImpl implements RedisStreamMqService {

    private final RedisStreamUtil redisStreamUtil;

    /**
     * Builds a sample {@code RedisUser}, stores it under the {@code "user"} field
     * and appends the resulting map to the {@code "mystream"} stream.
     *
     * @return the record id returned by {@code RedisStreamUtil.addMap}
     */
    @Override
    public Object addMessage() {
        // Sample payload object.
        RedisUser user = new RedisUser();
        user.setName("hcr");
        user.setAge(18);
        user.setEmail("156ef561@gmail.com");

        // A stream record is a field/value map; the user goes under a single field.
        Map<String, Object> payload = new HashMap<>();
        payload.put("user", user);

        return redisStreamUtil.addMap("mystream", payload);
    }
}
Controller endpoint
/**
 * REST entry point under {@code /redis} used to trigger the producer.
 */
@RestController
@RequestMapping("/redis")
@Slf4j
@RequiredArgsConstructor
public class RedisController {

    private final RedisStreamMqService redisStreamMqService;

    /**
     * Handles {@code GET /redis/addMessage} by delegating to the service.
     *
     * @return whatever {@code RedisStreamMqService.addMessage()} returns
     */
    @GetMapping("/addMessage")
    public Object addMessage() {
        Object result = redisStreamMqService.addMessage();
        return result;
    }
}
Call the endpoint to check that the data is added to Redis correctly.
Interface returns data
1702622585248-0
View data in redis
Consumers monitor messages for consumption
Create a RedisConsumersListener listener
import cn.hcr.utils.RedisStreamUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Stream listener that logs each received record, acknowledges it for every
 * consumer group of the stream and then deletes it from the stream.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {

    // Set through the Lombok-generated constructor; not exposed as a public field.
    private final RedisStreamUtil redisStreamUtil;

    /**
     * Handles a single record delivered by the listener container.
     *
     * @param message the record that was read from the stream
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // Key of the stream the record came from.
        String streamKey = message.getStream();
        // Id of the record.
        RecordId recordId = message.getId();
        // Record content (field/value pairs).
        Map<String, String> msg = message.getValue();
        log.info("【streamKey】= {},【recordId】= {},【msg】={}", streamKey, recordId, msg);

        // Business processing logic goes here.

        // Once processing is done, acknowledge the record for every consumer group
        // of this stream and then delete it. The groups are iterated directly on
        // the query result so this class does not need to reference StreamInfo.
        String id = recordId.getValue();
        redisStreamUtil.queryGroups(streamKey)
                .forEach(group -> redisStreamUtil.ack(streamKey, group.groupName(), id));
        redisStreamUtil.del(streamKey, id);
    }
}
Create a RedisConfig configuration class and configure the listener
package cn.hcr.config;
import cn.hcr.listener.RedisConsumersListener;
import cn.hcr.utils.RedisStreamUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Configuration
@Slf4j
public class RedisConfig {
@Resource
private RedisStreamUtil redisStreamUtil;
/**
* redisSerialization
*
* @param redisConnectionFactory
* @return {@code RedisTemplate<String, Object>}
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean
public Subscription subscription(RedisConnectionFactory factory) {
AtomicInteger index = new AtomicInteger(1);
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setName("async-stream-consumer-" + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
// How many messages can be obtained at most at one time
.batchSize(5)
.executor(executor)
.pollTimeout(Duration.ofSeconds(1))
.errorHandler(throwable -> {
log.error("[MQ handler exception]", throwable);
throwable.printStackTrace();
})
.build();
//ShouldkeyandgroupCan be customized according to needs
String streamName = "mystream";
String groupname = "mygroup";
initStream(streamName, groupname);
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
// Manualaskinformation
Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));
// automaticaskinformation
/* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
listenerContainer.start();
return subscription;
}
private void initStream(String key, String group) {
boolean hasKey = redisStreamUtil.hasKey(key);
if (!hasKey) {
Map<String, Object> map = new HashMap<>(1);
map.put("field", "value");
//Create topic
String result = redisStreamUtil.addMap(key, map);
//Create consumer group
redisStreamUtil.oup(key, group);
//Delete the initialized value
redisStreamUtil.del(key, result);
log.info("stream:{}-group:{} initialize success", key, group);
}
}
}
redisTemplate: This bean is used to configure redis serialization
subscription: configure listening
initStream: initialize consumer group
Listening test
After sending a message with the addMessage() method, check the console output.
【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=[
"cn.hcr.pojo.RedisUser",
{"name":"hcr","age":18,"email":"156ef561@gmail.com"}
]
}
Summary
The above is a demo that simply implements the Redis Stream queue in SpringBoot. If you need the source code or are unclear, please comment or send a private message.
redisTemplate: This bean is used to configure redis serialization
subscription: configure listening
initStream: initialize consumer group