深入理解 Python 虛擬機器器:位元組(bytes)的實現原理及原始碼剖析

2023-03-24 06:00:21

深入理解 Python 虛擬機器器:位元組(bytes)的實現原理及原始碼剖析

在本篇文章當中主要給大家介紹在 cpython 內部,bytes 的實現原理、記憶體佈局以及與 bytes 相關的一個比較重要的優化點—— bytes 的拼接。

資料結構

typedef struct {
    PyObject_VAR_HEAD
    Py_hash_t ob_shash;
    char ob_sval[1];

    /* Invariants:
     *     ob_sval contains space for 'ob_size+1' elements.
     *     ob_sval[ob_size] == 0.
     *     ob_shash is the hash of the string or -1 if not computed yet.
     */
} PyBytesObject;

typedef struct {
    PyObject ob_base;
    Py_ssize_t ob_size; /* Number of items in variable part */
} PyVarObject;

typedef struct _object {
    Py_ssize_t ob_refcnt;
    struct _typeobject *ob_type;
} PyObject;

上面的資料結構用圖示如下所示:

現在我們來解釋一下上面的資料結構各個欄位的含義:

  • ob_refcnt,這個還是物件的參照計數的個數,主要是在垃圾回收的時候有用。
  • ob_type,這個是物件的資料型別。
  • ob_size,表示這個物件當中位元組的個數。
  • ob_shash,物件的雜湊值,如果還沒有計算,雜湊值為 -1 。
  • ob_sval,一個資料儲存一個位元組的資料,需要注意的是 ob_sval[size] 一定等於 '\0' ,表示字串的結尾。

可能你會有疑問上面的結構體當中並沒有後面的那麼多位元組啊,陣列只有一個位元組的資料啊,這是因為在 cpython 的實現當中除了申請 PyBytesObject 大的小記憶體空間之外,還會在這個基礎之上申請連續的額外的記憶體空間用於儲存資料,在後續的原始碼分析當中可以看到這一點。

下面我們舉幾個例子來說明一下上面的佈局:

上面是空和字串 abc 的位元組表示。

建立位元組物件

下面是在 cpython 當中通過位元組數建立 PyBytesObject 物件的函數。下面的函數的主要功能是建立一個能夠儲存 size 個位元組大小的資料的 PyBytesObject 物件,下面的函數最重要的一個步驟就是申請記憶體空間。

static PyObject *
_PyBytes_FromSize(Py_ssize_t size, int use_calloc)
{
    PyBytesObject *op;
    assert(size >= 0);

    if (size == 0 && (op = nullstring) != NULL) {
#ifdef COUNT_ALLOCS
        null_strings++;
#endif
        Py_INCREF(op);
        return (PyObject *)op;
    }

    if ((size_t)size > (size_t)PY_SSIZE_T_MAX - PyBytesObject_SIZE) {
        PyErr_SetString(PyExc_OverflowError,
                        "byte string is too large");
        return NULL;
    }

    /* Inline PyObject_NewVar */
    // PyBytesObject_SIZE + size 就是實際申請的記憶體空間的大小 PyBytesObject_SIZE 就是表示 PyBytesObject 各個欄位佔用的實際的記憶體空間大小
    if (use_calloc)
        op = (PyBytesObject *)PyObject_Calloc(1, PyBytesObject_SIZE + size);
    else
        op = (PyBytesObject *)PyObject_Malloc(PyBytesObject_SIZE + size);
    if (op == NULL)
        return PyErr_NoMemory();
    // 將物件的 ob_size 欄位賦值成 size 
    (void)PyObject_INIT_VAR(op, &PyBytes_Type, size);
    // 由於物件的雜湊值還沒有進行計算 因此現將雜湊值賦值成 -1
    op->ob_shash = -1;
    if (!use_calloc)
        op->ob_sval[size] = '\0';
    /* empty byte string singleton */
    if (size == 0) {
        nullstring = op;
        Py_INCREF(op);
    }
    return (PyObject *) op;
}

我們可以使用一個寫例子來看一下實際的 PyBytesObject 記憶體空間的大小。

