好難受,dlang 生態太差,沒辦法,學了半天才明白。
我儘量以精煉的語言解釋。
採用 定義,例子(程式碼),解釋 的步驟講解。
所以你可能看到很多程式碼,一點解釋……
我會省略一些
import
,讓程式碼短一些
parallelism
並行感覺好廢物,這一小部分瞭解即可。
這部分只需要會
parallel
和map & amap
其實就差不多了。
介紹比較實用的幾種方法。
parallel
迭代foreach (i; parallel(range, work_uint_size = 100)) {
// do something here
}
其中 work_unit_size
表示最多同時執行的數量。
例子:
import std.stdio, std.parallelism;
import core.thread;
struct Producer {
void produce() {
Thread.sleep(1.seconds);
writeln("Process +1");
}
};
void main() {
auto prods = new Producer[](10);
foreach (prod; parallel(prods)) {
prod.produce();
}
}
建立任務:
auto theTask = task!anOperation(arguments);
// or
auto theTask = task(&someFunction, parameters...)
執行任務:theTask.executeInNewThread()
檢視是否完成:if (theTask.done) { ... }
獲取結果:auto result = theTask.yeildForce()
感覺沒啥用。
並行儲存多個需要長時間製作的元素。還需要保證使用的長時間的……
例子:
struct Producer {
int i, total;
bool empty() const {
return total <= i;
}
int front() const {
return i;
}
void popFront() {
writefln("Producing product ID: %d", i);
Thread.sleep(1.seconds / 2);
++i;
}
};
void main() {
auto prods = Producer(0, 10);
foreach (prod; taskPool.asyncBuf(prods, 3)) {
writef("Got product id: %d\n", prod);
Thread.sleep(1.seconds);
writeln("Used product...");
}
}
先看例子:
int increase(int x) {
Thread.sleep(500.msecs);
return x + 3;
}
void main() {
int[] nums;
foreach (i; 0 .. 10) {
nums ~= i;
}
// auto results = taskPool.map!increase(nums);
auto results = taskPool.amap!increase(nums);
foreach (result; results) {
writeln(result);
}
}
可以類比 python
中的 map
。
兩者的區別:
map
可以指定同時執行的數量,而 amap
是有多少執行多少。
map
會一定程度上按順序執行,而 amap
並不是順序執行,它依靠 RandomAccessRange
,也就是隨機順序執行。
我不知道怎麼翻譯,反正就是
Message Passing Concurrency
。
核心方法: spawn
(喚起)
我們可以形象的認為,spawn
方法可以喚起一個新的工人(執行緒)來為我們工作。
並且這個工人與主執行緒是分開的(先看程式碼後面解釋):
import std.stdio;
import std.concurrency;
import core.thread;
void worker() {
foreach (i; 0 .. 5) {
Thread.sleep(500.msecs);
writeln(i, " (worker) in ", thisTid);
}
}
void main() {
Tid myWorkerTid = spawn(&worker);
foreach (i; 0 .. 5) {
Thread.sleep(300.msecs);
writeln(i, " (main) in ", thisTid);
}
writeln("main is done!");
}
最終輸出:
0 (main) in Tid(7f0eb19bc0b0)
0 (worker) in Tid(7f0eb19bc000)
1 (main) in Tid(7f0eb19bc0b0)
2 (main) in Tid(7f0eb19bc0b0)
1 (worker) in Tid(7f0eb19bc000)
3 (main) in Tid(7f0eb19bc0b0)
2 (worker) in Tid(7f0eb19bc000)
4 (main) in Tid(7f0eb19bc0b0)
main is done!
3 (worker) in Tid(7f0eb19bc000)
4 (worker) in Tid(7f0eb19bc000)
實際輸出可能略有差異。
解釋:
spawn(&worker)
喚起了一個新的執行緒執行 worker
函數,並返回了新的執行緒的 id
是一個結構體 Tid
。
thisTid
類似於一個宏,用於獲取當前所線上程的 id
。
先看程式碼後解釋:
void worker() {
int value = 0;
while (value >= 0) {
value = receiveOnly!int();
double result = cast(double)value / 7;
ownerTid.send(result);
}
}
void main() {
Tid myWorker = spawn(&worker);
foreach (val; 0 .. 10) {
myWorker.send(val);
double result = receiveOnly!double();
writefln("Send %s got %s", val, result);
}
myWorker.send(-1); // terminate worker process
}
最終輸出:
Send 0 got 0
Send 1 got 0.142857
Send 2 got 0.285714
Send 3 got 0.428571
Send 4 got 0.571429
Send 5 got 0.714286
Send 6 got 0.857143
Send 7 got 1
Send 8 got 1.14286
Send 9 got 1.28571
解釋:
ownerTid
類似於一個宏,用於取得喚醒自己的執行緒的 Tid
,從而傳送訊息。
Tid.send(...)
可以向 Tid
代表的那個執行緒傳送一條訊息。
如果同時要傳送多個東西,在傳送的地方是 Tid.send(a, b, c, ...)
。
在接受的地方要變化為 receiveOnly!(typeof(a), typeof(b), typeof(c), ...)
,最終得到的是一個 tuple
,可以通過下標存取。
receiveOnly!type()
表示只接受型別為 type
的訊息。
最後 myWorker.send(-1)
是根據程式碼邏輯結束的,並不屬於通法。
如果我們需要更靈活的接受方法怎麼辦?
void workerFunc() {
bool isDone = false;
while (!isDone) {
void intHandler(int message) {
writeln("handling int message: ", message);
if (message == -1) {
writeln("exiting");
isDone = true;
}
}
void stringHandler(string message) {
writeln("handling string message: ", message);
}
receive(&intHandler, &stringHandler);
}
}
我們可以指定多種 Handler
以處理不同的資料型別。利用 receive
註冊 到處理型別訊息的函數中。
處理更多的型別:
struct Exit {}
void worker() {
bool done = false;
while (!done) {
receive(
(int message) {
writeln("int message ", message);
},
(string message) {
writeln("string message", message);
},
(Exit message) {
writeln("Exit message");
done = true;
},
(Variant message) {
writeln("Unexpected message: ", message);
}
);
}
}
void main() {
Tid myWorker = spawn(&worker);
myWorker.send(10);
myWorker.send("hello");
myWorker.send(10.1);
myWorker.send(Exit());
}
主要是使用了匿名函數……
解釋:
std.variant.Variant
以接收任何型別的資料。但是需要保證,處理所有型別資料的方法應該放在最後面,不然會導致全部被判斷成 Variant
。我們可以定一個超時時間,超過這個時間就直接返回。
先看程式碼:
struct Exit {}
void worker() {
bool done = false;
while (!done) {
bool received = receiveTimeout(600.msecs,
(Exit message) {
writeln("Exit message");
done = true;
},
(Variant message) {
writeln("Some message: ", message);
}
);
if (!received) {
writeln("no message yet...");
}
}
}
void main() {
Tid myWorker = spawn(&worker);
myWorker.send(10);
myWorker.send("hello");
Thread.sleep(1.seconds);
myWorker.send(10.1);
myWorker.send(Exit());
}
最終輸出:
Some message: 10
Some message: hello
no message yet...
Some message: 10.1
Exit message
解釋:
receiveTimeout
只比 recieve
多了一個引數,用於指定超時時間。
返回一個 bool
變數,如果為 false
則沒有接收到任何訊息。
等待所有執行緒結束:thread_joinAll()
。
一般來說放在需要放的地方……即可。
終於講到這裡了。
我們先考慮一個程式:
import std.stdio;
import std.concurrency;
import core.thread;
int variable;
void printInfo(string message) {
writefln("%s: %s (@%s)", message, variable, &variable);
}
void worker() {
variable = 42;
printInfo("Before the worker is terminated");
}
void main() {
spawn(&worker);
thread_joinAll();
printInfo("After the worker is terminated");
}
其輸出是這樣的:
Before the worker is terminated: 42 (@7F308C88C530)
After the worker is terminated: 0 (@7F308C98D730)
可以發現,同樣的變數在不同的執行緒裡面地址是不一樣的,也就是說資料是獨立的,所以要有共用。
此時我們只需要修改:
shared int variable;
即可。
實際上寫為
shared(int) variable;
會更標準,但是好麻煩……
當然,不得不說,有了訊息傳遞,那麼資料共用就是備用的方案了。
資料競爭是一個很常見的問題。
例子:
void worker(shared int* i) {
foreach (t; 0 .. 200000) {
*i = *i + 1;
}
}
void main() {
shared int i = 0;
foreach (id; 0 .. 10) {
spawn(&worker, &i);
}
thread_joinAll();
writeln("after i to ", i);
}
期望輸出 2000000
,但是實際輸出可能遠小於此。
所以我們要考慮同步:
void worker(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized {
*i = *i + 1;
}
}
}
解釋:
synchronized
會隱式地建立一個鎖,保證只有一個執行緒會持有這個鎖,並且執行這些操作。
有些時候,synchronized
會使得因為等待鎖的額外開銷使得程式變慢。但有些時候,我們可以通過更好的方法避免等待的開銷,例如使用原子操作。
synchronized
建立的鎖只會對於這一個程式碼塊生效,不會影響到其他的程式碼塊。
void increase(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized {
*i = *i + 1;
}
}
}
void decrese(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized {
*i = *i - 1;
}
}
}
void main() {
shared int i = 0;
foreach (id; 0 .. 10) {
if (id & 1) spawn(&increase, &i);
else spawn(&decrese, &i);
}
thread_joinAll();
writeln("after i to ", i);
}
期望輸出 0
但是實際輸出……不知道。所以我們需要共用鎖:
synchronized (lock_object) {
// ...
}
修改後的程式碼:
class Lock {}
shared Lock lock = new Lock();
void increase(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized (lock) {
*i = *i + 1;
}
}
}
void decrese(shared int* i) {
foreach (t; 0 .. 200000) {
synchronized (lock) {
*i = *i - 1;
}
}
}
現在就可以得到正確的答案了。
我們可以使用 synchronized
修飾一個類。這相當於在每一個程式碼塊裡面巢狀一個 synchronzied
:
synchronized class Cls {
void func() {
// ...
}
}
上面的等價於:
class Cls {
void func() {
synchronized (this) {
// ...
}
}
}
我們考慮這份程式碼:
static this() {
writeln("executing static this()");
}
void worker() {
}
void main() {
spawn(&worker);
thread_joinAll();
}
最終會輸出兩次 executing static this()
。
如果我們修改為 shared static this() { ... }
,那麼最終只會輸出一次。
需要用到
core.atomic
庫。
有程式碼:
atomic!"+="(var, x);
atomic!"-="(var, x);
// ... like *= /= ^= ...
這些都是原子操作。
有方法:
shared(int) *value;
bool is_mutated = cas(value, currentValue, newValue);
如果返回 true
,那麼值會改變,否則沒有。
原子操作一般來說快於
synchronized
。同時,原子操作也可以作用於結構體上,這裡不作為講解。
更多操作可以參考標準庫:
core.sync.barrier
core.sync.condition
core.sync.config
core.sync.exception
core.sync.mutex
core.sync.rwmutex
core.sync.semaphore