@Async註解詳解 以及 可能遇到的各種問題

2023-09-13 12:01:33

一、簡介
1)在方法上使用該@Async註解,申明該方法是一個非同步任務;
2)在類上面使用該@Async註解,申明該類中的所有方法都是非同步任務;
3)方法上一旦標記了這個@Async註解,當其它執行緒呼叫這個方法時,就會開啟一個新的子執行緒去非同步處理該業務邏輯。
4)使用此註解的方法的類物件,必須是spring管理下的bean物件;
5)要想使用非同步任務,需要在主類上開啟非同步設定,即設定上@EnableAsync註解;

注意事項

如下方式會使@Async失效

  • 非同步方法使用static修飾
  • 非同步類沒有使用@Component註解(或其他註解)導致spring無法掃描到非同步類
  • 非同步方法不能與被呼叫的非同步方法在同一個類中
  • 類中需要使用@Autowired或@Resource等註解自動注入,不能自己手動new物件
  • 如果使用SpringBoot框架必須在啟動類中增加@EnableAsync註解

二、使用

1、基礎程式碼範例

1)啟動類中增加@EnableAsync

以Spring boot 為例,啟動類中增加@EnableAsync:

@EnableAsync
@SpringBootApplication
public class ManageApplication {
    //...
}

2)方法上加@Async註解:

@Component
public class MyAsyncTask {
     @Async
    public void asyncCpsItemImportTask(Long platformId, String jsonList){
        //...具體業務邏輯
    }
}

2、隱含問題一:預設執行緒池設定不合適,導致系統奔潰

 

@Async註解在使用時,如果不指定執行緒池的名稱,則使用Spring預設的執行緒池,Spring預設的執行緒池為SimpleAsyncTaskExecutor。

 

該型別執行緒池的預設設定:

 預設核心執行緒數:8,
    
    最大執行緒數:Integet.MAX_VALUE,
    佇列使用LinkedBlockingQueue,
    容量是:Integet.MAX_VALUE,
    空閒執行緒保留時間:60s,
    執行緒池拒絕策略:AbortPolicy。

從最大執行緒數的設定上,相信你也看到問題了:並行情況下,會無限建立執行緒、然後OOM、然後系統崩潰。。。

 

1)問題一解決方法一:

可以通過修改執行緒池預設設定,來解決上述問題;

spring:
  task:
    execution:
      pool:
        max-size: 6
        core-size: 3
        keep-alive: 3s
        queue-capacity: 1000
        thread-name-prefix: name

 

2)問題一解決方法二:

@Async註解,支援使用自定義執行緒池,所以通過自定義執行緒池解決上述問題。
或者說,有時候、實際開發中就是要求你必修使用指定的執行緒池,@Async註解是支援的。

/**
 * @author HWX
 */
@Configuration
@EnableAsync  
public class ThreadPoolTaskConfig {

/*
    預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,
     當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取佇列當中;
   當佇列滿了,就繼續建立執行緒,當執行緒數量大於等於maxPoolSize後,開始使用拒絕策略拒絕
 */


    /** 允許執行緒空閒時間(單位:預設為秒) */  
    private static final int KEEP_ALIVE_TIME = 60;
    /** 緩衝佇列大小 */  
    private static final int QUEUE_CAPACITY = 1000;
    /** 執行緒池名字首 */  
    private static final String THREAD_NAME_PREFIX = "Async-Service-";
  
    @Bean("taskExecutor") // bean的名稱,預設為首字母小寫的方法名  
    public ThreadPoolTaskExecutor taskExecutor(){
        // 獲取當前機器CPU核數
        int cpuProcessors = Runtime.getRuntime().availableProcessors();
        if (cpuProcessors == 0) {
            cpuProcessors = 4;
        }
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(cpuProcessors);
        executor.setMaxPoolSize(cpuProcessors+1);
        executor.setQueueCapacity(QUEUE_CAPACITY);
        executor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
  
        // 執行緒池對拒絕任務的處理策略  
        // CallerRunsPolicy:由呼叫執行緒(提交任務的執行緒)處理該任務  
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        // 初始化  
        executor.initialize();  
        return executor;  
    }  
}  

使用

@Service  
public class TranTest2Service {  
    Logger log = LoggerFactory.getLogger(TranTest2Service.class);  
  
    // 傳送提醒簡訊 1  
        @PostConstruct // 加上該註解專案啟動時就執行一次該方法  
    @Async("taskExecutor")  
    public void sendMessage1() throws InterruptedException {  
        log.info("傳送簡訊方法---- 1   執行開始");  
        Thread.sleep(5000); // 模擬耗時  
        log.info("傳送簡訊方法---- 1   執行結束");  
    }  
  