>>> import sys
>>> a = b"hello world"
>>> sys.getsizeof(a)
44
>>>

上面的 44 = 32 + 11 + 1 。

其中 32 是 PyBytesObject 4 個欄位所佔用的記憶體空間,ob_refcnt、ob_type、ob_size和 ob_shash 各佔 8 個位元組。11 是表示字串 "hello world" 佔用 11 個位元組,最後一個位元組是 '\0' 。

檢視位元組長度

這個函數主要是返回 PyBytesObject 物件的位元組長度,也就是直接返回 ob_size 的值。

static Py_ssize_t
bytes_length(PyBytesObject *a)
{
    // (((PyVarObject*)(ob))->ob_size)
    return Py_SIZE(a);
}

位元組拼接

在 python 當中執行下面的程式碼就會執行位元組拼接函數:

>>> b"abc" + b"edf"

下方就是具體的執行位元組拼接的函數:

/* This is also used by PyBytes_Concat() */
static PyObject *
bytes_concat(PyObject *a, PyObject *b)
{
    Py_buffer va, vb;
    PyObject *result = NULL;

    va.len = -1;
    vb.len = -1;
    // Py_buffer 當中有一個指標欄位 buf 可以使用者儲存 PyBytesObject 當中位元組資料的首地址
    // PyObject_GetBuffer 函數的主要作用是將 物件 a 當中的位元組陣列賦值給 va 當中的 buf
    if (PyObject_GetBuffer(a, &va, PyBUF_SIMPLE) != 0 ||
        PyObject_GetBuffer(b, &vb, PyBUF_SIMPLE) != 0) {
        PyErr_Format(PyExc_TypeError, "can't concat %.100s to %.100s",
                     Py_TYPE(b)->tp_name, Py_TYPE(a)->tp_name);
        goto done;
    }

    /* Optimize end cases */
    if (va.len == 0 && PyBytes_CheckExact(b)) {
        result = b;
        Py_INCREF(result);
        goto done;
    }
    if (vb.len == 0 && PyBytes_CheckExact(a)) {
        result = a;
        Py_INCREF(result);
        goto done;
    }

    if (va.len > PY_SSIZE_T_MAX - vb.len) {
        PyErr_NoMemory();
        goto done;
    }
    result = PyBytes_FromStringAndSize(NULL, va.len + vb.len);
    // 下方就是將物件 a b 當中的位元組資料拷貝到新的
    if (result != NULL) {
        // PyBytes_AS_STRING 宏定義在下方當中 主要就是使用 PyBytesObject 物件當中的
        // ob_sval 欄位 也就是將 buf 資料(也就是 a 或者 b 當中的位元組資料)拷貝到 ob_sval當中
        memcpy(PyBytes_AS_STRING(result), va.buf, va.len);
        memcpy(PyBytes_AS_STRING(result) + va.len, vb.buf, vb.len);
    }

  done:
    if (va.len != -1)
        PyBuffer_Release(&va);
    if (vb.len != -1)
        PyBuffer_Release(&vb);
    return result;
}
#define PyBytes_AS_STRING(op) (assert(PyBytes_Check(op)), \
                                (((PyBytesObject *)(op))->ob_sval))

我們修改一個這個函數,在其中加入一條列印語句,然後重新編譯 python 執行結果如下所示:

Python 3.9.0b1 (default, Mar 23 2023, 08:35:33) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> b"abc" + b"edf"
In concat function: abc <> edf
b'abcedf'
>>> 

在上面的拼接函數當中會拷貝原來的兩個位元組物件,因此需要謹慎使用,一旦發生非常多的拷貝的話是非常耗費記憶體的。因此需要警惕使用迴圈內的記憶體拼接。比如對於 [b"a", b"b", b"c"] 來說,如果使用迴圈拼接的話,那麼會將 b"a" 拷貝兩次。

>>> res = b""
>>> for item in  [b"a", b"b", b"c"]:
...     res += item
...
>>> res
b'abc'
>>>

