【JAVA】普通IO資料拷貝次數的問題探討

2022-09-25 21:01:22

最近看到網上有些文章在討論JAVA中普通檔案IO讀/寫的時候經過了幾次資料拷貝,如果從系統呼叫開始分析,以讀取檔案為例,資料的讀取過程如下(以快取I/O為例):

  1. 應用程式呼叫read函數發起系統呼叫,此時由使用者空間切換到核心空間;
  2. 核心通過DMA從磁碟拷貝資料到核心緩衝區;
  3. 將核心緩衝區的資料拷貝到使用者空間的緩衝區,回到使用者空間;

整個讀取過程發生了兩次資料拷貝,一次是DMA將磁碟上的檔案資料拷貝到核心緩衝區,一次是將核心緩衝區的資料拷貝到使用者緩衝區。

在JAVA中,JVM劃分了堆記憶體,平時建立的物件基本都在堆中,不過也可以通過NIO包下的ByteBuffer申請堆外記憶體DirectByteBuffer:

ByteBuffer.allocateDirect(size);

無論是普通IO或者是NIO,在進行檔案讀寫的時候一般都會建立一個buffer作為資料的緩衝區,讀寫相關方法底層是通過呼叫native函數(JNI呼叫)來實現的,在進行讀寫時將buffer傳遞給JNI。
JNI一般使用C/C++程式碼實現,JNI底層呼叫C函數庫時,要求buffer所在記憶體地址上的內容不能失效,但是JVM在進行垃圾回收的時候有可能對物件進行移動,導致地址發生變化,所以通過NIO進行檔案讀取的時候,從原始碼中可以明顯看到對buffer的物件型別進行了判斷,如果buffer是DirectByteBuffer型別,使用的是堆外記憶體,直接使用即可,反之則認為使用的是堆內記憶體,此時需要先申請一塊堆外記憶體作為堆外記憶體buffer,然後進行系統呼叫,進行資料讀取,讀取完畢後將堆外記憶體buffer的內容再拷回JVM堆內記憶體buffer中,這裡一般是沒有疑問的。

比較有疑問的點是在普通IO中,讀寫檔案傳入的是位元組陣列byte[],一種說法是陣列一般分配的是連續的記憶體空間,即使記憶體地址發生了變化,根據陣列的首地址依舊可以找到整個陣列的記憶體,所以使用普通IO進行檔案讀寫的時候,不需要重新分配堆外記憶體,直接使用堆內的位元組陣列即可,為了探究普通IO到底有沒有重新申請堆外記憶體,接下來我們去看下原始碼。

普通IO

首先來看一下使用普通IO進行檔案讀取的例子,建立一個檔案輸入流和位元組陣列,通過輸入流讀取檔案到位元組陣列中,這裡的位元組陣列佔用的是JVM的堆內記憶體

   // 建立輸入流
    try (InputStream is = new FileInputStream("/document/123.txt")) {
        // 建立位元組陣列(堆內記憶體)
        byte[] bytes = new byte[1024];
        int len = 0;
        // 通過read方法讀取資料到bytes陣列
        while ((len = is.read(bytes)) != -1){
            String content = new String(bytes, 0, len);
            System.out.print(content);
        }
        is.read(bytes);
    } catch (Exception e) {
        e.printStackTrace();
    }

由於輸入流使用的FileInputStream,所以讀取檔案會進入到FileInputStream中的read方法,可以看到裡面又呼叫了readBytes方法,readBytes是一個native方法,裡面傳入了三個引數,分別為存放資料的位元組陣列、讀取檔案的起始位置和讀取資料的長度:

public class FileInputStream extends InputStream {
    /**
     * 讀取資料
     */
    public int read(byte b[]) throws IOException {
        return readBytes(b, 0, b.length);
    }

    /**
     * 讀取位元組資料
     * @param b 資料讀取後放入的位元組陣列
     * @param off 讀取起始位置
     * @param len 讀取資料的長度
     * @exception IOException If an I/O error has occurred.
     */
    private native int readBytes(byte b[], int off, int len) throws IOException;
}

接下來就需要去readBytes中看下到底有沒有使用傳入的堆內記憶體進行資料拷貝,由於readBytes是native方法,所以需要藉助openjdk原始碼來檢視具體的實現過程。

openjdk原始碼下載地址:http://hg.openjdk.java.net/

這裡以openjdk1.8為例,看一下readBytes的實現過程。

