Dlang 並行化

2023-06-29 06:01:01

Dlang 並行化

好難受,dlang 生態太差,沒辦法,學了半天才明白。

我儘量以精煉的語言解釋。

採用 定義,例子(程式碼),解釋 的步驟講解。

所以你可能看到很多程式碼,一點解釋……

我會省略一些 import,讓程式碼短一些

parallelism 並行

感覺好廢物,這一小部分瞭解即可。

這部分只需要會 parallelmap & amap 其實就差不多了。

介紹比較實用的幾種方法。

parallel 迭代

foreach (i; parallel(range, work_uint_size = 100)) {
    // do something here
}

其中 work_unit_size 表示最多同時執行的數量。

例子

import std.stdio, std.parallelism;
import core.thread;

struct Producer {
    void produce() {
        Thread.sleep(1.seconds);

        writeln("Process +1");
    }
};

void main() {
    auto prods = new Producer[](10);

    foreach (prod; parallel(prods)) {
        prod.produce();
    }
}

Task

建立任務:

auto theTask = task!anOperation(arguments);
// or
auto theTask = task(&someFunction, parameters...)

執行任務:theTask.executeInNewThread()

檢視是否完成:if (theTask.done) { ... }

獲取結果:auto result = theTask.yeildForce()


asyncBuf

感覺沒啥用。

並行儲存多個需要長時間製作的元素。還需要保證使用的長時間的……

例子:

struct Producer {
    int i, total;

    bool empty() const {
        return total <= i;
    }

    int front() const {
        return i;
    }

    void popFront() {
        writefln("Producing product ID: %d", i);
        Thread.sleep(1.seconds / 2);
        ++i;
    }
};

void main() {
    auto prods = Producer(0, 10);
    foreach (prod; taskPool.asyncBuf(prods, 3)) {
        writef("Got product id: %d\n", prod);
        Thread.sleep(1.seconds);
        writeln("Used product...");
    }
}

map & amap

先看例子:

int increase(int x) {
    Thread.sleep(500.msecs);
    return x + 3;
}

void main() {
    int[] nums;
    foreach (i; 0 .. 10) {
        nums ~= i;
    }

    // auto results = taskPool.map!increase(nums);
    auto results = taskPool.amap!increase(nums);
    foreach (result; results) {
        writeln(result);
    }
}

可以類比 python 中的 map

兩者的區別:

  • map 可以指定同時執行的數量,而 amap 是有多少執行多少。

  • map 會一定程度上按順序執行,而 amap 並不是順序執行,它依靠 RandomAccessRange,也就是隨機順序執行。


訊息並行

我不知道怎麼翻譯,反正就是 Message Passing Concurrency

核心方法: spawn (喚起)

我們可以形象的認為,spawn 方法可以喚起一個新的工人(執行緒)來為我們工作。

並且這個工人與主執行緒是分開的(先看程式碼後面解釋):

import std.stdio;
import std.concurrency;
import core.thread;
void worker() {
    foreach (i; 0 .. 5) {
        Thread.sleep(500.msecs);
        writeln(i, " (worker) in ", thisTid);

    }

}
void main() {
    Tid myWorkerTid = spawn(&worker);
    foreach (i; 0 .. 5) {
        Thread.sleep(300.msecs);
        writeln(i, " (main) in ", thisTid);

    }

    writeln("main is done!");
}

最終輸出:

0 (main) in Tid(7f0eb19bc0b0)
0 (worker) in Tid(7f0eb19bc000)
1 (main) in Tid(7f0eb19bc0b0)
2 (main) in Tid(7f0eb19bc0b0)
1 (worker) in Tid(7f0eb19bc000)
3 (main) in Tid(7f0eb19bc0b0)
2 (worker) in Tid(7f0eb19bc000)
4 (main) in Tid(7f0eb19bc0b0)
main is done!
3 (worker) in Tid(7f0eb19bc000)
4 (worker) in Tid(7f0eb19bc000)

實際輸出可能略有差異。