    // 傳送提醒簡訊 2  
        @PostConstruct // 加上該註解專案啟動時就執行一次該方法  
    @Async("taskExecutor")  
    public void sendMessage2() throws InterruptedException {  
  
        log.info("傳送簡訊方法---- 2   執行開始");  
        Thread.sleep(2000); // 模擬耗時  
        log.info("傳送簡訊方法---- 2   執行結束");  
    }  
}  

3、隱含問題二:非同步任務的事務問題

@Async註解由於是非同步執行的,在其進行資料庫的操作之時,將無法控制事務管理。
解決辦法:可以把@Transactional註解放到內部的需要進行事務的方法上
即將方法中對資料庫的操作集中提取出來、放入一個方法中,對該方法加@Transactional註解進行事務控制

4、隱含問題三:在同類方法中呼叫@Async方法,沒有非同步執行

@Async的原理概括:

@Async 非同步執行,是通過 Spring AOP 動態代理 的方式來實現的。Spring容器啟動初始化bean時,判斷類中是否使用了@Async註解,如果使用了則為其建立切入點和切入點處理器,根據切入點建立代理,線上程呼叫@Async註解標註的方法時,會呼叫代理,執行切入點處理器invoke方法,將方法的執行提交給執行緒池中的另外一個執行緒來處理,從而實現了非同步執行。

所以,如果a方法呼叫它同類中的標註@Async的b方法,是不會非同步執行的,因為從a方法進入呼叫的都是該類物件本身,不會進入代理類。因此,相同類中的方法呼叫帶@Async的方法是無法非同步的,這種情況仍然是同步。

三、非同步任務的返回結果

非同步的業務邏輯處理場景 有兩種:一個是不需要返回結果,另一種是需要接收返回結果。

不需要返回結果的比較簡單,就不多說了。

需要接收返回結果的範例如下:

@Async("MyExecutor")
public Future<Map<Long, List>> queryMap(List ids) {
    List<> result = businessService.queryMap(ids);
    ..............
    Map<Long, List> resultMap = Maps.newHashMap();
    ...
    return new AsyncResult<>(resultMap);
}

呼叫非同步方法的範例:

public Map<Long, List> asyncProcess(List<BindDeviceDO> bindDevices,List<BindStaffDO> bindStaffs, String dccId) {
        Map<Long, List> finalMap =null;
        // 返回值:
        Future<Map<Long, List>> asyncResult = MyService.queryMap(ids);
        try {
            finalMap = asyncResult.get();
        } catch (Exception e) {
            ...
        }
        return finalMap;
}

我個人覺得,非同步方法不該設定返回值;因為呼叫非同步方法的地方,還要等待返回結果的話,那就差不多又成了序列執行了,失去了非同步的意義。

 

四、檢測給@Async設定自定義執行緒池、會是整個專案共用的嗎?

 

1、執行緒池本身也會消耗記憶體資源,所以我們要控制執行緒池的規模,防止它佔用過多資源、進而影響專案執行;
2、為了統一規劃資源,執行緒池儘量統一設定,即全專案儘量使用同一個執行緒池。
3、那麼使用@Async,並自定義執行緒池,會全域性公用嗎?

我們做如下測試:

/**application.yml設定**/
# 自定義執行緒池引數(用以@Async使用,可選)
execution:
  pool:
    core-size: 3
    queue-capacity: 500
    max-size: 10
    keep-alive: 3
    thread-name-prefix: customize-th-


/**執行緒池設定類**/
@Configuration
public class ExecutorConfig {

    /**
     * 核心執行緒
     */
    @Value("${execution.pool.core-size}")
    private int corePoolSize;
    /**
     * 佇列容量
     */
    @Value("${execution.pool.queue-capacity}")
    private int queueCapacity;
    /**
     * 最大執行緒
     */
    @Value("${execution.pool.max-size}")
    private int maxPoolSize;
    /**
     * 保持時間
     */
    @Value("${execution.pool.keep-alive}")
    private int keepAliveSeconds;
    /**
     * 名稱字首
     */
    @Value("${execution.pool.thread-name-prefix}")
    private String preFix;

    @Bean("MyExecutor")
    public Executor myExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(preFix);
        executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }
}

2)寫兩個測試類,使用@Async標記方法

/**測試類A**/
@Service
public class TestServiceAImpl implements TestServiceA {

