[Java] Implementing Redis Stream queue in SpringBoot

SpringBoot implements Redis Stream queue

Preface

Simply implement the method of operating the Redis Stream queue in SpringBoot and monitor the messages in the queue for consumption.

jdk:1.8

springboot-version:2.6.3

redis: 5.0.1 (Stream queue is only available in version 5 or above)

Preparation

1pom

redis dependency package (version 2.6.3)

 <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency> 

2 yml

spring: 
  redis:
    database: 0
    host: 127.0.0.1 

3 RedisStreamUtil tool class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
public class RedisStreamUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Create consumer group
     *
     * @param key   key name
     * @param group Group name
     * @return {@link String}
     */
    public String oup(String key, String group) {
        return redisTemplate.opsForStream().createGroup(key, group);
    }

    /**
     * Get consumer information
     *
     * @param key   key name
     * @param group Group name
     * @return {@link StreamInfo.XInfoConsumers}
     */
    public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
        return redisTemplate.opsForStream().consumers(key, group);
    }

    /**
     * Query group information
     *
     * @param key key name
     * @return
     */
    public StreamInfo.XInfoGroups queryGroups(String key) {
        return redisTemplate.opsForStream().groups(key);
    }

    // Add toMapinformation
    public String addMap(String key, Map<String, Object> value) {
        return redisTemplate.opsForStream().add(key, value).getValue();
    }

    // read message
    public List<MapRecord<String, Object, Object>> read(String key) {
        return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
    }

    // Confirm consumption
    public Long ack(String key, String group, String... recordIds) {
        return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
    }

    // Delete message。When all messages of a node are deleted,Then the node will be automatically destroyed
    public Long del(String key, String... recordIds) {
        return redisTemplate.opsForStream().delete(key, recordIds);
    }

    // Determine whether it existskey
    public boolean hasKey(String key) {
        Boolean aBoolean = redisTemplate.hasKey(key);
        return aBoolean != null && aBoolean;
    }
} 

Code

Producer sends message

The producer sends a message, creates an addMessage method in the Service layer, and sends the message to the queue.

The first parameter of the addMap() method in the code is key, and the second parameter is value. The key must be consistent with the subsequent configuration. Remember this key for now.

@Service
@Slf4j
@RequiredArgsConstructor
public class RedisStreamMqServiceImpl implements RedisStreamMqService {

    private final RedisStreamUtil redisStreamUtil;

    /**
     * send a message
     *
     * @return {@code Object}
     */
    @Override
    public Object addMessage() {
        RedisUser redisUser = new RedisUser();
        redisUser.setAge(18);
        redisUser.setName("hcr");
        redisUser.setEmail("156ef561@gmail.com");

        Map<String, Object> message = new HashMap<>();
        message.put("user", redisUser);

        String recordId = redisStreamUtil.addMap("mystream", message);
        return recordId;
    }
} 

controller interface methods

@RestController
@RequestMapping("/redis")
@Slf4j
@RequiredArgsConstructor
public class RedisController {

    private final RedisStreamMqService redisStreamMqService;

    @GetMapping("/addMessage")
    public Object addMessage() {
        return redisStreamMqService.addMessage();
    }
} 

Call the test to see if data is added to redis normally.

Interface returns data

1702622585248-0 

View data in redis

Consumers monitor messages for consumption

Create a RedisConsumersListener listener

import cn.hcr.utils.RedisStreamUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {

    public final RedisStreamUtil redisStreamUtil;

    /**
     * listener
     *
     * @param message
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // streamofkeyvalue
        String streamKey = message.getStream();
        //informationID
        RecordId recordId = message.getId();
        //Message content
        Map<String, String> msg = message.getValue();
        log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg);

        //processing logic

        //After logical processing is completed,ackinformation,Delete message,groupis the name of the consumer group
        StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey);
        xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
        redisStreamUtil.del(streamKey, recordId.getValue());
    }
} 

Create a RedisConfig configuration class and configure monitoring

package cn.hcr.config;

import cn.hcr.listener.RedisConsumersListener;
import cn.hcr.utils.RedisStreamUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@Slf4j
public class RedisConfig {

    @Resource
    private RedisStreamUtil redisStreamUtil;

    /**
     * redisSerialization
     *
     * @param redisConnectionFactory
     * @return {@code RedisTemplate<String, Object>}
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // How many messages can be obtained at most at one time
                        .batchSize(5)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(1))
                        .errorHandler(throwable -> {
                            log.error("[MQ handler exception]", throwable);
                            throwable.printStackTrace();
                        })
                        .build();

        //ShouldkeyandgroupCan be customized according to needs
        String streamName = "mystream";
        String groupname = "mygroup";

        initStream(streamName, groupname);
        var listenerContainer = StreamMessageListenerContainer.create(factory, options);
        // Manualaskinformation
        Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
                StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));
        // automaticaskinformation
           /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
        listenerContainer.start();
        return subscription;
    }

    private void initStream(String key, String group) {
        boolean hasKey = redisStreamUtil.hasKey(key);
        if (!hasKey) {
            Map<String, Object> map = new HashMap<>(1);
            map.put("field", "value");
            //Create topic
            String result = redisStreamUtil.addMap(key, map);
            //Create consumer group
            redisStreamUtil.oup(key, group);
            //Delete the initialized value
            redisStreamUtil.del(key, result);
            log.info("stream:{}-group:{} initialize success", key, group);
        }
    }
} 

redisTemplate: This bean is used to configure redis serialization

subscription: configure listening

initStream: initialize consumer group

Listening test

After using the addMessage() method to deliver a message, view the console output information.

【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=[
    "cn.hcr.pojo.RedisUser",
    {"name":"hcr","age":18,"email":"156ef561@gmail.com"}
    ]
} 

Summarize

The above is a demo that simply implements the Redis Stream queue in SpringBoot. If you need the source code or are unclear, please comment or send a private message.
Template: This bean is used to configure redis serialization

subscription: configure listening

initStream: initialize consumer group