解釋

  • spawn(&worker) 喚起了一個新的執行緒執行 worker 函數,並返回了新的執行緒的 id 是一個結構體 Tid

  • thisTid 類似於一個宏,用於獲取當前所線上程的 id


傳送訊息

先看程式碼後解釋:

void worker() {
    int value = 0;
    while (value >= 0) {
        value = receiveOnly!int();
        double result = cast(double)value / 7;
        ownerTid.send(result);
    }
}

void main() {
    Tid myWorker = spawn(&worker);

    foreach (val; 0 .. 10) {
        myWorker.send(val);
        double result = receiveOnly!double();
        writefln("Send %s got %s", val, result);
    }

    myWorker.send(-1); // terminate worker process
}

最終輸出:

Send 0 got 0
Send 1 got 0.142857
Send 2 got 0.285714
Send 3 got 0.428571
Send 4 got 0.571429
Send 5 got 0.714286
Send 6 got 0.857143
Send 7 got 1
Send 8 got 1.14286
Send 9 got 1.28571

解釋

  • ownerTid 類似於一個宏,用於取得喚醒自己的執行緒的 Tid,從而傳送訊息。

  • Tid.send(...) 可以向 Tid 代表的那個執行緒傳送一條訊息。

    • 如果同時要傳送多個東西,在傳送的地方是 Tid.send(a, b, c, ...)

    • 在接受的地方要變化為 receiveOnly!(typeof(a), typeof(b), typeof(c), ...),最終得到的是一個 tuple,可以通過下標存取。

  • receiveOnly!type() 表示只接受型別為 type 的訊息。

  • 最後 myWorker.send(-1) 是根據程式碼邏輯結束的,並不屬於通法。

如果我們需要更靈活的接受方法怎麼辦?

void workerFunc() {
    bool isDone = false;
    while (!isDone) {
        void intHandler(int message) {
            writeln("handling int message: ", message);

            if (message == -1) {
                writeln("exiting");
                isDone = true;
            }
        }

        void stringHandler(string message) {
            writeln("handling string message: ", message);
        }
        
        receive(&intHandler, &stringHandler);
    }    
}

我們可以指定多種 Handler 以處理不同的資料型別。利用 receive 註冊 到處理型別訊息的函數中。


更優雅的方式

處理更多的型別:

struct Exit {}

void worker() {
    bool done = false;

    while (!done) {
        receive(
            (int message) {
                writeln("int message ", message);
            },

            (string message) {
                writeln("string message", message);
            },

            (Exit message) {
                writeln("Exit message");
                done = true;
            },

            (Variant message) {
                writeln("Unexpected message: ", message);
            }
        );
    }
}

void main() {
    Tid myWorker = spawn(&worker);

    myWorker.send(10);
    myWorker.send("hello");
    myWorker.send(10.1);
    myWorker.send(Exit());
}

主要是使用了匿名函數……

解釋

  • 利用 std.variant.Variant 以接收任何型別的資料。但是需要保證,處理所有型別資料的方法應該放在最後面,不然會導致全部被判斷成 Variant

超時接受

我們可以定一個超時時間,超過這個時間就直接返回。

先看程式碼:

struct Exit {}

void worker() {
    bool done = false;

    while (!done) {
        bool received = receiveTimeout(600.msecs,
            (Exit message) {
                writeln("Exit message");
                done = true;
            },

            (Variant message) {
                writeln("Some message: ", message);
            }
        );
        if (!received) {
            writeln("no message yet...");
        }
    }
}

void main() {
    Tid myWorker = spawn(&worker);

    myWorker.send(10);
    myWorker.send("hello");
    Thread.sleep(1.seconds);
    myWorker.send(10.1);
    myWorker.send(Exit());
}

最終輸出

Some message: 10
Some message: hello
no message yet...
Some message: 10.1
Exit message

解釋

  • receiveTimeout 只比 recieve 多了一個引數,用於指定超時時間。

  • 返回一個 bool 變數,如果為 false 則沒有接收到任何訊息。


等待所有執行緒結束thread_joinAll()

一般來說放在需要放的地方……即可。


資料共用

