CompletableFuture之批次上傳

2023-06-30 18:00:56

前言

最近接到一個需求,批次上傳圖片到伺服器及實時更新上傳進度。當處理大量檔案上傳任務時,效率是一個關鍵因素。傳統的序列方式會導致任務耗時較長,而使用並行處理可以極大地提高上傳效率。想到很久之前用CompletableFuture優化過一些多統計的業務場景,效果都還不錯,因此在這裡也使用它來優化一下上傳的效率。

 

CompletableFuture簡介

CompletableFuture類是Java 8引入的,它實現了FutureCompletionStage介面,提供了更強大和靈活的非同步程式設計功能。CompletableFuture除了具有Future的特性外,還提供了更多的操作和組合方式來處理非同步任務。它可以更方便地處理非同步任務,實現並行程式設計,並提供更好的例外處理和結果轉換機制。在進行非同步程式設計時,CompletableFuture是一個更為強大和推薦的選擇。

主要特點:

  1. 非同步執行:允許將任務提交給後臺執行緒,在任務執行期間不會阻塞主執行緒。這樣可以提高應用程式的響應效能,特別是在處理I/O密集型操作時,如網路請求或資料庫查詢。

  2. 鏈式呼叫和組合操作:支援鏈式呼叫,可以將多個非同步任務按照順序連線起來形成一個任務流水線。每個任務的執行依賴於前一個任務的結果,這種序列的處理方式可以簡化非同步任務的編寫和管理。

  3. 例外處理:提供了例外處理的機制,可以通過異常回撥方法來捕獲和處理任務執行過程中的異常情況。這樣可以更好地控制和處理任務執行過程中的異常,提供更健壯的程式碼。

  4. 轉換和合並結果:提供了一系列的轉換和合並操作,可以對任務的結果進行對映、轉換和合並。這樣可以方便地對任務的結果進行處理和轉換,得到最終期望的結果。

  5. 多工並行執行:支援等待多個任務並行執行,並等待它們全部完成或任意一個完成。這種能力使得在處理並行任務時可以更好地利用系統資源,提高任務執行的效率。

 

序列和並行的效率對比

測試批次上傳了1000張圖片,每張圖片在579KB,一共564MB。使用序列方式上傳,總時長為501秒,使用並行方式上傳,總時長是108秒,通過對比優化前後的程式碼,可以明顯看出使用CompletableFuture並行處理方式的效率更高。由於任務是並行執行的,多核處理器的能力得到了充分的利用,從而大大提高了批次上傳的速度。

 

序列處理方式

/**
 * describe: 批次上傳圖片
 *
 * @param files 圖片檔案集合
 * @param fileId 資料夾id
 * @param scheduleKey 上傳進度key
 * @date 2023年06月28日 11:42:03
 * @author Tang
 */
@Override
public BatchUploadVO batchUpload2(MultipartFile[] files, Long fileId, String scheduleKey) {
	//取上傳設定
	String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class);
	ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class);
	List<String> imgTypeList = Arrays.asList(config.getImgType().split(","));

	List<String> errorNames = Lists.newCopyOnWriteArrayList();
	String userName = SecurityAuthorHolder.getSecurityUser().getUsername();

	for(MultipartFile file : files){
		try {
			RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
			String suffix = Objects.requireNonNull(file.getOriginalFilename()).substring(file.getOriginalFilename().lastIndexOf(".") + 1);
			ServerException.Assert(!imgTypeList.contains(suffix), "檔案格式不正確,支援" + String.join(",", imgTypeList));
			ServerException.Assert(file.getSize() > config.getMaxSize() * 1024, "檔案最大不能超過" + config.getMaxSize() + "K");
			//上傳
			ImgResourceEntity saveData = upload(file, config);
			saveData.setFileId(fileId);
			saveData.setCreator(userName);
			baseMapper.insert(saveData);
			//快取自增  供輪詢查詢實時進度
			RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
		} catch (Exception e) {
			errorNames.add(file.getOriginalFilename());
			RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
		}
	}

	BatchUploadVO vo = schedule(scheduleKey);
	vo.setErrFileNames(errorNames);
	return vo;
}

  

序列處理呼叫時間

 

並行處理方式

/**
 * describe: 批次上傳圖片
 *
 * @param files 圖片檔案集合
 * @param fileId 資料夾id
 * @param scheduleKey 上傳進度key
 * @date 2023年06月28日 11:42:03
 * @author Tang
 */
@Override
public BatchUploadVO batchUpload(MultipartFile[] files, Long fileId, String scheduleKey) {
	ExecutorService executor = Executors.newFixedThreadPool(10);

	//取上傳設定
	String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class);
	ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class);
	List<String> imgTypeList = Arrays.asList(config.getImgType().split(","));

	List<String> errorNames = Lists.newCopyOnWriteArrayList();
	String userName = SecurityAuthorHolder.getSecurityUser().getUsername();

	CompletableFuture<Void> allFutures = CompletableFuture.allOf(
			Arrays.stream(files).map(v ->
					CompletableFuture.runAsync(
							() -> {
								try {
									RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
									String suffix = Objects.requireNonNull(v.getOriginalFilename()).substring(v.getOriginalFilename().lastIndexOf(".") + 1);
									ServerException.Assert(!imgTypeList.contains(suffix), "檔案格式不正確,支援" + String.join(",", imgTypeList));
									ServerException.Assert(v.getSize() > config.getMaxSize() * 1024, "檔案最大不能超過" + config.getMaxSize() + "K");
									//上傳
									ImgResourceEntity saveData = upload(v, config);
									saveData.setFileId(fileId);
									saveData.setCreator(userName);
									baseMapper.insert(saveData);
									//快取自增  供輪詢查詢實時進度
									RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
								} catch (Exception e) {
									errorNames.add(v.getOriginalFilename());
									RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
								}
							}, executor)
			).toArray(CompletableFuture[]::new)
	);
	// 等待所有 CompletableFuture 完成
	allFutures.join();

	// 關閉執行緒池
	executor.shutdown();

	BatchUploadVO vo = schedule(scheduleKey);
	vo.setErrImgFileNames(errorNames);
	return vo;
}

  

 並行呼叫處理時間

 

實現過程中的注意事項

  1. 執行緒池的使用:為了實現並行處理,可以使用執行緒池來管理並執行非同步任務。通過合理設定執行緒池的大小和引數,可以控制並行執行緒的數量和資源的利用率。

  2. 例外處理:在並行處理中,每個任務都是獨立執行的,因此需要適當處理任務中可能出現的異常情況,避免異常的影響擴散。

  3. 進度更新:為了實時更新上傳進度,可以將每個任務的進度資訊儲存到Redis中,並在前端通過輪詢查詢的方式獲取最新的進度資訊。

  4. 執行緒安全:確保上傳邏輯的執行緒安全性,避免多執行緒環境下的競態條件和資料一致性問題。

 

總結

使用CompletableFuture來優化批次上傳任務是一種高效且靈活的方式。通過並行處理,我們可以充分利用多核處理器的能力,提高任務的執行效率。同時,通過實時更新上傳進度並返回總體的上傳結果,可以給使用者更好的體驗。
在實現過程中,我們需要合理使用執行緒池、處理異常、保證資料同步和執行緒安全,以確保上傳任務的穩定性和效能。同時,我們還可以利用CompletableFuture提供的方法來處理任務的結果、異常和其他相關操作,以滿足具體的業務需求。
通過使用CompletableFuture進行批次上傳任務的優化,可以顯著提高系統的效能和使用者體驗,適用於需要處理大量並行任務的場景。