Java: the "write beyond end of stream" error when writing a zip file from multiple threads

2022-11-05 15:00:12

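  Recently I had a requirement to compress a large number of small files directly into a single zip. Every entry in a zip is independent and nothing needs to be appended (one entry, one file's content),

so I went straight to multithreading, and it blew up: the code failed with the error write beyond end of stream!

      The code below reproduces the scenario: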

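import org.junit.Test; // assuming JUnit 4; the original does not say which version

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class MultiThreadWriteZipFile {

    private static ExecutorService executorService = Executors.newFixedThreadPool(50);

    private static CountDownLatch countDownLatch = new CountDownLatch(50);

    @Test
    public void multiThreadWriteZip() throws IOException, InterruptedException {
        File file = new File("D:\\Gis開發\\資料\\影像資料\\china_tms\\2\\6\\2.jpeg");
        // Create the zip file
        ZipOutputStream zipOutputStream =
                new ZipOutputStream(new FileOutputStream(new File("E:\\java\\test\\test.zip")));

        for (int i = 0; i < 50; i++) {
            // Zip entry names use '/' as the separator, regardless of platform
            String entryName = i + "/" + i + "/" + i + ".jpeg";
            executorService.submit(() -> {
                // All 50 tasks share one ZipOutputStream; this is what triggers the error
                try (InputStream in = new FileInputStream(file)) {
                    writeSource2ZipFile(in, entryName, zipOutputStream);
                } catch (IOException e) {
                    // Print the failure instead of silently discarding it
                    e.printStackTrace();
                } finally {
                    // Count down even on failure so the main thread cannot block forever
                    countDownLatch.countDown();
                }
            });
        }
        // Block the main thread until every task is done
        countDownLatch.await();
        // Close the stream
        zipOutputStream.close();
    }

    public void writeSource2ZipFile(InputStream inputStream,
                                    String zipEntryName,
                                    ZipOutputStream zipOutputStream) throws IOException {
        // Start a new entry
        zipOutputStream.putNextEntry(new ZipEntry(zipEntryName));
        byte[] buf = new byte[1024];
        int bytesRead;
        // Copy the data into the entry, writing only the bytes actually read
        while ((bytesRead = inputStream.read(buf)) != -1) {
            zipOutputStream.write(buf, 0, bytesRead);
        }
        zipOutputStream.closeEntry();
        zipOutputStream.flush();
    }
}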

 Running the code above as-is fails with: write beyond end of stream

 Change private static ExecutorService executorService = Executors.newFixedThreadPool(50);

to

private static ExecutorService executorService = Executors.newSingleThreadExecutor();

and the code runs normally!
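
If the thread pool is still wanted (for example, to read or prepare the source files in parallel), one alternative is to serialize only the zip writes. The following is a minimal sketch under that assumption, not the fix used in this post: the class name, the `writeEntry` helper, and the in-memory data are all hypothetical and only for illustration. Each task prepares its bytes on its own, then holds a lock on the shared stream for the whole putNextEntry / write / closeEntry sequence.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

class SynchronizedZipWriter {

    // Hypothetical helper: writes one complete entry while holding the stream's lock,
    // so no other thread can open, write, or close an entry in between.
    static void writeEntry(ZipOutputStream zip, String name, byte[] data) throws IOException {
        synchronized (zip) {
            zip.putNextEntry(new ZipEntry(name));
            zip.write(data, 0, data.length);
            zip.closeEntry();
        }
    }

    // Writes entryCount entries from a thread pool and returns how many entries
    // can be read back from the resulting zip.
    static int writeConcurrently(int entryCount) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try (ZipOutputStream zip = new ZipOutputStream(bytes)) {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < entryCount; i++) {
                final int n = i;
                futures.add(pool.submit(() -> {
                    // Preparing the data happens in parallel, outside the lock
                    byte[] data = ("content of entry " + n).getBytes(StandardCharsets.UTF_8);
                    writeEntry(zip, n + "/" + n + ".txt", data);
                    return null;
                }));
            }
            // Wait for every task; get() rethrows any failure from a task
            for (Future<?> f : futures) {
                f.get();
            }
        } finally {
            pool.shutdown();
        }
        // Read the archive back and count its entries
        int count = 0;
        try (ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            while (in.getNextEntry() != null) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(writeConcurrently(50));
    }
}
```

The compression itself still runs on one thread at a time, so this only helps when preparing the data is the expensive part.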

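As for the reason, tracing through the code makes it clear. Let's first look at where the error is thrown:

It is at line 201 of DeflaterOutputStream in the java.util.zip package (JDK 1.8; other versions may differ). Here is the code: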

 public void write(byte[] b, int off, int len) throws IOException {
        if (def.finished()) {
            throw new IOException("write beyond end of stream");
        }
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        if (!def.finished()) {
            def.setInput(b, off, len);
            while (!def.needsInput()) {
                deflate();
            }
        }
    }

The key is the state behind def.finished(). That state is defined in the Deflater class, a compression utility class that Java implements on top of the ZLIB compression library.
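
The check can be triggered on a single thread, which makes it easier to see what it guards against. The sketch below is an illustration only (the class and method names are made up): it finishes a DeflaterOutputStream and then tries to write again, which runs into the def.finished() branch of write().

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DeflaterOutputStream;

class WriteBeyondEndDemo {

    // Returns the exception message produced by writing after finish(),
    // or "no error" if the write unexpectedly succeeds.
    static String writeAfterFinish() {
        try (DeflaterOutputStream out = new DeflaterOutputStream(new ByteArrayOutputStream())) {
            out.write(new byte[] {1, 2, 3});
            // finish() completes the compressed stream, after which def.finished() is true
            out.finish();
            // This write hits the finished() check at the top of write(byte[], int, int)
            out.write(new byte[] {4, 5, 6});
            return "no error";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAfterFinish());
    }
}
```

In the multithreaded case nobody calls finish() on purpose before writing; another thread finishes the shared compressor at just the wrong moment.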

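The following code is what starts that state change: it sets the finish flag, and once deflate() has emitted the remaining output, finished() returns true.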

public void finish() {
        synchronized (zsRef) {
            finish = true;
        }
    }

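The ultimate caller of this code is the line zipOutputStream.putNextEntry(new ZipEntry(zipEntryName)); in our code above.

The idea is that every time a new entry is added, the previous entry has to be closed first (in JDK 1.8, putNextEntry calls closeEntry() when an entry is still open, and closeEntry() calls def.finish()), which triggers this condition. And this state is not thread-private, as the following code shows: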

public
class Deflater {

    private final ZStreamRef zsRef;
    private byte[] buf = new byte[0];
    private int off, len;
    private int level, strategy;
    private boolean setParams;
    private boolean finish, finished;
    private long bytesRead;
    private long bytesWritten;

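So when several threads share one ZipOutputStream, they also share its single Deflater and these flags, and nothing makes that state thread-safe!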
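
To make the shared flags concrete, here is a small single-threaded sketch (class and method names are hypothetical) that walks one Deflater through the cycle an entry goes through: writing, finish(), draining, then reset(). One plausible interleaving for the error is that thread A is closing its entry and has reached the "finished" point while thread B, using the same Deflater, calls write() before A's reset().

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.Deflater;

class DeflaterSharedStateDemo {

    // Returns finished() as observed at three points:
    // while input is pending, after finish() + draining, and after reset().
    static boolean[] observeFinishedFlag() {
        Deflater def = new Deflater();
        try {
            byte[] out = new byte[256];
            def.setInput("hello zip".getBytes(StandardCharsets.UTF_8));
            boolean whileWriting = def.finished();

            // What closing an entry does to the compressor: finish, then drain the output
            def.finish();
            while (!def.finished()) {
                def.deflate(out);
            }
            // Any write through a DeflaterOutputStream at this point would be rejected
            boolean afterFinish = def.finished();

            // reset() makes the same Deflater usable for the next entry
            def.reset();
            boolean afterReset = def.finished();

            return new boolean[] {whileWriting, afterFinish, afterReset};
        } finally {
            def.end(); // release the native zlib resources
        }
    }

    public static void main(String[] args) {
        // Expected: [false, true, false]
        System.out.println(Arrays.toString(observeFinishedFlag()));
    }
}
```

The flag is only true for a short window on the closing thread, which is why the failure shows up with 50 threads but never with one.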

That's all for this write-up of the error when writing a zip file from multiple threads!