終於講到這裡了。

我們先考慮一個程式:

import std.stdio;
import std.concurrency;
import core.thread;

int variable;

void printInfo(string message) {
    writefln("%s: %s (@%s)", message, variable, &variable);
}

void worker() {
    variable = 42;
    printInfo("Before the worker is terminated");
}

void main() {
    spawn(&worker);
    thread_joinAll();
    printInfo("After the worker is terminated");
}

其輸出是這樣的:

Before the worker is terminated: 42 (@7F308C88C530)
After the worker is terminated: 0 (@7F308C98D730)

可以發現,同樣的變數在不同的執行緒裡面地址是不一樣的,也就是說資料是獨立的,所以要有共用。

此時我們只需要修改:

shared int variable;

即可。

實際上寫為 shared(int) variable; 會更標準,但是好麻煩……

當然,不得不說,有了訊息傳遞,那麼資料共用就是備用的方案了。


Data Race

資料競爭是一個很常見的問題。

例子

void worker(shared int* i) {
    foreach (t; 0 .. 200000) {
        *i = *i + 1;
    }
}

void main() {
    shared int i = 0;

    foreach (id; 0 .. 10) {
        spawn(&worker, &i);
    }

    thread_joinAll();
    writeln("after i to ", i);
}

期望輸出 2000000,但是實際輸出可能遠小於此。

所以我們要考慮同步:

void worker(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized {
            *i = *i + 1;
        }
    }
}

解釋

  • synchronized 會隱式地建立一個鎖,保證只有一個執行緒會持有這個鎖,並且執行這些操作。

  • 有些時候,synchronized 會使得因為等待鎖的額外開銷使得程式變慢。但有些時候,我們可以通過更好的方法避免等待的開銷,例如使用原子操作。

  • synchronized 建立的鎖只會對於這一個程式碼塊生效,不會影響到其他的程式碼塊。


共用鎖

void increase(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized {
            *i = *i + 1;
        }
    }
}

void decrese(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized {
            *i = *i - 1;
        }
    }
}

void main() {
    shared int i = 0;

    foreach (id; 0 .. 10) {
        if (id & 1) spawn(&increase, &i);
        else spawn(&decrese, &i);
    }

    thread_joinAll();
    writeln("after i to ", i);
}

期望輸出 0 但是實際輸出……不知道。所以我們需要共用鎖:

synchronized (lock_object) {
    // ...
}

修改後的程式碼

class Lock {}
shared Lock lock = new Lock();

void increase(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized (lock) {
            *i = *i + 1;
        }
    }
}

void decrese(shared int* i) {
    foreach (t; 0 .. 200000) {
        synchronized (lock) {
            *i = *i - 1;
        }
    }
}

現在就可以得到正確的答案了。


同步類

我們可以使用 synchronized 修飾一個類。這相當於在每一個程式碼塊裡面巢狀一個 synchronzied

synchronized class Cls {
    void func() {
        // ...
    }
}

上面的等價於:

class Cls {
    void func() {
        synchronized (this) {
            // ...
        }
    }
}

同步初始化

我們考慮這份程式碼:

static this() {
    writeln("executing static this()");
}

void worker() {
}
void main() {
    spawn(&worker);
    thread_joinAll();
}

最終會輸出兩次 executing static this()

如果我們修改為 shared static this() { ... },那麼最終只會輸出一次。


原子操作

需要用到 core.atomic 庫。

有程式碼:

atomic!"+="(var, x);
atomic!"-="(var, x);
// ... like *= /= ^= ...

這些都是原子操作。

有方法:

shared(int) *value;
bool is_mutated = cas(value, currentValue, newValue);

如果返回 true,那麼值會改變,否則沒有。

原子操作一般來說快於 synchronized

同時,原子操作也可以作用於結構體上,這裡不作為講解。

更多操作可以參考標準庫:

  • core.sync.barrier

  • core.sync.condition

  • core.sync.config

  • core.sync.exception

  • core.sync.mutex

  • core.sync.rwmutex

  • core.sync.semaphore