因為 b"a", b"b" 在拼接的時候會將他們分別拷貝一次,在進行 b"ab",b"c" 拼接的時候又會將 ab 和 c 拷貝一次,那麼具體的拷貝情況如下所示:

  • "a" 拷貝了一次。
  • "b" 拷貝了一次。
  • "ab" 拷貝了一次。
  • "c" 拷貝了一次。

但是實際上我們的需求是隻需要對 [b"a", b"b", b"c"] 當中的資料各拷貝一次,如果我們要實現這一點可以使用 b"".join([b"a", b"b", b"c"]),直接將 [b"a", b"b", b"c"] 作為引數傳遞,然後各自只拷貝一次,具體的實現程式碼如下所示,在這個例子當中 sep 就是空串 b"",iterable 就是 [b"a", b"b", b"c"] 。

Py_LOCAL_INLINE(PyObject *)
STRINGLIB(bytes_join)(PyObject *sep, PyObject *iterable)
{
    char *sepstr = STRINGLIB_STR(sep);
    const Py_ssize_t seplen = STRINGLIB_LEN(sep);
    PyObject *res = NULL;
    char *p;
    Py_ssize_t seqlen = 0;
    Py_ssize_t sz = 0;
    Py_ssize_t i, nbufs;
    PyObject *seq, *item;
    Py_buffer *buffers = NULL;
#define NB_STATIC_BUFFERS 10
    Py_buffer static_buffers[NB_STATIC_BUFFERS];

    seq = PySequence_Fast(iterable, "can only join an iterable");
    if (seq == NULL) {
        return NULL;
    }

    seqlen = PySequence_Fast_GET_SIZE(seq);
    if (seqlen == 0) {
        Py_DECREF(seq);
        return STRINGLIB_NEW(NULL, 0);
    }
#ifndef STRINGLIB_MUTABLE
    if (seqlen == 1) {
        item = PySequence_Fast_GET_ITEM(seq, 0);
        if (STRINGLIB_CHECK_EXACT(item)) {
            Py_INCREF(item);
            Py_DECREF(seq);
            return item;
        }
    }
#endif
    if (seqlen > NB_STATIC_BUFFERS) {
        buffers = PyMem_NEW(Py_buffer, seqlen);
        if (buffers == NULL) {
            Py_DECREF(seq);
            PyErr_NoMemory();
            return NULL;
        }
    }
    else {
        buffers = static_buffers;
    }

    /* Here is the general case.  Do a pre-pass to figure out the total
     * amount of space we'll need (sz), and see whether all arguments are
     * bytes-like.
     */
    for (i = 0, nbufs = 0; i < seqlen; i++) {
        Py_ssize_t itemlen;
        item = PySequence_Fast_GET_ITEM(seq, i);
        if (PyBytes_CheckExact(item)) {
            /* Fast path. */
            Py_INCREF(item);
            buffers[i].obj = item;
            buffers[i].buf = PyBytes_AS_STRING(item);
            buffers[i].len = PyBytes_GET_SIZE(item);
        }
        else if (PyObject_GetBuffer(item, &buffers[i], PyBUF_SIMPLE) != 0) {
            PyErr_Format(PyExc_TypeError,
                         "sequence item %zd: expected a bytes-like object, "
                         "%.80s found",
                         i, Py_TYPE(item)->tp_name);
            goto error;
        }
        nbufs = i + 1;  /* for error cleanup */
        itemlen = buffers[i].len;
        if (itemlen > PY_SSIZE_T_MAX - sz) {
            PyErr_SetString(PyExc_OverflowError,
                            "join() result is too long");
            goto error;
        }
        sz += itemlen;
        if (i != 0) {
            if (seplen > PY_SSIZE_T_MAX - sz) {
                PyErr_SetString(PyExc_OverflowError,
                                "join() result is too long");
                goto error;
            }
            sz += seplen;
        }
        if (seqlen != PySequence_Fast_GET_SIZE(seq)) {
            PyErr_SetString(PyExc_RuntimeError,
                            "sequence changed size during iteration");
            goto error;
        }
    }

    /* Allocate result space. */
    res = STRINGLIB_NEW(NULL, sz);
    if (res == NULL)
        goto error;

    /* Catenate everything. */
    p = STRINGLIB_STR(res);
    if (!seplen) {
        /* fast path */
        for (i = 0; i < nbufs; i++) {
            Py_ssize_t n = buffers[i].len;
            char *q = buffers[i].buf;
            Py_MEMCPY(p, q, n);
            p += n;
        }
        goto done;
    }
    // 具體的實現邏輯就是在這裡
    for (i = 0; i < nbufs; i++) {
        Py_ssize_t n;
        char *q;
        if (i) {
            // 首先現將 sepstr 拷貝到新的陣列裡面但是在我們舉的例子當中是空串 b""
            Py_MEMCPY(p, sepstr, seplen);
            p += seplen;
        }
        n = buffers[i].len;
        q = buffers[i].buf;
        // 然後將列表當中第 i 個 bytes 的資料拷貝到 p 當中 這樣就是實現了我們所需要的效果
        Py_MEMCPY(p, q, n);
        p += n;
    }
    goto done;

error:
    res = NULL;
done:
    Py_DECREF(seq);
    for (i = 0; i < nbufs; i++)
        PyBuffer_Release(&buffers[i]);
    if (buffers != static_buffers)
        PyMem_FREE(buffers);
    return res;
}

