最近接到一個需求,批次上傳圖片到伺服器及實時更新上傳進度。當處理大量檔案上傳任務時,效率是一個關鍵因素。傳統的序列方式會導致任務耗時較長,而使用並行處理可以極大地提高上傳效率。想到很久之前用CompletableFuture優化過一些多統計的業務場景,效果都還不錯,因此在這裡也使用它來優化一下上傳的效率。
CompletableFuture類是Java 8引入的,它實現了Future和CompletionStage介面,提供了更強大和靈活的非同步程式設計功能。CompletableFuture除了具有Future的特性外,還提供了更多的操作和組合方式來處理非同步任務。它可以更方便地處理非同步任務,實現並行程式設計,並提供更好的例外處理和結果轉換機制。在進行非同步程式設計時,CompletableFuture是一個更為強大和推薦的選擇。
主要特點:
非同步執行:允許將任務提交給後臺執行緒,在任務執行期間不會阻塞主執行緒。這樣可以提高應用程式的響應效能,特別是在處理I/O密集型操作時,如網路請求或資料庫查詢。
鏈式呼叫和組合操作:支援鏈式呼叫,可以將多個非同步任務按照順序連線起來形成一個任務流水線。每個任務的執行依賴於前一個任務的結果,這種序列的處理方式可以簡化非同步任務的編寫和管理。
例外處理:提供了例外處理的機制,可以通過異常回撥方法來捕獲和處理任務執行過程中的異常情況。這樣可以更好地控制和處理任務執行過程中的異常,提供更健壯的程式碼。
轉換和合並結果:提供了一系列的轉換和合並操作,可以對任務的結果進行對映、轉換和合並。這樣可以方便地對任務的結果進行處理和轉換,得到最終期望的結果。
多工並行執行:支援等待多個任務並行執行,並等待它們全部完成或任意一個完成。這種能力使得在處理並行任務時可以更好地利用系統資源,提高任務執行的效率。
測試批次上傳了1000張圖片,每張圖片在579KB,一共564MB。使用序列方式上傳,總時長為501秒,使用並行方式上傳,總時長是108秒,通過對比優化前後的程式碼,可以明顯看出使用CompletableFuture並行處理方式的效率更高。由於任務是並行執行的,多核處理器的能力得到了充分的利用,從而大大提高了批次上傳的速度。
序列處理方式
/** * describe: 批次上傳圖片 * * @param files 圖片檔案集合 * @param fileId 資料夾id * @param scheduleKey 上傳進度key * @date 2023年06月28日 11:42:03 * @author Tang */ @Override public BatchUploadVO batchUpload2(MultipartFile[] files, Long fileId, String scheduleKey) { //取上傳設定 String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class); ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class); List<String> imgTypeList = Arrays.asList(config.getImgType().split(",")); List<String> errorNames = Lists.newCopyOnWriteArrayList(); String userName = SecurityAuthorHolder.getSecurityUser().getUsername(); for(MultipartFile file : files){ try { RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME); String suffix = Objects.requireNonNull(file.getOriginalFilename()).substring(file.getOriginalFilename().lastIndexOf(".") + 1); ServerException.Assert(!imgTypeList.contains(suffix), "檔案格式不正確,支援" + String.join(",", imgTypeList)); ServerException.Assert(file.getSize() > config.getMaxSize() * 1024, "檔案最大不能超過" + config.getMaxSize() + "K"); //上傳 ImgResourceEntity saveData = upload(file, config); saveData.setFileId(fileId); saveData.setCreator(userName); baseMapper.insert(saveData); //快取自增 供輪詢查詢實時進度 RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME); } catch (Exception e) { errorNames.add(file.getOriginalFilename()); RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME); } } BatchUploadVO vo = schedule(scheduleKey); vo.setErrFileNames(errorNames); return vo; }
序列處理呼叫時間
並行處理方式
/** * describe: 批次上傳圖片 * * @param files 圖片檔案集合 * @param fileId 資料夾id * @param scheduleKey 上傳進度key * @date 2023年06月28日 11:42:03 * @author Tang */ @Override public BatchUploadVO batchUpload(MultipartFile[] files, Long fileId, String scheduleKey) { ExecutorService executor = Executors.newFixedThreadPool(10); //取上傳設定 String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class); ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class); List<String> imgTypeList = Arrays.asList(config.getImgType().split(",")); List<String> errorNames = Lists.newCopyOnWriteArrayList(); String userName = SecurityAuthorHolder.getSecurityUser().getUsername(); CompletableFuture<Void> allFutures = CompletableFuture.allOf( Arrays.stream(files).map(v -> CompletableFuture.runAsync( () -> { try { RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME); String suffix = Objects.requireNonNull(v.getOriginalFilename()).substring(v.getOriginalFilename().lastIndexOf(".") + 1); ServerException.Assert(!imgTypeList.contains(suffix), "檔案格式不正確,支援" + String.join(",", imgTypeList)); ServerException.Assert(v.getSize() > config.getMaxSize() * 1024, "檔案最大不能超過" + config.getMaxSize() + "K"); //上傳 ImgResourceEntity saveData = upload(v, config); saveData.setFileId(fileId); saveData.setCreator(userName); baseMapper.insert(saveData); //快取自增 供輪詢查詢實時進度 RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME); } catch (Exception e) { errorNames.add(v.getOriginalFilename()); RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME); } }, executor) ).toArray(CompletableFuture[]::new) ); // 等待所有 CompletableFuture 完成 allFutures.join(); // 關閉執行緒池 executor.shutdown(); BatchUploadVO vo = schedule(scheduleKey); vo.setErrImgFileNames(errorNames); return vo; }
並行呼叫處理時間
執行緒池的使用:為了實現並行處理,可以使用執行緒池來管理並執行非同步任務。通過合理設定執行緒池的大小和引數,可以控制並行執行緒的數量和資源的利用率。
例外處理:在並行處理中,每個任務都是獨立執行的,因此需要適當處理任務中可能出現的異常情況,避免異常的影響擴散。
進度更新:為了實時更新上傳進度,可以將每個任務的進度資訊儲存到Redis中,並在前端通過輪詢查詢的方式獲取最新的進度資訊。
執行緒安全:確保上傳邏輯的執行緒安全性,避免多執行緒環境下的競態條件和資料一致性問題。
使用CompletableFuture來優化批次上傳任務是一種高效且靈活的方式。通過並行處理,我們可以充分利用多核處理器的能力,提高任務的執行效率。同時,通過實時更新上傳進度並返回總體的上傳結果,可以給使用者更好的體驗。
在實現過程中,我們需要合理使用執行緒池、處理異常、保證資料同步和執行緒安全,以確保上傳任務的穩定性和效能。同時,我們還可以利用CompletableFuture提供的方法來處理任務的結果、異常和其他相關操作,以滿足具體的業務需求。
通過使用CompletableFuture進行批次上傳任務的優化,可以顯著提高系統的效能和使用者體驗,適用於需要處理大量並行任務的場景。