Spring multi-threading implementation + reasonable setting of the maximum number of threads and the number of core threads

1. The simplest method:

  • You need to add the @EnableAsync annotation to the Spring Boot main class to enable asynchronous functionality;
  • The @Async annotation needs to be added to the asynchronous method.

The sample code is as follows:

@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

@Service
public class AsyncService {
    @Async
    public void asyncTask() {
        // Asynchronous task execution logic
    }
} 

advantage:

  • Simple and easy to use, just add @Async annotation to the method.
  • Relying on the Spring framework, it has high integration and can work seamlessly with other Spring components.

shortcoming:

  • The method must be public, otherwise asynchronous execution will be invalid.
  • Asynchronous execution results cannot be obtained directly, and types such as Future or CompletableFuture need to be used.

2. Based on the @Async annotation, how to rewrite the thread pool configuration yourself

2.1. Create thread pool
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@EnableAsync
@Configuration
public class ExecutorConfig {

    // Get the server's cpu number
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Number of core threads
        executor.setCorePoolSize(CPU_COUNT * 2);
        // Maximum number of threads
        executor.setMaxPoolSize(CPU_COUNT * 4);
        // Thread idle time
        executor.setKeepAliveSeconds(50);
        // queue size
        executor.setQueueCapacity(CPU_COUNT * 16);
        // Thread name prefix
        executor.setThreadNamePrefix("");
        // Set deny policy
        // AbortPolicy            ->    Default policy,If the thread pool queue is full, drop the task and throw RejectedExecutionException abnormal
        // DiscardPolicy          ->    If the thread pool queue is full,The task will be discarded directly without any exception.
        // DiscardOldestPolicy    ->    If the queue is full,The earliest tasks that enter the queue will be deleted to make space.,Try joining the queue again
        // CallerRunsPolicy       ->    If adding to thread pool fails,Then the main thread will perform the task by itself,Will not wait for threads in the thread pool to execute
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Wait for all tasks to finish before closing the thread pool
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
} 
2.2. Add @Async annotation to business method
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class TestServiceImpl {

    public static volatile AtomicInteger i = new AtomicInteger(0);

    /**
     * No result returned
     */
    @Async("taskExecutor")
    public void test() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Current No." + i.incrementAndGet() + "executions");
    }


    /**
     * There are results returned
     */
    @Async("taskExecutor")
    public Future<String> test2() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new AsyncResult("Current No." + i.incrementAndGet() + "executions");
    }
} 

You can also do without annotations, the code is as follows

package com.socketio.push.config;


import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;


/**
 * @author litong
 */
@Service
public class RedisSub implements MessageListener {


    @Resource(name = "taskExecutor")
    private ThreadPoolTaskExecutor taskExecutor;

    @Override
    public void onMessage(Message message, byte[] pattern) {

        taskExecutor.execute(()->{
            handleMsg(message);
        });

    }

    /**
     * deal withredisSubscribe to news
     * @param message
     */
    private void handleMsg(Message message){

    }

} 
Reasons why the annotation method fails

1. The annotated method must be a public method
2. The method must be called from another class, that is, from outside the class. Calls from within the class are invalid.
Because the implementation of the @Async annotation is based on AOP dynamic proxy, the reason why the internal call fails is because the method is called by the object itself instead of the proxy object, without going through the Spring container
3. The return value of an asynchronous method using the annotation @Async can only be void or Future

3. Use CompletableFuture to implement asynchronous tasks

CompletableFuture is a new asynchronous programming tool in Java 8, which can easily implement asynchronous tasks. Using CompletableFuture requires the following conditions:

  • The return value type of an asynchronous task must be CompletableFuture type;
  • Use the CompletableFuture.supplyAsync() or CompletableFuture.runAsync() method in an asynchronous task to create an asynchronous task;
  • Use the CompletableFuture.get() method in the main thread to obtain the return result of the asynchronous task.

The sample code is as follows:

@Service
public class AsyncService {
    public CompletableFuture<String> asyncTask() {
        return CompletableFuture.supplyAsync(() -> {
            // Asynchronous task execution logic
            return "Asynchronous task execution completed";
        });
    }
}

4. Use TaskExecutor to implement asynchronous tasks

TaskExecutor is an interface provided by Spring, which defines a method execute() for executing asynchronous tasks. The following conditions need to be met to use TaskExecutor:

  • A TaskExecutor instance needs to be configured in the Spring configuration file;
  • Call the execute() method of the TaskExecutor instance in an asynchronous method to execute an asynchronous task.

The sample code is as follows:

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Bean(name = "asyncExecutor")
    public TaskExecutor asyncExecutor() {

    // Get the server's cpu number
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CPU_COUNT);
        executor.setMaxPoolSize(CPU_COUNT * 2);
        executor.setQueueCapacity(CPU_COUNT * 100);
        executor.setThreadNamePrefix("async-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

@Service
public class AsyncService {
    @Autowired
    @Qualifier("asyncExecutor")
    private TaskExecutor taskExecutor;

    public void asyncTask() {
        taskExecutor.execute(() -> {
            // Asynchronous task execution logic
        });
    }
} 

To manually set the thread pool, you must set the maximum number of threads and the number of core threads reasonably. According to most opinions on the Internet, it is related to the CPU of the server:

  1. Set the number of core threads corePoolSize

  2. First check the number of CPU cores of the machine, and then set the specific parameters:
    System.out.println(Runtime.getRuntime().availableProcessors());
    That is, the number of CPU cores = Runtime.getRuntime().availableProcessors()

  3. Analyze whether the program processed by the thread pool is CPU-intensive or IO-intensive
    IO intensive: a lot of network, file operations
    IO intensive: Number of core threads = Number of CPU cores*2
    CPU-intensive: A lot of calculations, the closer the CPU usage is to 100%, consuming multiple cores or multiple machines
    CPU intensive: Number of core threads = Number of CPU cores + 1
    Note: IO intensive (practical experience of a large factory)
    Number of core threads = Number of CPU cores / (1-blocking coefficient) For example, the blocking coefficient is 0.8 and the number of CPU cores is 4
    Then the number of core threads is 20

 maxPoolSize:
When the system load reaches its maximum value, the number of core threads can no longer handle all tasks on time, and it is necessary to add threads. 200 tasks per second require 20 threads, then when the number of tasks per second reaches 1000, (1000-queueCapacity)*(20/200) is required, that is, 60 threads, and maxPoolSize can be set to 60. There is also a saying that it is cpuNUM*2 or cpuNUM*4

keepAliveTime:
It doesn't work if the number of threads is only increased but not decreased. When the load decreases, the number of threads can be reduced. If a thread's idle time reaches keepAliveTiime, the thread will exit. By default, the thread pool will maintain at least corePoolSize threads.

allowCoreThreadTimeout:
By default, the core thread will not exit. You can set this parameter to true to allow the core thread to exit as well.

queueCapacity:
The length of the task queue depends on the number of core threads and the system's task response time requirements. The queue length can be set to (corePoolSize/tasktime)*responsetime: (20/0.1)*2=400, that is, the queue length can be set to 400.
If the queue length is set too large, the task response time will be too long. Do not write in the following way:
LinkedBlockingQueue queue = new LinkedBlockingQueue();
This actually sets the queue length to Integer.MAX_VALUE, which will cause the number of threads to always be corePoolSize and never increase. When the number of tasks increases sharply, the task response time will also increase sharply.