單位元組字元

在 cpython 的內部實現當中給單位元組的字元做了一個小的緩衝池:

static PyBytesObject *characters[UCHAR_MAX + 1]; // UCHAR_MAX 在 64 位系統當中等於 255

當建立的 bytes 只有一個字元的時候就可以檢查是否 characters 當中已經存在了,如果存在就直接返回這個已經建立好的 PyBytesObject 物件,否則再進行建立。新建立的 PyBytesObject 物件如果長度等於 1 的話也會被加入到這個陣列當中。下面是 PyBytesObject 的另外一個建立函數:

PyObject *
PyBytes_FromStringAndSize(const char *str, Py_ssize_t size)
{
    PyBytesObject *op;
    if (size < 0) {
        PyErr_SetString(PyExc_SystemError,
            "Negative size passed to PyBytes_FromStringAndSize");
        return NULL;
    }
    // 如果建立長度等於 1 而且物件在 characters 當中存在的話那麼就直接返回
    if (size == 1 && str != NULL &&
        (op = characters[*str & UCHAR_MAX]) != NULL)
    {
#ifdef COUNT_ALLOCS
        one_strings++;
#endif
        Py_INCREF(op);
        return (PyObject *)op;
    }

    op = (PyBytesObject *)_PyBytes_FromSize(size, 0);
    if (op == NULL)
        return NULL;
    if (str == NULL)
        return (PyObject *) op;

    Py_MEMCPY(op->ob_sval, str, size);
    /* share short strings */
    // 如果建立的物件的長度等於 1 那麼久將這個物件儲存到 characters 當中
    if (size == 1) {
        characters[*str & UCHAR_MAX] = op;
        Py_INCREF(op);
    }
    return (PyObject *) op;
}

我們可以使用下面的程式碼進行驗證:

>>> a = b"a"
>>> b  =b"a"
>>> a == b
True
>>> a is b
True
>>> a = b"aa"
>>> b = b"aa"
>>> a == b
True
>>> a is b
False

從上面的程式碼可以知道,確實當我們建立的 bytes 的長度等於 1 的時候物件確實是同一個物件。

總結

在本篇文章當中主要給大家介紹了在 cpython 內部對於 bytes 的實現,重點介紹了 cpython 當中 PyBytesObject 的記憶體佈局和建立 PyBytesObject 的函數,以及對於 bytes 物件的拼接細節和 cpython 內部單位元組字元的緩衝池。在程式當中最好使用 join 操作進行 btyes 的拼接操作,否則效率會比較低。


本篇文章是深入理解 python 虛擬機器器系列文章之一,文章地址:https://github.com/Chang-LeHung/dive-into-cpython

更多精彩內容合集可存取專案:https://github.com/Chang-LeHung/CSCore

關注公眾號:一無是處的研究僧,瞭解更多計算機(Java、Python、計算機系統基礎、演演算法與資料結構)知識。