readBytes方法在原始碼解壓後的src\share\native\java\io\io_util.h檔案中,它的處理邏輯如下:

  1. 建立一個字元陣列stackBuf(堆外記憶體),大小為BUF_SIZE,從BUF_SIZE的定義中可以看出大小為8192位元組
  2. 對讀取資料長度進行判斷,如果大於8192,則根據長度重新分配一塊記憶體(堆外記憶體)作為資料緩衝區賦給buf變數,如果小於就使用預先分配的字元陣列stackBuf賦給buf變數
  3. 呼叫IO_Read函數讀取資料到buf變數中,IO_Read函數中進行了系統呼叫,通過DMA從磁碟讀取資料到核心緩衝區
  4. 呼叫SetByteArrayRegionbuf資料拷貝到bytes陣列中

readBytes的處理邏輯來看,並沒有直接使用傳入的位元組陣列(堆內記憶體)進行資料拷貝,而是重新分配了記憶體,這裡分配的是堆外記憶體,然後進行系統呼叫從磁碟讀取資料到核心緩衝區,再將核心緩衝區的資料拷貝到這裡分配的堆外記憶體中,最後呼叫SetByteArrayRegion將堆外記憶體的資料拷貝到堆內記憶體位元組陣列中。

/* 最大buffer大小
 */
#define BUF_SIZE 8192

// bytes對應傳入的位元組陣列(堆內記憶體),off對應起始位置,len對應讀取資料的長度
jint
readBytes(JNIEnv *env, jobject this, jbyteArray bytes,
          jint off, jint len, jfieldreadBytesID fid)
{
    jint nread;
    // 建立一個字元陣列,大小為BUF_SIZE,這裡分配的是堆外記憶體
    char stackBuf[BUF_SIZE];
    // 資料緩衝區
    char *buf = NULL;
    FD fd;
    // 校驗bytes是否為空
    if (IS_NULL(bytes)) {
        JNU_ThrowNullPointerException(env, NULL);
        return -1;
    }
    // 校驗是否越界
    if (outOfBounds(env, off, len, bytes)) {
        JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", NULL);
        return -1;
    }
    if (len == 0) { // 如果讀取資料長度為0直接返回
        return 0;
    } else if (len > BUF_SIZE) { // 如果讀取長度大於BUF_SIZE
        buf = malloc(len); // 分配記憶體(堆外記憶體)
        if (buf == NULL) {
            JNU_ThrowOutOfMemoryError(env, NULL);
            return 0;
        }
    } else {
        // 使用預先分配的陣列
        buf = stackBuf;
    }

    fd = GET_FD(this, fid);
    if (fd == -1) {
        JNU_ThrowIOException(env, "Stream Closed");
        nread = -1;
    } else {
        // 資料讀取
        nread = IO_Read(fd, buf, len);
        if (nread > 0) {
            // 將資料拷貝到堆內記憶體bytes中
            (*env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);
        } else if (nread == -1) {
            JNU_ThrowIOExceptionWithLastError(env, "Read error");
        } else { /* EOF */
            nread = -1;
        }
    }

    if (buf != stackBuf) {
        free(buf);
    }
    return nread;
}

由於作業系統不同,系統呼叫的方法也不同,這裡以UNIX為例,看下IO_Read函數的具體實現。

IO_Read函數的定義在解壓後的src\solaris\native\java\io\io_util_md.h檔案中,可以看到IO_Read指向的是handleRead方法:

#define IO_Read handleRead

handleReadsrc\solaris\native\java\io\io_util_md.c中實現,可以看到裡面進行了系統呼叫,通過read函數讀取資料

ssize_t
handleRead(FD fd, void *buf, jint len)
{
    ssize_t result;
    // 進行系統呼叫,通過read函數讀取資料
    RESTARTABLE(read(fd, buf, len), result);
    return result;
}

普通IO資料讀取流程總結

  1. 發起JNI呼叫,建立堆外緩衝區;
  2. JNI中發起read系統呼叫,此時需要由使用者空間切換到核心空間;
  3. 進入到核心空間,DMA讀取檔案資料到核心緩衝區;
  4. 將核心緩衝區的資料拷貝到使用者緩衝區,切換回使用者空間;
  5. 將堆外緩衝區的資料拷貝到JVM堆內緩衝區中;

普通IO檔案讀取過程中並沒有因為使用位元組陣列而減少一次拷貝,讀取過程中資料發生了三次拷貝,分別是從DMA讀取資料到核心緩衝區、從核心緩衝區拷貝到使用者空間的堆外緩衝區和從堆外緩衝區拷貝到JVM堆內緩衝區。

檔案寫入的邏輯與讀取類似,具體可以通過原始碼檢視。

NIO

