一、簡介
1)在方法上使用該@Async註解,申明該方法是一個非同步任務;
2)在類上面使用該@Async註解,申明該類中的所有方法都是非同步任務;
3)方法上一旦標記了這個@Async註解,當其它執行緒呼叫這個方法時,就會開啟一個新的子執行緒去非同步處理該業務邏輯。
4)使用此註解的方法的類物件,必須是spring管理下的bean物件;
5)要想使用非同步任務,需要在主類上開啟非同步設定,即設定上@EnableAsync註解;
如下方式會使@Async失效
以Spring boot 為例,啟動類中增加@EnableAsync:
@EnableAsync @SpringBootApplication public class ManageApplication { //... }
@Component public class MyAsyncTask { @Async public void asyncCpsItemImportTask(Long platformId, String jsonList){ //...具體業務邏輯 } }
@Async註解在使用時,如果不指定執行緒池的名稱,則使用Spring預設的執行緒池,Spring預設的執行緒池為SimpleAsyncTaskExecutor。
該型別執行緒池的預設設定:
預設核心執行緒數:8,
最大執行緒數:Integet.MAX_VALUE,
佇列使用LinkedBlockingQueue,
容量是:Integet.MAX_VALUE,
空閒執行緒保留時間:60s,
執行緒池拒絕策略:AbortPolicy。
從最大執行緒數的設定上,相信你也看到問題了:並行情況下,會無限建立執行緒、然後OOM、然後系統崩潰。。。
可以通過修改執行緒池預設設定,來解決上述問題;
spring: task: execution: pool: max-size: 6 core-size: 3 keep-alive: 3s queue-capacity: 1000 thread-name-prefix: name
@Async註解,支援使用自定義執行緒池,所以通過自定義執行緒池解決上述問題。
或者說,有時候、實際開發中就是要求你必修使用指定的執行緒池,@Async註解是支援的。
/** * @author HWX */ @Configuration @EnableAsync public class ThreadPoolTaskConfig { /* 預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務, 當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取佇列當中; 當佇列滿了,就繼續建立執行緒,當執行緒數量大於等於maxPoolSize後,開始使用拒絕策略拒絕 */ /** 允許執行緒空閒時間(單位:預設為秒) */ private static final int KEEP_ALIVE_TIME = 60; /** 緩衝佇列大小 */ private static final int QUEUE_CAPACITY = 1000; /** 執行緒池名字首 */ private static final String THREAD_NAME_PREFIX = "Async-Service-"; @Bean("taskExecutor") // bean的名稱,預設為首字母小寫的方法名 public ThreadPoolTaskExecutor taskExecutor(){ // 獲取當前機器CPU核數 int cpuProcessors = Runtime.getRuntime().availableProcessors(); if (cpuProcessors == 0) { cpuProcessors = 4; } ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(cpuProcessors); executor.setMaxPoolSize(cpuProcessors+1); executor.setQueueCapacity(QUEUE_CAPACITY); executor.setKeepAliveSeconds(KEEP_ALIVE_TIME); executor.setThreadNamePrefix(THREAD_NAME_PREFIX); // 執行緒池對拒絕任務的處理策略 // CallerRunsPolicy:由呼叫執行緒(提交任務的執行緒)處理該任務 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 初始化 executor.initialize(); return executor; } }
使用
@Service public class TranTest2Service { Logger log = LoggerFactory.getLogger(TranTest2Service.class); // 傳送提醒簡訊 1 @PostConstruct // 加上該註解專案啟動時就執行一次該方法 @Async("taskExecutor") public void sendMessage1() throws InterruptedException { log.info("傳送簡訊方法---- 1 執行開始"); Thread.sleep(5000); // 模擬耗時 log.info("傳送簡訊方法---- 1 執行結束"); } // 傳送提醒簡訊 2 @PostConstruct // 加上該註解專案啟動時就執行一次該方法 @Async("taskExecutor") public void sendMessage2() throws InterruptedException { log.info("傳送簡訊方法---- 2 執行開始"); Thread.sleep(2000); // 模擬耗時 log.info("傳送簡訊方法---- 2 執行結束"); } }
@Async註解由於是非同步執行的,在其進行資料庫的操作之時,將無法控制事務管理。
解決辦法:可以把@Transactional註解放到內部的需要進行事務的方法上;
即將方法中對資料庫的操作集中提取出來、放入一個方法中,對該方法加@Transactional註解進行事務控制
@Async的原理概括:
@Async 非同步執行,是通過 Spring AOP 動態代理 的方式來實現的。Spring容器啟動初始化bean時,判斷類中是否使用了@Async註解,如果使用了則為其建立切入點和切入點處理器,根據切入點建立代理,線上程呼叫@Async註解標註的方法時,會呼叫代理,執行切入點處理器invoke方法,將方法的執行提交給執行緒池中的另外一個執行緒來處理,從而實現了非同步執行。
所以,如果a方法呼叫它同類中的標註@Async的b方法,是不會非同步執行的,因為從a方法進入呼叫的都是該類物件本身,不會進入代理類。因此,相同類中的方法呼叫帶@Async的方法是無法非同步的,這種情況仍然是同步。
非同步的業務邏輯處理場景 有兩種:一個是不需要返回結果,另一種是需要接收返回結果。
不需要返回結果的比較簡單,就不多說了。
需要接收返回結果的範例如下:
@Async("MyExecutor") public Future<Map<Long, List>> queryMap(List ids) { List<> result = businessService.queryMap(ids); .............. Map<Long, List> resultMap = Maps.newHashMap(); ... return new AsyncResult<>(resultMap); }
呼叫非同步方法的範例:
public Map<Long, List> asyncProcess(List<BindDeviceDO> bindDevices,List<BindStaffDO> bindStaffs, String dccId) { Map<Long, List> finalMap =null; // 返回值: Future<Map<Long, List>> asyncResult = MyService.queryMap(ids); try { finalMap = asyncResult.get(); } catch (Exception e) { ... } return finalMap; }
我個人覺得,非同步方法不該設定返回值;因為呼叫非同步方法的地方,還要等待返回結果的話,那就差不多又成了序列執行了,失去了非同步的意義。
1、執行緒池本身也會消耗記憶體資源,所以我們要控制執行緒池的規模,防止它佔用過多資源、進而影響專案執行;
2、為了統一規劃資源,執行緒池儘量統一設定,即全專案儘量使用同一個執行緒池。
3、那麼使用@Async,並自定義執行緒池,會全域性公用嗎?
我們做如下測試:
/**application.yml設定**/ # 自定義執行緒池引數(用以@Async使用,可選) execution: pool: core-size: 3 queue-capacity: 500 max-size: 10 keep-alive: 3 thread-name-prefix: customize-th- /**執行緒池設定類**/ @Configuration public class ExecutorConfig { /** * 核心執行緒 */ @Value("${execution.pool.core-size}") private int corePoolSize; /** * 佇列容量 */ @Value("${execution.pool.queue-capacity}") private int queueCapacity; /** * 最大執行緒 */ @Value("${execution.pool.max-size}") private int maxPoolSize; /** * 保持時間 */ @Value("${execution.pool.keep-alive}") private int keepAliveSeconds; /** * 名稱字首 */ @Value("${execution.pool.thread-name-prefix}") private String preFix; @Bean("MyExecutor") public Executor myExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(keepAliveSeconds); executor.setThreadNamePrefix(preFix); executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy()); executor.initialize(); return executor; } }
/**測試類A**/ @Service public class TestServiceAImpl implements TestServiceA { @Async("MyExecutor") @Override public void testMethod1() { for (int i = 0; i < 10; i++) { final int index = i; System.out.println("A類方法一的 " + index + " 被執行,執行緒名:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } @Async("MyExecutor") @Override public void testMethod2() { for (int i = 0; i < 10; i++) { final int index = i; System.out.println("A類方法二的 " + index + " 被執行,執行緒名:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } /**測試類B**/ @Service public class TestServiceBImpl implements TestServiceB { @Async("MyExecutor") @Override public void testMethod1() { for (int i = 0; i < 10; i++) { final int index = i; System.out.println("B類方法一的 " + index + " 被執行,執行緒名:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } @Async("MyExecutor") @Override public void testMethod2() { for (int i = 0; i < 10; i++) { final int index = i; System.out.println("B類方法二的 " + index + " 被執行,執行緒名:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
@RestController public class TestController{ @Autowired private TestServiceA testServiceA; @Autowired private TestServiceB testServiceB; /** * 測試執行緒01 */ @GetMapping(value = "/threadTest") public void threadTest01() { System.out.println("【執行緒一】" + "group:"+ Thread.currentThread().getThreadGroup() + "; id:" +Thread.currentThread().getId()+"; name:"+ Thread.currentThread().getName()); testServiceA.testMethod1(); testServiceA.testMethod2(); testServiceB.testMethod1(); testServiceB.testMethod2(); } }
2023-04-12 14:03:38.945 [http-nio-8085-exec-2] INFO o.a.c.c.C.[.[.[/] - [log,173] - Initializing Spring DispatcherServlet 'dispatcherServlet' 【執行緒一】group:java.lang.ThreadGroup[name=main,maxpri=10]; id:85; name:http-nio-8085-exec-2 A類方法一的 0 被執行,執行緒名:customize-th-1 A類方法二的 0 被執行,執行緒名:customize-th-2 B類方法一的 0 被執行,執行緒名:customize-th-3 A類方法一的 1 被執行,執行緒名:customize-th-1 B類方法一的 1 被執行,執行緒名:customize-th-3 A類方法二的 1 被執行,執行緒名:customize-th-2 B類方法一的 2 被執行,執行緒名:customize-th-3 A類方法二的 2 被執行,執行緒名:customize-th-2 A類方法一的 2 被執行,執行緒名:customize-th-1 A類方法一的 3 被執行,執行緒名:customize-th-1 A類方法二的 3 被執行,執行緒名:customize-th-2 B類方法一的 3 被執行,執行緒名:customize-th-3 A類方法二的 4 被執行,執行緒名:customize-th-2 A類方法一的 4 被執行,執行緒名:customize-th-1 B類方法一的 4 被執行,執行緒名:customize-th-3 A類方法二的 5 被執行,執行緒名:customize-th-2 B類方法一的 5 被執行,執行緒名:customize-th-3 A類方法一的 5 被執行,執行緒名:customize-th-1 A類方法一的 6 被執行,執行緒名:customize-th-1 B類方法一的 6 被執行,執行緒名:customize-th-3 A類方法二的 6 被執行,執行緒名:customize-th-2 A類方法一的 7 被執行,執行緒名:customize-th-1 A類方法二的 7 被執行,執行緒名:customize-th-2 B類方法一的 7 被執行,執行緒名:customize-th-3 B類方法一的 8 被執行,執行緒名:customize-th-3 A類方法二的 8 被執行,執行緒名:customize-th-2 A類方法一的 8 被執行,執行緒名:customize-th-1 B類方法一的 9 被執行,執行緒名:customize-th-3 A類方法一的 9 被執行,執行緒名:customize-th-1 A類方法二的 9 被執行,執行緒名:customize-th-2 B類方法二的 0 被執行,執行緒名:customize-th-1 B類方法二的 1 被執行,執行緒名:customize-th-1 B類方法二的 2 被執行,執行緒名:customize-th-1 B類方法二的 3 被執行,執行緒名:customize-th-1 B類方法二的 4 被執行,執行緒名:customize-th-1 B類方法二的 5 被執行,執行緒名:customize-th-1 B類方法二的 6 被執行,執行緒名:customize-th-1 B類方法二的 7 被執行,執行緒名:customize-th-1 B類方法二的 8 被執行,執行緒名:customize-th-1 B類方法二的 9 被執行,執行緒名:customize-th-1
【分析】
從結果可以看到,A類、B類的方法交替執行,但是他們的執行緒都來自同一個執行緒池「customize-th-」、也就是我自己設定的執行緒池。
不僅如此,它們還遵循我對執行緒池的設定(核心執行緒數3),每當正在執行的執行緒滿3,不論是A類還是B類、接下來的任務就先放入佇列,等有空餘執行緒再執行。
從以上兩點可以確認,A類和B類用的是同一個執行緒池,@Async註解使用自定義執行緒池非同步執行任務,只要在註解後新增執行緒池設定名稱@Async(「MyExecutor」)、就可以實現整個專案公用同一個執行緒池。