InheritableThreadLocal 線上程池中進行父子執行緒間訊息傳遞出現訊息丟失的解析

2022-06-29 18:01:41

在日常研發過程中,我們經常面臨著需要線上程內,執行緒間進行訊息傳遞,比如在修改一些開源元件原始碼的過程中,需要將外部引數透傳到內部,如果進行方法引數過載,則涉及到的改動量過大,這樣,我們可以依賴ThreadLocal 來進行訊息傳遞。

ThreadLocal 是 儲存線上程棧幀中的一塊資料儲存區域,其可以做到執行緒與執行緒之間的讀寫隔離。

但是在我們的日常場景中,經常會出現 父執行緒 需要向子執行緒中傳遞訊息,而 ThreadLocal  僅能在當前執行緒上進行資料快取,因此 我們需要使用 InheritableThreadLocal  來實現 父子執行緒間的訊息傳遞

// 定義訊息
public class ThreadLocalMessage { private final InheritableThreadLocal<Msg> msg; private ThreadLocalMessage() { msg = new InheritableThreadLocal<>(); } public Msg getMsg() { return this.msg.get(); } public void setMsg(Msg msg) { this.msg.set(msg); } public void clear() { msg.remove(); } private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage(); public static ThreadLocalMessage getInstance() { return threadLocalMessage; } /** * 獲取執行緒中的訊息 * * @return */ public static Msg getOrCreateMsg() { Msg msg = ThreadLocalMessage.getInstance().getMsg(); if (msg == null) { msg = new Msg(); } return msg; } public static class Msg { /** * taskId */ private String taskId; private Map<String, Object> others; private int retCode; public Msg() { } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } @Override public String toString() { return "Msg{" + "taskId='" + taskId + '\'' + ", others=" + others + ", retCode=" + retCode + '}'; } } }

  

// 定義執行緒池
@EnableAsync @Configuration public class ExecutorConfig { private final Logger log = LoggerFactory.getLogger(getClass()); @Value("${executor.corePool:2}") private Integer corePool; @Value("${executor.maxPool:10}") private Integer maxPool; @Value("${executor.queue:2}") private Integer queue; @Bean("cdl-executor") public Executor executor() { log.info("start async Executor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //設定核心執行緒數 executor.setCorePoolSize(corePool); //設定最大執行緒數 executor.setMaxPoolSize(maxPool); //設定佇列大小 executor.setQueueCapacity(queue); //設定執行緒池中的執行緒的名稱字首 executor.setThreadNamePrefix("async-executor-"); // 設定拒絕策略 executor.setRejectedExecutionHandler((r, e) -> { // ..... }); // CALLER_RUNS:不在新執行緒中執行任務,而是有呼叫者所在的執行緒來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執行初始化 executor.initialize(); return executor;
// 使用TTL 初始化 executor //return TtlExecutors.getTtlExecutor(executor); } }
// 建立子執行緒進行訊息傳遞並列印
public String test() throws Exception{ for (int i = 0 ; i < 20; i++){ ThreadLocalMessage.Msg msg = ThreadLocalMessage.getOrCreateMsg(); msg.setTaskId("task_id_"+i); ThreadLocalMessage.getInstance().setMsg(msg); myService.testThread(i); ThreadLocalMessage.getInstance().clear(); } return "ok"; }

  

經過程式碼測試,我們建立了一個池子大小為10 的執行緒,並行啟動了20個執行緒去進行父子執行緒訊息傳遞,結果如下:

 

 

 

經過測試,我們發現 只有10個執行緒 的訊息傳遞成功了,其餘10個執行緒的訊息均丟失了,這是什麼原因呢。。。

遇到這個問題,我們首先得弄清楚 InheritableThreadLocal 是如何在父子執行緒間進行訊息傳遞的

InheritableThreadLocal 在父執行緒創建子執行緒的時候,會將父執行緒中InheritableThreadLocal  中儲存的資料 拷貝一份 儲存到子執行緒的 InheritableThreadLocal  中

而我們使用的 執行緒池,執行緒池是會反覆利用執行緒的,當執行緒池沒有被建立滿,每次都是新建立執行緒,直到執行緒池建立滿了,再需要使用執行緒就會從執行緒池中拿已經建立好的執行緒。

問題就出在這裡,由於後面的執行緒 是從執行緒池中去撈已經建立好的執行緒,不會走建立邏輯,也就無法觸發 InheritableThreadLocal 中向子執行緒 拷貝,這也就是為什麼  InheritableThreadLocal  合併執行緒池 使用時,出現了 訊息丟失的原因

 

如何解決????

阿里巴巴開源的TTL ,用於解決執行緒池中的父子執行緒複用,執行緒資料傳遞,可以完美解決這個問題

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>transmittable-thread-local</artifactId>
            <version>2.0.0</version>
        </dependency>

  

@EnableAsync
@Configuration
public class ExecutorConfig {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Value("${executor.corePool:2}")
    private Integer corePool;
    @Value("${executor.maxPool:10}")
    private Integer maxPool;
    @Value("${executor.queue:2}")
    private Integer queue;


    @Bean("cdl-executor")
    public Executor executor() {
        log.info("start async Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //設定核心執行緒數
        executor.setCorePoolSize(corePool);
        //設定最大執行緒數
        executor.setMaxPoolSize(maxPool);
        //設定佇列大小
        executor.setQueueCapacity(queue);
        //設定執行緒池中的執行緒的名稱字首
        executor.setThreadNamePrefix("async-executor-");

        // 設定拒絕策略
        executor.setRejectedExecutionHandler((r, e) -> {
            // .....
        });

        // CALLER_RUNS:不在新執行緒中執行任務,而是有呼叫者所在的執行緒來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        // 使用TTL 的 executor
        return TtlExecutors.getTtlExecutor(executor);
        //return executor;
    }
}

  

public class ThreadLocalMessage {

    private final TransmittableThreadLocal<Msg> msg;

    private ThreadLocalMessage() {
        msg = new TransmittableThreadLocal<>();
    }

    public Msg getMsg() {
        return this.msg.get();
    }

    public void setMsg(Msg msg) {
        this.msg.set(msg);
    }

    public void clear() {
        msg.remove();
    }

    private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage();

    public static ThreadLocalMessage getInstance() {
        return threadLocalMessage;
    }

    /**
     * 獲取執行緒中的訊息
     *
     * @return
     */
    public static Msg getOrCreateMsg() {
        Msg msg = ThreadLocalMessage.getInstance().getMsg();
        if (msg == null) {
            msg = new Msg();
        }
        return msg;
    }

    public static class Msg {

        /**
         * taskId
         */
        private String taskId;


        public Msg() {
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        @Override
        public String toString() {
            return "Msg{" +
                    "taskId='" + taskId + '\'' +
                    '}';
        }
    }

}

  

按照之前的呼叫方法再試一次,結果如下:

 

 可以發現未出現資料丟失的情況