    @Async("MyExecutor")
    @Override
    public void testMethod1() {
        for (int i = 0; i < 10; i++) {
            final int index = i;
            System.out.println("A類方法一的 " + index + " 被執行,執行緒名:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Async("MyExecutor")
    @Override
    public void testMethod2() {
        for (int i = 0; i < 10; i++) {
            final int index = i;
            System.out.println("A類方法二的 " + index + " 被執行,執行緒名:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**測試類B**/
@Service
public class TestServiceBImpl implements TestServiceB {

    @Async("MyExecutor")
    @Override
    public void testMethod1() {
        for (int i = 0; i < 10; i++) {
            final int index = i;
            System.out.println("B類方法一的 " + index + " 被執行,執行緒名:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Async("MyExecutor")
    @Override
    public void testMethod2() {
        for (int i = 0; i < 10; i++) {
            final int index = i;
            System.out.println("B類方法二的 " + index + " 被執行,執行緒名:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

3)寫一個測試Controller介面,非同步呼叫兩個測試類的方法

@RestController
public class TestController{

    @Autowired
    private TestServiceA testServiceA;

    @Autowired
    private TestServiceB testServiceB;

    /**
     * 測試執行緒01
     */
    @GetMapping(value = "/threadTest")
    public void threadTest01() {
        System.out.println("【執行緒一】" + "group:"+ Thread.currentThread().getThreadGroup() + "; id:" +Thread.currentThread().getId()+"; name:"+ Thread.currentThread().getName());

        testServiceA.testMethod1();
        testServiceA.testMethod2();

        testServiceB.testMethod1();
        testServiceB.testMethod2();
    }
}

 

4)分析執行結果

 
2023-04-12 14:03:38.945 [http-nio-8085-exec-2] INFO  o.a.c.c.C.[.[.[/] - [log,173] - Initializing Spring DispatcherServlet 'dispatcherServlet'
【執行緒一】group:java.lang.ThreadGroup[name=main,maxpri=10]; id:85; name:http-nio-8085-exec-2
A類方法一的 0 被執行,執行緒名:customize-th-1
A類方法二的 0 被執行,執行緒名:customize-th-2
B類方法一的 0 被執行,執行緒名:customize-th-3
A類方法一的 1 被執行,執行緒名:customize-th-1
B類方法一的 1 被執行,執行緒名:customize-th-3
A類方法二的 1 被執行,執行緒名:customize-th-2
B類方法一的 2 被執行,執行緒名:customize-th-3
A類方法二的 2 被執行,執行緒名:customize-th-2
A類方法一的 2 被執行,執行緒名:customize-th-1
A類方法一的 3 被執行,執行緒名:customize-th-1
A類方法二的 3 被執行,執行緒名:customize-th-2
B類方法一的 3 被執行,執行緒名:customize-th-3
A類方法二的 4 被執行,執行緒名:customize-th-2
A類方法一的 4 被執行,執行緒名:customize-th-1
B類方法一的 4 被執行,執行緒名:customize-th-3
A類方法二的 5 被執行,執行緒名:customize-th-2
B類方法一的 5 被執行,執行緒名:customize-th-3
A類方法一的 5 被執行,執行緒名:customize-th-1
A類方法一的 6 被執行,執行緒名:customize-th-1
B類方法一的 6 被執行,執行緒名:customize-th-3
A類方法二的 6 被執行,執行緒名:customize-th-2
A類方法一的 7 被執行,執行緒名:customize-th-1
A類方法二的 7 被執行,執行緒名:customize-th-2
B類方法一的 7 被執行,執行緒名:customize-th-3
B類方法一的 8 被執行,執行緒名:customize-th-3
A類方法二的 8 被執行,執行緒名:customize-th-2
A類方法一的 8 被執行,執行緒名:customize-th-1
B類方法一的 9 被執行,執行緒名:customize-th-3
A類方法一的 9 被執行,執行緒名:customize-th-1
A類方法二的 9 被執行,執行緒名:customize-th-2
B類方法二的 0 被執行,執行緒名:customize-th-1
B類方法二的 1 被執行,執行緒名:customize-th-1
B類方法二的 2 被執行,執行緒名:customize-th-1
B類方法二的 3 被執行,執行緒名:customize-th-1
B類方法二的 4 被執行,執行緒名:customize-th-1
B類方法二的 5 被執行,執行緒名:customize-th-1
B類方法二的 6 被執行,執行緒名:customize-th-1
B類方法二的 7 被執行,執行緒名:customize-th-1
B類方法二的 8 被執行,執行緒名:customize-th-1
B類方法二的 9 被執行,執行緒名:customize-th-1

 

【分析】
從結果可以看到,A類、B類的方法交替執行,但是他們的執行緒都來自同一個執行緒池「customize-th-」、也就是我自己設定的執行緒池。
不僅如此,它們還遵循我對執行緒池的設定(核心執行緒數3),每當正在執行的執行緒滿3,不論是A類還是B類、接下來的任務就先放入佇列,等有空餘執行緒再執行。
從以上兩點可以確認,A類和B類用的是同一個執行緒池,@Async註解使用自定義執行緒池非同步執行任務,只要在註解後新增執行緒池設定名稱@Async(「MyExecutor」)、就可以實現整個專案公用同一個執行緒池。