接下來再來看下NIO讀取檔案的過程。
使用NIO的FileChannel讀取檔案的例子:

		try (FileInputStream fileInputStream = new FileInputStream("/document/123.txt")) {
			// 獲取檔案對應的channel
			FileChannel channel = fileInputStream.getChannel();
                       // 分配buffer
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			// 將資料讀取到buffer
			channel.read(buffer);
		} catch (Exception e) {
			e.printStackTrace();
		}

接下來進入到FileChannelImpl的read方法中,由於jdk中沒有sun包下面的原始碼,IDEA只能通過反編譯檢視原始碼,有些引數會是var1、var2...這樣的變數名,不便於閱讀,所以還可以藉助openjdk中的原始碼來檢視實現,當然也可以從網上下載sun包的原始碼,放入jdk的原始碼包中。

FileChannelImplsrc/share/classes/sun/nio/ch/FileChannelImpl.java中,裡面又是通過IOUtilread方法讀取資料放入buffer中的:

public class FileChannelImpl extends FileChannel {
   public int read(ByteBuffer dst) throws IOException {
        ensureOpen();
        if (!readable)
            throw new NonReadableChannelException();
        synchronized (positionLock) {
            int n = 0;
            int ti = -1;
            try {
                begin();
                ti = threads.add();
                if (!isOpen())
                    return 0;
                do {
                    // 通過IOUtil的read方法讀取資料,fd為檔案描述符,dst為傳入的buffer
                    n = IOUtil.read(fd, dst, -1, nd);
                } while ((n == IOStatus.INTERRUPTED) && isOpen());
                return IOStatus.normalize(n);
            } finally {
                threads.remove(ti);
                end(n > 0);
                assert IOStatus.check(n);
            }
        }
    }
}

IOUtilsrc/share/classes/sun/nio/ch/IOUtil.java中,可以看到首先對傳入的buffer型別進行了判斷:

  1. 如果是DirectBuffer,直接呼叫readIntoNativeBuffer讀取資料即可;
  2. 如果不是DirectBuffer,表示佔用的堆內記憶體,此時需要UtilgetTemporaryDirectBuffer申請一塊堆外記憶體,然後呼叫readIntoNativeBuffer讀取資料;
public class IOUtil {
    static int read(FileDescriptor fd, ByteBuffer dst, long position,
                    NativeDispatcher nd)
        throws IOException
    {
        if (dst.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");
        // 如果目標buffer是DirectBuffer
        if (dst instanceof DirectBuffer)
            return readIntoNativeBuffer(fd, dst, position, nd); // 直接讀取資料

        // 重新分配一塊native buffer,也就是堆外記憶體
        ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
        try {
            // 讀取資料
            int n = readIntoNativeBuffer(fd, bb, position, nd);
            bb.flip();
            if (n > 0)
                dst.put(bb);
            return n;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(bb);
        }
    }
}

Utilsrc/share/classes/sun/nio/ch/Util.java中。
Util中,使用了ThreadLocal快取了每個執行緒申請的記憶體buffer,在呼叫
getTemporaryDirectBuffer方法獲取記憶體時,首先會根據大小從ThreadLocal中獲取是否有滿足條件的buffer,如果有直接返回即可,如果大小不夠則重新申請,可以看到申請的是堆外記憶體:

public class Util {
    // Per-thread cache of temporary direct buffers
    private static ThreadLocal<BufferCache> bufferCache =
        new ThreadLocal<BufferCache>()
    {
        @Override
        protected BufferCache initialValue() {
            // 初始化,建立一個BufferCache
            return new BufferCache();
        }
    };
    
     /**
     * Returns a temporary buffer of at least the given size
     */
    public static ByteBuffer getTemporaryDirectBuffer(int size) {
        // 先從快取中獲取
        BufferCache cache = bufferCache.get();
        ByteBuffer buf = cache.get(size);
        // 如果獲取不為空
        if (buf != null) {
            return buf;
        } else {
            // 如果沒有合適的buffer則重新申請
            if (!cache.isEmpty()) {
                buf = cache.removeFirst();
                free(buf);
            }
            // 申請堆外記憶體
            return ByteBuffer.allocateDirect(size);
        }
    }
}

ByteBufferallocateDirect方法返回的是DirectByteBuffer

public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer>

    public static ByteBuffer allocateDirect(int capacity) {
        // 建立DirectByteBuffer
        return new DirectByteBuffer(capacity);
    }
}

參考

Java NIO direct buffer的優勢在哪兒?

JAVA IO專題一:java InputStream和OutputStream讀取檔案並通過socket傳送,到底涉及幾次拷貝