MPI簡談

2022-07-31 06:02:01

MPI簡談


MPI是分散式記憶體系統,區別於OpenMP和Pthreads的共用記憶體系統。MPI是一種基於訊息傳遞的並行程式設計技術,是如今最為廣泛的並行程式開發方法。

MPI前世今生

MPI(Message Passing Interface,簡稱MPI)是一種程式設計介面標準,不是一種具體的程式語言。

  • 1992年開始起草
  • 1994年釋出第一個版本MPI-1
  • 1997年釋出第二個版本MPI-2
  • 成為訊息傳遞並行程式設計標準,也是最為流行的並行程式設計介面

MPI實現小記

MPI是一個標準,不是語言。

  1. MPICH
  1. Intel MPI
  • Intel MPI下載
  • Intel MPI是符合MPI-2標準的MPI實現。
  • Intel MPI在通訊協定的選擇上無需進行額外設定,可自動選擇MPI程序間最快的傳輸協定。

MPI特點

聚是一團火,散如滿天星

  • 基於訊息傳遞的並行程式,也就是所謂的分散式記憶體心痛,其中的每個程序之間具有自己獨立的堆疊和程式碼段,作為互不相關的多個程式執行,程序之間的資訊互動完全通過顯式的呼叫通訊函數來實現。

  • SPMD(Single Program Multiple Data)單程式多資料,使用一個程式來處理多個不同的資料集來達到並行的目的

  • MPMD(Multiple Program Multiple Data)多程式多資料,使用不同的程式處理多個資料集,合作求解同一個問題

SPMD

本質上就是對於不同的資料集合都是一樣的處理,在序列中,如果此時有一個資料集A,那麼就是將該操作對資料集A全部執行過去。

那麼在SPMD中,比如現在有三個獨立的程序,那麼就是將資料集A均等拆分成三份a1,a2,a3分給這三個程序(在SPMD中,每一個程序會有一個程序號rank,用來相互區別),等到每個程序處理完任務後,再通過訊息傳遞的辦法來收集處理資料處理的結果。MPI標準正是為了實現最後的訊息傳遞提供標準和實現方法。

MPMD

MPMD目前有三種典型的執行模型

  1. 管理者(Master)/工人(Worker)型別
  • 由一個管理者程式來控制整個程式的執行
  • 管理者程式負責將不同的任務分配給多個工人程式來完成工作
  1. 聯合資料分析型別
  • 不同的程式各自獨立的完成自己的任務,在特定的時候交換資料
  • 耦合性最少,通訊少,較為容易獲得更好的並行加速效果(相對於序列來說)
  1. 流式型別
  • 和工廠中的流水線類似,假設有三個程序,那麼對於一個任務來說,程序1完成後的輸出作為程序2的輸入,同理,程序2的輸出作為程序3的輸出,不同的執行緒之間構成的實際上是序列關係,但是當資料量足夠大的時候,我們其實可以把這種關係當作並行關係,也就是任務1,2,3近似的同時開始,同時結束(不考慮是否為同一個任務)

編寫並行程式是為了利用冗餘硬體(例如多核,多處理器或多機)提高應用效能。

MPICH誕生之旅

眾裡尋他千百度。驀然回首,那人卻在,燈火闌珊處。

筆者下載安裝MPICH走了好多彎路,謹以此文紀念。

溫馨提示:接下來的教學只適用於windowslinux筆者可不負責哦。

安裝網址推薦

MPICH-2,點選去官網安裝,或者複製下面的連結去https://www.mpich.org/static/downloads/1.4.1p1/安裝。

進入到的介面後尋找mpich2-1.4.1p1-win-x86-64.msi,點選下載就可以了。下載後會得到一個檔案,但是注意這時候將其移動到桌面,

開始安裝

注意,不要直接點選安裝,這裡的推薦是以管理員身份安裝。右鍵進行管理員安裝,但是win10沒有這個操作,這裡推薦是左下角windows圖示,右鍵,然後找Windows Powershell(管理員)或者就是找到管理員就可以了。進入介面按照筆者的順序輸入命令即可(注意需要保證前面的msi檔案已經放在桌面了,不然就需要自己cd去找了,這裡熟悉dos系統的可以跳過)

cd..
cd..
cd users/username/desktop
msiexec /package mpich2-1.4.1p1-win-x86-64.msi

一共輸入四個命令,注意第三個命令cd users/username/desktop中的username需要替換成自己的使用者名稱,也就是開機進入的自己當初起的名字。第四個命令的 mpich2-1.4.1p1-win-x86-64.msi是當初下載下來的檔案名字,如果不是這個名字記得自己改下名字。

進入安裝

如果前面的操作沒有問題,那麼現在已經執行了安裝程式。

注意兩點:

  • 注意不要點的太快,中間有一個process manager setup介面,那裡需要輸入自己的開機密碼(針對筆記型電腦),不是預設的behappy,筆者沒有嘗試過,但是聽說直接behappy最後安裝的結果也是蠻happy的。

  • 之後會有一個just for mefor everyone的選項,預設是just for me,這裡推薦改成for everyone,這裡所有的修改,都是為了之後安裝的順利。

  • 最後一直按next就行了,如果不想要在預設的資料夾,自己修改的話,那麼注意自己找好想放的位置就可以了。這裡預設路徑為C:/program files/mpich2

繼續出發

這裡同時按住win+R,在裡面輸入cmd,按回車,喚出dos系統。

然後依次輸入以下命令(本質上就是找到那個mpich2檔案)

cd/.
cd program files
cd mpich2
cd bin
smpd -status

這裡如果出現smpd running on ...的資訊,就說明smpd是執行的,安裝可以。不行的話,可能需要重新安裝qaq。

設定mpiexec

使用mpiexec我們需要先註冊賬戶

接下來在資料夾中找到moich2資料夾,點進去找到bin,然後找到wmpiregisterexe檔案點選進行註冊。當然如果之前的命令列視窗沒有關閉,也可以輸入mpiexec -register進行,是一樣的,不過這個有圖形化介面,推薦用圖形化介面,好看一點。

同樣是設定賬戶和密碼,這裡的賬戶就是鎖屏之後電腦上顯示的你的使用者名稱字,密碼就是鎖屏密碼。

註冊完畢之後,可以在dos視窗下輸入該命令:

mpiexec -validate

如果系統的反應是'SUCCESS',那麼就說明你註冊成功了,否則需要重新開始註冊。

環境變數的設定

  • 找到我的電腦,右鍵找到屬性,點進去,然後找到高階系統設定,點選進去,裡面的視窗中有環境變數點選進去,這裡有自己的使用者變數和系統變數,我們選擇在系統變數加入。

  • 在裡面找到Path,點選,然後新建一個路徑,將剛才的mpich2裡面的bin檔案新增進去,如果安裝路徑是前面所說的,那麼就是

    C:/program files/mpich2/bin

    加入即可。

  • 注意此時需要推出dos系統重新按照之前的策略,找到bin,或者輸入下面的命令。

cd /.
cd program files
cd mpich2
cd examples

可以自行執行裡面的cpi.exe檔案看看是否正常。

MPI落地

如果前面的方法都沒有用,沒有關係。我們這邊直接來一個微型mpi環境的搭建,一切為了執行自己的第一個mpi程式。

通用操作:

mpi下載

進入官網以後點選download,將裡面的msmpisetup.exemsmpisdk.msi都下載下來。這裡預設下載下來以後一直點的是確認,沒有修改安裝地址,如果有修改,根據筆者的程式碼自行修改地址那邊的資訊。

mingw64-8.1.0

注意往下拉,找到x86_64-posix-seh,點選下載,最好是64位元的,好像32位元不是很匹配。以及環境變數的設定,一樣找到我的電腦(此電腦),右鍵選擇屬性,然後找到高階系統設定,然後找到環境變數,點進去。

在使用者變數和系統變數找到path,然後剛才下載下來的x86_64-posix-seh,解壓完成之後,找到裡面的資料夾bin,比如筆者的就是C:\Program Files\mingw64\bin,也就是我將mingw64放在了C槽的program files下,然後在每個path下加入就可以了。

檢驗的過程就是仍然是同時按win+R,喚出dos系統,輸入gcc --version,如果在一大堆英文中看到8.1.0就說明安裝以及設定環境變數成功了。

vscode

vscode

一進宮

推薦去官網下載,點選download即可,裡面的設定可以放心選擇預設。如果下載完vscode,可以喚出dos系統,然後輸入code -v,如果第一行是版本號,第三行是x64說明安裝成功了。

二進宮

這裡第一次開啟vscode是英文介面,不熟悉的同學沒有關係,往右邊的懸浮視窗尋找,找到第五個擴充套件,或者直接Ctrl+Shift+X,在裡面輸入Chinese,然後下載應用就可以了,記得做完退出再進來就是中文介面了。

三進宮

接下來一樣的操作,仍然是在拓展視窗,這時候我們搜尋code runner,下載並且應用,點確認就可以了。然後依然是退出再進去。

我不來啦

最後一次進去,點選拓展介面,這時候應該能看到code runner在你的搜尋欄下面,這時候右鍵,找到拓展設定點選,然後在裡面找到這個Code-runner:Executor Map,如果覺得麻煩,直接Ctrl+F,然後再那個框中貼上Code-runner:Executor Map,按個回車就能找到了,然後點選下面的在setting.json中編輯,找到裡面的:

"c":"...."
"cpp":"...."

我們要修改的就是這個。

造宮殿

將原先的替換成下面的:

"c": "cd $dir && gcc $fileName -o $fileNameWithoutExt -fopenmp -l msmpi -L \"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Lib\\x64\" -I \"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Include\" && mpiexec -n 4 $fileNameWithoutExt",
"cpp": "cd $dir && g++ $fileName -o $fileNameWithoutExt -fopenmp -l msmpi -L \"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Lib\\x64\" -I \"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Include\" && mpiexec -n 4 $fileNameWithoutExt",

注意這裡面的MS-MPI庫的位置(\"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Lib\\x64\"\"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Include\"),尤其注意"\之前都要加上跳脫符號\,否則vscode後面執行的時候會出現no this file or directionary,這邊也可以按照自己之前下載的位置找到x64Include對應替換就可以了

執行自己的第一個mpi程式

拷貝下面程式,然後在vscode上執行,記著點選右上角的三角,或者直接Ctrl+Alt+N,就可以測試自己的mpi環境搭建的如何了。

#include <iostream>
#include <mpi.h>
#include <stdint.h>
using namespace std;

int main(int argc, char* argv[]) {
  cout << "hello" << endl;
  int myid, numprocs;
  int namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &myid);
  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
  MPI_Get_processor_name(processor_name, &namelen);
  cout << "Hello World! Process" << myid << "of" << numprocs << " on " << processor_name << endl;
  MPI_Finalize();
  return 0;
}

如果輸出為

hello
hello
hello
hello
Hello World! Process0of4 on ...
Hello World! Process1of4 on ...
Hello World! Process2of4 on ...
Hello World! Process3of4 on ...

這裡的...每個人可能不一樣,應該是硬體的一種編號,大致類似就可以了。接下來就可以mpi的探索之旅啦。

DevCPP

又是我們夢開始的dev,不過因為目前dev的mingw64版本是5.11,編譯mpi程式的時候會有錯誤,所以我們要給自己的dev升級一下。

告別往昔

點選Tools,在點選Compiler Options,進入編譯器選項介面後,右上角從左到右點選第三個Add a compiler set by folder,然後找到前面下載的mingw64資料夾,點選確定。然後再點從左到右第四個(向右的箭頭)Rename the selected compiler set,為自己新搭建的編譯器起個名字。注意,如果以後想要換回以前的編譯環境,可以往上面的選單欄中找到help,往下一行最右邊,就會看到剛才自己為編譯器起的名字,這時候點選,然後選擇裡面的第一個就是以前自己的編譯器環境,以後切換可以在這邊實現。或者就是在剛才的Compiler Options裡面的第一欄點選選擇就可以了。

浴火重生

Add the following commands when calling the compiler裡面新增下列程式碼:

-L "C:\Program Files (x86)\Microsoft SDKs\MPI\Lib\x64" -I "C:\Program Files (x86)\Microsoft SDKs\MPI\Include"

注意這裡面的MS-MPI庫的位置("C:\Program Files (x86)\Microsoft SDKs\MPI\Lib\x64"和`"C:\Program Files ,這邊也可以按照自己之前下載的位置找到x64Include對應替換就可以了

注意修改之前先在之前打勾,不然沒法選擇。

接著就是在下面的Add the following commands when calling the linker中改成下面的語句:

-static-libgcc -fopenmp -l msmpi

這裡的-fopenmp是為了執行omp,而-l msmpi是為了執行mpi,根據自己的需要自行選擇。

偷懶是一輩子的事情

這時候理論上可以開始執行自己的第一個mpi程式了,但是還不夠方便,因為此時編譯執行是沒有並行的感覺,所以我們還要進行一步操作。

點選Tools,再點選Configure Tools,點選裡面的Add,下面的內容copy即可

標題 需要填寫的內容
Title MPI RUN FOR 4
Program C:\Windows\System32\cmd.exe
Working Directory C:\Windows\System32\
Parameters /c cd/d <PROJECTPATH> & mpiexec -n 4 <EXENAME> & "<EXECPATH>ConsolePauser.exe"

這邊的parameters裡面的數位4就是我們同時執行的執行緒有多少,這裡可以根據自己的需要自行修改

執行自己的第一個mpi程式

拷貝下面程式,然後在dev執行,注意先編譯,然後點選Tools,然後選擇裡面的Package Manager,點選自己剛剛搭建的MPI RUN FOR 4,就可以測試自己的mpi環境搭建的如何了。

#include <iostream>
#include <mpi.h>
#include <stdint.h>
using namespace std;

int main(int argc, char* argv[]) {
  cout << "hello" << endl;
  int myid, numprocs;
  int namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &myid);
  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
  MPI_Get_processor_name(processor_name, &namelen);
  cout << "Hello World! Process" << myid << "of" << numprocs << " on " << processor_name << endl;
  MPI_Finalize();
  return 0;
}

如果輸出為

hello
hello
hello
hello
Hello World! Process0of4 on ...
Hello World! Process1of4 on ...
Hello World! Process2of4 on ...
Hello World! Process3of4 on ...

這裡的...每個人可能不一樣,應該是硬體的一種編號,大致類似就可以了。接下來就可以mpi的探索之旅啦。

MPI程式設計基礎

開始MPI語法的學習苦旅

關於int main(int argc, char* argv[])的解釋

注意main函數本質上只是一個程式執行的入口而已,平常我們使用scanf函數之類的,都是在執行的時候傳入引數,那麼有沒有方法在程式啟動的時候就傳遞引數呢,這裡我們就要用到int main(int argc, char* argv)

argc引數和argv引數

#include<stdio.h>

int main(int argc, char* argv[]) {
  printf("argc = %d\n", argc);
  printf("%s\n", *argv);
}

執行上面的程式,我們會發現這邊的結果為

argc = 1
C:/.../Untitled1.exe

argc代表了我們的命令列有1個字串,而這個字串就是C:/.../Untitled1.exe

所有我們就可以通過argc和argv這樣的關係來進行命令列的輸入

傳遞引數的方法

通過下面的格式傳遞:

程式名.exe 字串1 字串2 ...

#include<stdio.h>
#include<string.h>

int main(int argc, char* argv[]) {
  printf("argc = %d\n", argc);

  argv++;
  while(*argv) {
  	if(strcmp(*argv, "a") == 0) {
  		argv++;
  		printf("a\n");
	}else{
	    argv++;
	    printf("wrong\n");
	}
  }
  return 0;
}

輸入以上程式,編譯,假設此時產生的可執行檔案叫做a.exe,那麼注意此時是喚出cmd視窗,找到這個檔案所在的位置,然後輸入

a.exe a A w

就會得到一下的輸出

argc = 4
a
wrong
wrong

所以如果需要程式帶引數地啟動的時候,就是用int main(int argc, char* argv[]),僅此而已。

MPI程式引入

仍然是Hello World

#include<stdio.h>
#include "mpi.h"

int main(int argc, char* argv[]) {
  int rank;
  int size;
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  printf("Hello World from process %d of %d\n", rank, size);
  MPI_Finalize();
  return 0;
}

這裡不使用argc,argv也是可以的

#include<stdio.h>
#include "mpi.h"

int main() {
  int rank;
  int size;
  MPI_Init(NULL, NULL);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  printf("Hello World from process %d of %d\n", rank, size);
  MPI_Finalize();
  return 0;
}

這裡執行的結果是:

Hello World from process 2 of 4
Hello World from process 1 of 4
Hello World from process 0 of 4
Hello World from process 3 of 4

這裡筆者是4個程序,分別列印他們各自的編號,注意這邊的順序有很多種,並沒有固定的順序,因為他們是並行的,誰快,誰就先佔用列印裝置,僅此而已。

MPI四大護法

首先,想要執行mpi,很明顯
#include"mpi.h"是顯然必要的。

  • MPI_Init和MPI_Finalize

函數 作用
MPI_Init 用來初始化MPI執行環境,建立多個MPI之間的聯絡,為後續通訊做準備
MPI_Finalize 結束MPI執行環境

MPI_Init和MPI_Finalize配套使用,用來定義mpi程式的並行區。一般只有在這兩個定義的區域之內呼叫mpi函數,同時配套使用。

如果在並行區域之外有其他的行為執行,那麼不同於OpenMP,大部分MPI實現 會在各個並行程序之間獨立地執行相應地程式碼。

#include"mpi.h"
#include<stdio.h>

int main() {
  MPI_Init(NULL, NULL);
  MPI_Finalize();
  printf("Hello World\n");
  return 0;
}

/*
output:
Hello World
Hello World
Hello World
Hello World
*/
  • C語言中的MPI_Init需要提供argc和argv引數,如果沒有,寫成NULL就可以了,二MPI_Finalize函數不需要提供引數。二者的返回值都是int型別,標識函數是否呼叫成功。

  • 總的來說就是一下的呼叫形式

    • MPI_Init(&argc, &argv);
    • MPI_Init(NULL, NULL);
    • MPI_Finalize();
  • MPI_Comm_rank

MPI_Comm_rank就是表示各個MPI程序的,使用的時候需要提供兩個函數引數:

  • MPI_Comm型別的通訊域,標識參與計算的MPI行程群組。MPI_COMM_WORLD是MPI實現預先定義好的行程群組,指的是所有MPI程序所在的行程群組,如果想要申請自己的行程群組,則需要通過MPI_Comm定義並通過其他MPI函數生成。
  • 整型指標,返回程序在相應行程群組中的程序號。即需要將rank存放的地址了,本質上可以認為同scanf的引數類似
  • MPI還會預先定義一個行程群組MPI_COMM_SELF,只包含自己的行程群組,因此裡面的編號都是0
#include<stdio.h>
#include"mpi.h"

int main() {
  int r1, r2;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &r1);
    MPI_Comm_rank(MPI_COMM_SELF, &r2);
    printf("%d %d\n", r1, r2);
  MPI_Finalize();
  return 0;
}
  • MPI_Comm_size

本函數表示相應行程群組之間有多少個程序。其返回的也是整型值,同樣需要兩個引數:

  • MPI_Comm型別的通訊域,標識參與計算的MPI行程群組,與上面類似,這裡就是MPI_COMM_WORLD

  • 整型指標,返回相應行程群組中的程序數

#include<stdio.h>
#include"mpi.h"

int main() {
  int r1, r2, s1, s2;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &r1);
    MPI_Comm_rank(MPI_COMM_SELF, &r2);
    MPI_Comm_size(MPI_COMM_WORLD, &s1);
    MPI_Comm_size(MPI_COMM_SELF, &s2);
    printf("world %d of %d, self %d of %d\n", r1, s1, r2, s2);
  MPI_Finalize();
  return 0;
}

MPI的對等通訊

對等通訊時MPI程式設計的基礎。接下來將引入兩個重要的MPI函數MPI_SendMPI_Recv

先給程式碼,注意這邊的如果格式化(printf)的%d %s之類的漏掉的話,會發生通訊錯誤。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main(int argc, char* argv[]) {
  int myid, numprocs, next, namelen;
  char buffer[BUFLEN], processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Status status;
  MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(processor_name, &namelen);

    printf("Process %d on %s\n", myid, processor_name);
    printf("Process %d of %d\n", myid, numprocs);
    memset(buffer, 0, BUFLEN*sizeof(char));
    if(myid == numprocs-1)
      next = 0;
    else
      next = myid+1;
    if(myid == 0)
    {
        strcpy(buffer, "hello there");
        printf("%d sending '%s'\n", myid, buffer);
        fflush(stdout);
        MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
        printf("%d reveiving\n", myid);
        fflush(stdout);
        MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
        printf("%d received '%s'\n", myid, buffer);
        fflush(stdout);
    }else{
        printf("%d receiving\n", myid);
        fflush(stdout);
        MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
        printf("%d received '%s'\n", myid, buffer);
        fflush(stdout);
        MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
        printf("%d sent '%s'\n", myid, buffer);
        fflush(stdout);
    }
    MPI_Finalize();
    return 0;
}

這裡筆者的輸出為

Process 2 on ...
Process 2 of 4
2 receiving
Process 0 on ...
Process 0 of 4
0 sending 'hello there'
Process 1 on ...
Process 1 of 4
1 receiving
Process 3 on ...
Process 3 of 4
3 receiving
0 reveiving
1 received 'hello there'
1 sent 'hello there'
2 received 'hello there'
2 sent 'hello there'
3 received 'hello there'
3 sent 'hello there'
0 received 'hello there'

接下來逐步拆解上面的程式

四劍客

MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Get_processor_name(processor_name, &namelen);

這四個語句所執行的都是初始化操作,其中一個新成員MPI_Get_processor_name是用來取得執行本程序的機器名稱,該名稱放在processor_name中,其長度為namelen,同時MPI_MAX_PROCESSOR_NAME是記錄機器名的最大長度的。

MPI_Get_processor_name
  • 注意MPI_Get_processor_name的用法。

  • MPI_Get_processor_name(processor_name, &namelen)

這裡後面的程式碼

if(myid == numprocs - 1)
    next = 0;
else
    next = myid + 1;

目的是為了告訴程序號他們下一個程序號是多少,注意這是一個迴圈,最後一個程序號的下一個程序號是0。所以這裡的程式碼也可以是next = (myid + 1) % numprocs;,至於寫哪一種就看各自的選擇了。

fflush

如今windows下的stdout變成及時輸出,所以一般來說適用不適用fflush也看不出太大的區別了。

注意,平時使用的printf函數並不是直接列印到螢幕上,而是先傳送到stdout(此時的stdout類似緩衝區)中,再由stdout傳送到螢幕上。

那麼假設現在stdout直到遇到\n才會進行列印輸出,那麼假設程序1傳送hello給到stdout,然後這時候切換到程序2,程序2傳送hello world\n給stdout,此時列印到螢幕上的就是

hellohello world

很明顯第一個明明是程序1的,但是在我們看來是執行程序2列印出來的,為了解決這個問題,我們就要使用fflush(stdout),它的作用就是立即將所有內容傳送到指定輸出裝置上(清空緩衝區)。一般在多執行緒的輸出中使用。

接下來主角登場

MPI_Send

  • MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
  • MPI_Send函數的標準形式是
    int MPI_SEND(buf, count, datatype, dest, tag, comm)

其中,輸入引數包括:

輸入引數 作用
buf 傳送緩衝區的起始地址,可以是各種陣列或結構的指標
count 整型,傳送的資料個數,應為非負整數(感覺類似指標的偏移量)
datatype 傳送資料的資料型別
dest 應該為整數,表示目的程序號,即destination
tag 應該為整數,訊息標誌
comm MPI行程群組所在的通訊域(應該是傳送的哪個程序號所在的通訊域)
  • 該函數的作用就是向通訊域comm中的dest程序傳送資料。訊息資料存放在buf中,型別是datatype,個數是count個。這個訊息的標誌是tag,用以和本程序向同意目的程序傳送的其他訊息區別開來。

對於具體的MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD)的解釋

在通訊域MPI_COMM_WORLD內,向程序號next傳送資訊。傳送的是buffer裡面的所有資料,資料型別就是MPI_CHAR(因為buffer儲存的是char型別的資料,MPI_CHAR是MPI的預定義資料型別,和char一一對應),MPI_Send的引數都是輸入引數,沒有輸出引數

MPI_Recv

  • MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
  • MPI_Recv的標準形式就是:int MPI_Recv(buf, count, datatype, source, tag, comm, status);
  • MPI_Recv中的buffer和status是輸出引數,其他的都是輸入引數

其中的引數包括:

引數型別 作用
buf 接收緩衝區的起始地址,可以是各種陣列或結構的指標,為輸出引數
status MPI_Status結構指標,返回狀態資訊,為輸出引數
count 整數,最多可接收的資料個數
datatype 接收資料的資料型別
source 整型,接受資料的來源即傳送資料程序號
tag 整數,訊息標識,應與相應的發soon給操作訊息標識相同。
comm 本程序(訊息接收程序)和訊息傳送程序所在的通訊域

對於MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);的解釋:

在通訊域MPI_COMM_WORLD中,0號程序(假設是0)從任意程序(MPI_ANY_SOURCE表示接受任意程序發來的訊息),接收的標籤號是99,而且不超過512個MPI_CHAR型別資料,儲存到buffer中。

注意緩衝區buf的大小,不能小於傳送過來的有效訊息長度,否則可能由於陣列越界導致程式錯誤(段錯誤)

MPI_Status
  • MPI_Status是MPI中一個特殊的,也是比較有用的結構。MPI_Status的結構定義如下:
typedef struct MPI_Status {
  int count;
  int cancelled;
  int MPI_SOURCE;
  int MPI_TAG;
  int MPI_ERROR;
} MPI_Status;
  • status主要顯示接收函數的各種錯誤狀態,我們通過存取status.MPI_SOURCE,status.MPI_TAG和status.MPI_ERROR就可以得到傳送資料程序號,傳送資料使用的tag以及本接收操作返回的錯誤程式碼。當然如果想要獲取資料項數,筆者嘗試了一下,好像通過status.count無法獲取,需要通過MPI函數MPI_Get_count獲得。

MPI_Get_count

其標準定義為:

int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count);

其中前兩個引數為輸入引數,status是MPI_Recv返回的狀態結構的指標,datatype指定資料型別,最後一個引數是輸出引數,是實際接收到的給頂資料型別的資料項數。

筆者測試的程式如下,確實獲得了實際收到的個數。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define MAXLEN 512

int main(int argc, char* argv[]) {
  int myid, namelen, numprocs;
  char buffer[MAXLEN], pro_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Status status;
  MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(pro_name, &namelen);
    printf("myid %d of %d running on %s\n", myid, numprocs, pro_name);
    if(myid == 0) {
      strcpy(buffer, "hello world");
      printf("processor 0 sending message: %s\n", buffer);
      fflush(stdout);
      MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, 3, 110, MPI_COMM_WORLD);
      printf("send %d data\n", strlen(buffer)+1);
      fflush(stdout);
    }
    if(myid == 3) {
      MPI_Recv(buffer, MAXLEN, MPI_CHAR, 0, 110, MPI_COMM_WORLD, &status);
      printf("processor 3 received message: %s\n", buffer);
      fflush(stdout);
      int count;
      MPI_Get_count(&status, MPI_CHAR, &count);
      printf("the data num is %d\n", count);
    }
  MPI_Finalize();
  return 0;
}

這裡的count其實本質上是需要根據資料型別變化的,MPI_DOUBLE,MPI_INT,MPI_CHAR對於同一長度的資料所能儲存的資料個數是不一樣的,這與C是一樣的。

上面的對等通訊的例子,對應上面MPMD中的流式模型,即程序i等待程序i-1傳遞過來的字串,並將其傳遞給程序i+1,直到最後一個程序傳遞給程序0。

訊息管理7要素

mpi最重要的功能就是訊息傳遞,MPI_Send和MPI_Recv負責在兩個程序之間接收資訊和傳送資訊。主要由以下7個引數構成。

  • 傳送或者接收緩衝區buf
  • 資料數量count
  • 資料型別datatype
  • 目標程序或者源程序destination/source
  • 訊息標籤tag
  • 通訊域comm
  • 訊息狀態status,只在接收的函數中出現

訊息信封
MPI程式中的訊息傳遞和我們日常的郵件傳送和傳遞有類似之處,其中buf,coutn,datatype是信件的內容,而source/destination,tag,comm是信件的信封,因此我們稱之為訊息信封。

訊息資料型別

訊息資料型別,就是之前所說的datatype

作用

  • 方便將非連續記憶體中的資料,以及具有不同資料型別的內容組成訊息
  • 其型別匹配非常嚴格,一是宿主語言(如C)資料型別和通訊操作的資料型別匹配,同時傳送方和接收方的資料型別匹配

基本資料型別

以下給出了MPI預定義資料型別與C資料型別的對應關係

MPI預定義資料型別 相應的C資料型別
MPI_CHAR signed char
MPI_SHORT signed short int
MPI_INT signed int
MPI_LONG signed long int
MPI_UNSIGNED_CHAR unsigned char
MPI_UNSIGNED_SHORT unsigned short int
MPI_UNSIGNED unsigned int
MPI_UNSIGNED_LONG unsigned long int
MPI_FLOAT float
MPI_DOUBLE double
MPI_LONG_DOUBLE long double
MPI_BYTE 無對應型別
MPI_PACKED 無對應型別

基本上就是MPI+datatype的結構

一開始的時候建議儘可能地保證傳送和接收地資料型別完全一致。

這裡面的多出來的MPI_BYTEMPI_PACKED,可以與任意以位元組為單位的訊息相匹配。MPI_BYTE是將訊息不加修改的通過二進位制位元組流來傳遞的一種方式,而MPI_PACKED是為了將非連續的資料進行打包傳送而提出的。經常與函數MPI_Pack_sizeMPI_Pack聯合使用。

下面是MPI_PACKED的使用程式碼:

#include"mpi.h"
#include<stdio.h>
#include<string.h>
#include<stdlib.h>

#define MAXLEN 512

int main() {
  int myid, namelen;
  MPI_Status status;
  char name[MPI_MAX_PROCESSOR_NAME], buf[MAXLEN];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(name, &namelen);
    printf("processor %d is started on %s\n", myid, name);
    if(myid == 0) {
        double A[100];
        int buffersize;
        MPI_Pack_size(50, MPI_DOUBLE, MPI_COMM_WORLD, &buffersize);
        void* tempbuffer = malloc(buffersize);
        int j = sizeof(MPI_DOUBLE);
        int position = 0;
        for(int i = 0; i < 100; i++) A[i] = i * 1.1;
        printf("position : %d\n", position);
        for(int i = 0; i < 50; i++)
          MPI_Pack(A+i*2, 1, MPI_DOUBLE, tempbuffer, buffersize, &position, MPI_COMM_WORLD);
        MPI_Send(tempbuffer, position, MPI_PACKED, 1, 101, MPI_COMM_WORLD);
        free(tempbuffer);
    }
    if(myid == 1) {
      void* B = malloc(MAXLEN);
      MPI_Recv(B, MAXLEN, MPI_PACKED, 0, 101, MPI_COMM_WORLD,&status);
      int num;
      MPI_Get_count(&status, MPI_PACKED, &num);
      printf("%d\n", num);
      double* C = (double*)B;
      for(int i = 0; i < 50; i++) {
        printf("%lf\n", C[i]);
      }
      free(B);
    }
  MPI_Finalize();
  return 0;
}
  • MPI_Pack_size

    • 決定需要多大的緩衝區來存放資料
    • MPI_Pack_size(num, datatype, comm, buffersize)
    • 這裡是通過MPI_Pack_size來計算num個datatype資料所需要的記憶體,其結果存放在buffersize,注意buffersize給的是整型指標,comm就是通訊域
  • MPI_Pack

    • MPI_Pack(buf, sum, datatype, tempbuffer, buffersize, &position, comm)
    • buf是所要打包的資料的起始位置(指標or地址),第二個引數是打包幾個資料,第三個引數是說這回的資料的種類,第四個引數tempbuffer是要打包的地方,buffersize是緩衝區大小,第五個引數用於跟蹤已經有多少個資料被打包(同時也作為地址偏移量,本質上也是第一個資料開始存放的地方),第六個就是通訊域
匯出資料型別

MPI還允許通過匯出資料型別,將不連續的,甚至是不同型別的資料元素組合在一起形成新的資料型別。我們稱這種由使用者定義的資料型別為到此處資料型別。這需要由MPI提供的建構函式來構造。

總之型別匹配規則如下:

  • 有型別資料的通訊,傳送方和接收方均使用相同的資料型別
  • 無型別資料的通訊,傳送方和接收方均以MPI_BYTE作為資料型別
  • 打包資料的通訊,傳送方和接收方均使用MPI_PACKED

訊息標籤TAG

TAG是訊息信封中的一項,是程式在同一接收者的情況下,用於標識不同型別訊息的一個整數。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define MAXN 512

int main() {
  int myid, namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Status status;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("processor %d running on %s\n", myid, processor_name);
    if(myid == 0) {
      char message[MAXN];
      strcpy(message, "hello, I'm processor 0\n");
      printf("processor 0 sending message: %s", message);
      fflush(stdout);
      MPI_Send(message, strlen(message)+1, MPI_CHAR, 2, 101, MPI_COMM_WORLD);
      strcpy(message, "goodbye, I'm processor 0\n");
      printf("processor 0 sending message: %s", message);
      fflush(stdout);
      MPI_Send(message, strlen(message)+1, MPI_CHAR, 2, 110, MPI_COMM_WORLD);
    }
    if(myid == 2) {
      char message[MAXN];
      MPI_Recv(message, MAXN, MPI_CHAR, 0, 101, MPI_COMM_WORLD, &status);
      printf("processor 2 received message: %s", message);
      fflush(stdout);
      MPI_Recv(message, MAXN, MPI_CHAR, 0, 110, MPI_COMM_WORLD, &status);
      printf("processor 2 received message: %s", message);
      fflush(stdout);
    }
  MPI_Finalize();
  return 0;
}

如果上述的例子假設沒有標籤的化,那麼有可能程序0傳送的第二個資訊如果比第一個資訊塊,那麼程序2接收的就是第二個資訊,如果此時儲存的地方不一樣,就會導致訊息溝通的錯誤,所以我們需要訊息標籤來進行區別。

通訊域

訊息的傳送和接收必須使用相同的訊息標籤才能實施通訊。維護TAG來匹配訊息是比較繁瑣的事情,因此我們同時提出了另一項通訊域。

一個通訊域包含一個行程群組及其上下文。行程群組是程序的有限有序集。有限是說程序的數量是有限的,有序是編號是從0~n-1。

通訊域限定了訊息傳遞的程序範圍。

一個程序在一個通訊組中,用它的編號進行標識,組的大小和程序號可以用前面所說的MPI_Comm_sizeMPI_Comm_rank獲得。

MPI預先定義了兩個行程群組:MPI_COMM_SELF(只包含自己的通訊域)和MPI_COMM_WORLD(包含所有MPI程序的行程群組),同時,MPI對於通訊子(通訊組)提供了各種管理函數。

  • int MPI_Comm_compare(comm1, comm2, result)

其中result是整型指標的傳遞,這裡比較comm1和comm2,如果comm1和comm2是相同的控制程式碼,則result為MPI_Ident(感覺上是一個整型,但是實測的時候沒法列印,反正該函數通過result值得不同來表示結果),如果僅僅是個行程群組得成員和序列號都相同,則result為MPI_Congruent,如果兩者得組成員相同但序列號不同則結果為MPI_Similar,否則結果就為MPI_Unequal

  • int MPI_Comm_dup(comm, newcomm)

對comm進行復製得到新的通訊域newcomm,注意這邊得newcomm是通過指標傳遞的,型別為MPI_Comm*

  • int MPI_Comm_solit(comm, color, key, newcomm)

通訊域分裂,本函數要求comm行程群組中的每個程序都要執行,每個程序指定一個color(整型),如果具有相同的color值的程序形成一個新的行程群組,新產生的通訊域與這些行程群組一一對應。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define MAXN 512

int main() {
  MPI_Comm a;
  MPI_Status status;
  int myid, numprocs;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    printf("MPI_COMM_WORLD:%d\n", myid);
    MPI_Comm_split(MPI_COMM_WORLD, myid%2, myid, &a);
    MPI_Comm_size(a, &numprocs);
    printf("%d\n", numprocs);
    MPI_Comm_rank(a, &myid);
    printf("a:%d\n", myid);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    if(myid == 0) {
       char buf[MAXN];
       strcpy(buf, "hello world from 0\n");
       printf("processor 0 sending : %s", buf);
       MPI_Send(buf, strlen(buf)+1, MPI_CHAR, 1, 110, a);
    }
    if(myid == 2) { // 這裡發現0和2是一組,0和3不是一組
      char buf[MAXN];
      MPI_Recv(buf, MAXN, MPI_CHAR, 0, 110, a, &status);
      printf("%s", buf);
    }
  MPI_Finalize();
  return 0;
}

注意新產生的通訊域包含舊的所有程序,只是不同的程序可能在不同的組別之中。新的行程群組中,各個程序的順序編號根據key(整型)的大小決定,如果key越小,則相應程序在新通訊域中的順序編號也越小,如果key值相同,則根據這兩個程序在原來通訊域中順序號決定新的程序號。一個程序可能提供color值為MPI_Undefined,此時,newcomm返回MPI_COMM_NULL(分裂失敗)

  • int MPI_Comm_free(comm)

釋放給定的通訊域,注意這裡傳遞的是指標

狀態字(status)

狀態字的主要功能就是儲存接收到的訊息的狀態。

while(true) {
  MPI_Recv(..., ..., ..., MPI_ANY_SOURCE, MPI_ANY_TAG,...,...);
  switch(status.MPI_TAG) {
    case 0: ...;
    case 1: ...;
    case 2: ...;
  }
}

這裡的MPI_Recv沒有指定從哪裡接收資訊,可以接收任意來源的資訊,任意標籤的資訊(MPI_ANY_TAG),我們可以通過檢查status中的MPI_TAG可以有效把訊息區分開來。當一個接收者能從不同程序接收不同大小和標籤的訊息時,比如伺服器程序,查閱狀態資訊就會很有用。我們可以利用狀態字的標籤可以進行更多的有意思的操作。

通訊匹配聖經

  • 通訊資料型別匹配
  • 訊息標籤,通訊域匹配
  • 傳送程序與接收程序號對應
  • 接收訊息的緩衝區大於傳送過來的訊息的大小

現在考慮如果當初的資訊大家都是先接收然後再傳送,程式會怎麼樣呢?執行後會發現,程式進入了停滯狀態,此時0,1,2,3都是在receiving狀態,而這時候沒有程序可以傳送訊息來結束這個狀態,這種大家都在等待的狀態,稱為「死鎖」,死鎖現象在多程序,多執行緒程式設計中是經常發生的現象。 因為MPI_Send或MPI_Recv正確返回的前提是該通訊操作已經完成。對於傳送操作來說就是緩衝區可以被其他的操作更新,對於接收操作來說就是該緩衝區中的資料已經可以被完整的使用。我們稱這樣的形式為阻塞通訊,如果沒有完成之前,其不會結束該次通訊操作。當然反過來,先傳送再接收是可以執行下去的,因為傳送操作不需要等待其他的先行操作,因此阻塞可以是有限的。阻塞通訊中對等訊息的匹配也對正確通訊有著至關重要的影響。

統計時間

編寫並行程式的目的是為了提高程式執行效能。為了檢驗並行化的效果,我們經常會用到統計時間的函數。MPI提供兩個時間函數MPI_WtimeMPI_Wtick

  • MPI_Wtime返回一個雙精度數,標識從過去的某點時間到當前時間所消耗的時間秒數

  • MPI_Wtick返回MPI_Wtime結果的精度

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main(int argc, char* argv[]) {
  int myid, numprocs, next, namelen;
  char buffer[BUFLEN], processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Status status;
  double t1, t2, t3, tick;

  MPI_Init(&argc, &argv);
  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
  MPI_Comm_rank(MPI_COMM_WORLD, &myid);
  MPI_Get_processor_name(processor_name, &namelen);

  t1 = MPI_Wtime();

  printf("Processor %d on %s\n", myid, processor_name);
  printf("Processor %d of %d\n", myid, numprocs);
  memset(buffer, 0, BUFLEN*sizeof(char));
  if(myid == numprocs-1)
    next = 0;
  else
    next = myid + 1;

  if(myid == 0) {
    strcpy(buffer, "hello there");
    printf("%d sending '%s'\n", myid, buffer); fflush(stdout);
    MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
    printf("%d receiving\n", myid); fflush(stdout);
    MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
    printf("%d received '%s'\n", myid, buffer); fflush(stdout);
  }else{
    printf("%d receiving\n", myid); fflush(stdout);
    MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
    printf("%d received '%s'\n", myid, buffer); fflush(stdout);
    MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
    printf("%d sent '%s'\n", myid, buffer); fflush(stdout);
  }

  t2 = MPI_Wtime();
  t3 = t2 - t1;
  tick = MPI_Wtick();
  printf("%d process time is '%.10f'\n", myid, t3);
  printf("%d process tick is '%.10f'\n", myid, tick);
  MPI_Finalize();
  return 0;
}

其實本質上和前面的時鐘打點函數的用法差不多,這裡MPI_Wtime就是獲得程式當前執行了多少時間,而MPI_Wtick就是獲得計時的精度。

錯誤管理

  • 通過status.MPI_ERROR來獲取錯誤碼
#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid;
  MPI_Status status;
  char buf[BUFLEN];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    printf("processor %d running\n", myid);
    if(myid == 0) {
      strcpy(buf, "hello, processor 1 from processor 0");
      printf("processor %d sending %s\n", myid, buf); fflush(stdout);
      MPI_Send(buf, strlen(buf)+1, MPI_CHAR, 1, 101, MPI_COMM_WORLD);
    }
    if(myid == 1) {
      MPI_Recv(buf, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      printf("processor %d received %s\n", myid, buf); fflush(stdout);
      printf("tag %d source %d\n", status.MPI_TAG, status.MPI_SOURCE);
      printf("error code %d\n", status.MPI_ERROR);
    }
  MPI_Finalize();
  return 0;
}
  • MPI終止MPI程式執行的函數MPI_Abort

int MPI_Abort(MPI_Comm, int errorcode)

該函數的作用使通訊域comm的所有程序退出,返回errorcode給呼叫的環境。通訊域comm中的任意程序呼叫此函數都能使該通訊域內所有的程序結束執行。這裡只要執行到這個程式碼,那麼所有的程序都會結束,類似於丟擲異常的處理機制。

接下來進入本章的最後一個環節啦,加油。

MPI群集通訊

除了之前介紹的對等通訊,MPI還有群集通訊。群集通訊,說白了就是包含一對多,多對一,多對多的程序通訊模式(就是不帶一對一玩,但其實本質上就是多對多,因為一對多和多對一不過是多對多的特例)。此時的通訊方式變成了多個程序參與通訊。

同步

int MPI_Barrier(MPI_Comm comm)

如下面這段程式碼,如果沒有MPI_Barrier,那麼程序執行快的會直接執行下面的程式碼,而有的程序還沒有執行第一行的輸出。

#include"mpi.h"
#include<stdio.h>

int main() {
  int myid;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    printf("processor %d running\n", myid); fflush(stdout);
    MPI_Barrier(MPI_COMM_WORLD);
    printf("hello world %d\n", myid); fflush(stdout);
  MPI_Finalize();
  return 0;
}

這個函數就像是一道路障。使得通訊子comm中的所有程序相互同步,知道所有的程序都執行了他們各自的MPI_Barrier函數,然後各自開始執行後面的程式碼。同步函數是並行程式中控制執行順序的常用手段。(本質上就是強迫所有在通訊子comm中的程序,重新在Barrier那一行一起進行,讓某些執行緒達到同步,此時有點序列的味道)

廣播

廣播就是一對多的傳送訊息,從一個root程序向組內所有其他的程序傳送一條訊息。

int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int root,MPI_Comm)

相比於之前的MPI_Send,MPI_Bcast就是少了目標程序,此時的目標程序擴大為組內的所有程序。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char buf[BUFLEN], Buf[BUFLEN], name[MPI_MAX_PROCESSOR_NAME];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(name, &namelen);
    printf("%d of %d running on %s\n", myid, numprocs, name); fflush(stdout);
    memset(buf, 0, sizeof(buf));
    memset(Buf, 0, sizeof(Buf));
    if(myid == 0) {
      strcpy(buf, "hello, I\'m processor 0\n");
    }
    printf("processor %d\'s buf : %s", myid, buf); fflush(stdout);
    printf("\nMPI_Bcast is started\n"); fflush(stdout);
    if(myid == 0) MPI_Bcast(buf, strlen(buf)+1, MPI_CHAR, 0, MPI_COMM_WORLD);
    MPI_Bcast(Buf, BUFLEN, MPI_CHAR, 0, MPI_COMM_WORLD);
    printf("processor %d\'s now buf : %s", myid, Buf); fflush(stdout);
  MPI_Finalize();
  return 0;
}

用法如上,本質上和Recv和Send很相似,不過沒有了tag,同時MPI_Bcast廣播本身可以做傳送和接收,如果當前程序號等於root,那就是傳送,否則就是接收。

聚集

int MPI_Gather(void* sendbuf, int sendcnt, MPI_Datatype sendtype, void* recvbuf, int recvcnt, MPI_Datatype recvtype, int root, MPI_Comm comm)

該函數的作用就是root程序接收該通訊組每一個成員程序(包括root自己)傳送的資訊。這n個訊息的連線按程序號排列存放在root程序的接收緩衝中。每個緩衝由三元組(sendbuf, sendcnt, sendtype)標識。所有非root程序忽略接收緩衝。跟多的是接收的作用,只不過此時接收的是其他程序中傳送過來的資訊。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char name[MPI_MAX_PROCESSOR_NAME], buf[BUFLEN], BUF[BUFLEN];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(name, &namelen);
    printf("%d of %d running on %s\n", myid, numprocs, name); fflush(stdout);
    sprintf(buf, "hello, I\'m processor %d.", myid);
    printf("%s\n", buf); fflush(stdout);
    int len = strlen(buf);
    MPI_Gather(buf, len, MPI_CHAR, BUF, len, MPI_CHAR, 1, MPI_COMM_WORLD);
    //MPI_Barrier(MPI_COMM_WORLD);
    printf("processor %d\'BUF is %s\n", myid, BUF); fflush(stdout);
  MPI_Finalize();
  return 0;
}

MPI_Gather注意這邊的函數sendcnt和recvcnt要匹配。如果不相等可能會造成通訊錯誤,其實質就是執行這些函數的程序開始相互通訊。注意該函數自帶有barrier的功能。

播撒

int MPI_Scatter(void* sendbuf, int sendcnt, MPI_Datatype sendtype, void* recvbuf, int recvcnt, MPI_Datatype recvtype, int root, MPI_Comm comm)

MPI_scatter是一對多傳遞訊息。和廣播不同的是,root程序向各個程序傳遞的訊息可以是不同的。Scatter實際上執行的是與Gather相反的操作。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME], buf[BUFLEN], BUF[BUFLEN];
  MPI_Status status;
  MPI_Init(NULL, NULL);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("%d of %d running on %s\n", myid, numprocs, processor_name);
    memset(buf, 0, sizeof(buf));
    if(myid == 0) strcpy(buf, "hello, I\'m processor 0");
    printf("processor %d buf %s\n", myid, buf); fflush(stdout);
    int len = strlen(buf), next = (myid + 1) % numprocs;
    MPI_Barrier(MPI_COMM_WORLD);
    if(myid == 0) {
      MPI_Send(&len, 1, MPI_INT, next, 101,  MPI_COMM_WORLD);
      MPI_Recv(&len, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    }else{
      MPI_Recv(&len, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      MPI_Send(&len, 1, MPI_INT, next, 101,  MPI_COMM_WORLD);
    }
    MPI_Barrier(MPI_COMM_WORLD);
    printf("processor %d len %d\n", myid, len); fflush(stdout);
    MPI_Scatter(buf, len/4, MPI_CHAR, BUF, len/4, MPI_CHAR, 0, MPI_COMM_WORLD);
    printf("processor %d BUF %s\n", myid, BUF); fflush(stdout);
  MPI_Finalize();
  return 0;
}

注意方便起見,建議這裡的sendcnt和recvcnt保持一直,同時注意這裡的recvcnt是表示每個程序接收的數量,而不是傳送的總數量,注意這個區別,類似於一種分配塊中任務的數量。當然root程序可以給自己傳送資訊。

擴充套件的聚集和播撒操作

MPI_Allgather的作用是每一個程序都收集到其他所有程序的訊息,它相當於每一個程序都執行了MPI_Gather執行完了MPI_Gather之後,所有的程序的接收緩衝區的內容都是相同的,也就是說每個程序給所有程序都傳送了一個相同的訊息,所以名為allgather。本函數的介面是:

int MPI_Allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char buf[BUFLEN], BUF[BUFLEN], name[MPI_MAX_PROCESSOR_NAME];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(name, &namelen);
    printf("processor %d of %d running on %s\n", myid, numprocs, name);
    memset(buf, 0, sizeof(buf));
    memset(BUF, 0, sizeof(BUF));
    sprintf(buf, "hello, I'm processor %d", myid);
    MPI_Allgather(buf, strlen(buf), MPI_CHAR, BUF, strlen(buf), MPI_CHAR, MPI_COMM_WORLD);
    printf("processor %d get message : %s\n", myid, BUF); fflush(stdout);
  MPI_Finalize();
  return 0;
}

全域性交換

MPI_Allgather每個程序傳送一個相同的訊息給所有的程序,而MPI_Alltoall散發給不同程序的訊息是不同的。因此,它的傳送緩衝區也是一個陣列。MPI_Alltoall的每個程序可以向每個接收者傳送數目不同的資料,第i個程序傳送的第j塊資料將被第j 個程序接收並存放在其他訊息緩衝區recvbuf的第i塊,每個程序的sendcount和sendtype的型別必須和所有其他程序的recvcount和recvtype相同,這也意味著在每個程序和根程序之間傳送的資料量必須和接收的資料量相等。函數介面為:

int MPI_Alltoall(void* sendbug, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype, MPI_Comm comm)

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME], buf[BUFLEN], BUF[BUFLEN];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("%d of %d running on %s\n", myid, numprocs, processor_name); fflush(stdout);
    sprintf(buf, "I\'m processor %d, hello!", myid);
    printf("processor %d : %s\n", myid, buf); fflush(stdout);
    memset(BUF, 0, sizeof(BUF));
    int len = strlen(buf);
    MPI_Alltoall(buf, len/numprocs, MPI_CHAR, BUF, len/numprocs, MPI_CHAR, MPI_COMM_WORLD);
    printf("processor %d get message: %s\n", myid, BUF);
  MPI_Finalize();
  return 0;
}

規約與掃描

MPI提供了兩種型別的聚合操作

規約

int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm)

這裡的每個程序的待處理資料存放在sendbuf中,可以是標量也可以是向量。所有程序將這些值通過輸入的操作子op計算為最終結果並將它存入root程序的recvbuf中。具體的規約操作包括:

操作子 功能
MPI_MAX 求最大值
MPI_MIN 求最小值
MPI_SUM 求和
MPI_PROD 求積
MPI_LAND 邏輯與
MPI_BAND 按位元與
MPI_LOR 邏輯或
MPI_BOR 按位元或
MPI_LXOR 邏輯互斥或
MPI_BXOR 按位元互斥或
MPI_MAXLOC 最大值且對應的位置
MPI_MINLOC 最小值且相應的位置

規約操作的資料型別與C中的整數型別對應。

#include"mpi.h"
#include<stdio.h>
#include<time.h>
#include<stdlib.h>
#include<string.h>

#define LEN 10
#define BASE 1000

int main() {
  int myid, numprocs, num[LEN], out[LEN];
  srand(time(NULL));
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    for(int i = 0; i < LEN; i++) num[i] = 10*myid + i;
    printf("processor %d array: ", myid);
    for(int i = 0; i < LEN; i++) printf("%d ", num[i]);
    printf("\n");fflush(stdout);
    memset(out, 0, sizeof(out));
    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Reduce(&num, &out, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    printf("processor %d array:", myid);
    if(out[0] == 0) printf("no\n");
    else{
      for(int i = 0; i < 10; i++) printf("%d ", out[i]);
      printf("\n");
    }
    fflush(stdout);
  MPI_Finalize();
  return 0;
}

注意這裡的資料量count指的是幾個資料參加,而這邊的操作其實是對所有執行緒的第i個資料進行的,所以傳遞的時候接收的就是經歷這些操作過後留下來的資料。化多執行緒為一個執行緒上的資料,歸一。

掃描

int MPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype, MPI_Op op, MPI_Comm comm)

MPI_Scan常用於對分佈於族中的資料做前置規約操作。此操作將序列號為0,...,i(包括i)的程序傳送緩衝區的規約結果存入序列號為i的程序接收訊息緩衝區中。這種操作支援的資料型別,操作以及對傳送及接收緩衝區的限制和規約相同。與規約相比,掃描操作設過去了root域,因為掃描是將部分值組合成n個最終值,並存放在n個程序的recvbuf中。具體的掃描操作有Op域定義。

MPI的規約和掃描操作允許每個程序貢獻向量值,而不只是標量值。向量的長度由Count定義。

#include"mpi.h"
#include<stdio.h>
#include<time.h>
#include<stdlib.h>
#include<string.h>

#define LEN 10
#define BASE 1000

int main() {
  int myid, numprocs, num[LEN], out[LEN];
  srand(time(NULL));
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    for(int i = 0; i < LEN; i++) num[i] = 10*myid + i+1;
    printf("processor %d array: ", myid);
    for(int i = 0; i < LEN; i++) printf("%d ", num[i]);
    printf("\n");fflush(stdout);
    memset(out, 0, sizeof(out));
    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Scan(&num, &out, 10, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    printf("processor %d array:", myid);
    if(out[0] == 0) printf("no\n");
    else{
      for(int i = 0; i < 10; i++) printf("%d ", out[i]);
      printf("\n");
    }
    fflush(stdout);
  MPI_Finalize();
  return 0;
}

與規約是類似的不過就是這裡的最終結果一定存放在最後一個程序中,同時注意隨著程序號的迭代,裡面的程序中的最後一個存放當前的結果。比如對於四程序來說,第二個程序存放一二程序中op操作子過後的值。

簡單範例

相關程式碼如下:

#include"mpi.h"
#include<stdio.h>
#include<math.h>

double f(double);

double f(double a) {
    return (4.0 / (1.0 + a*a));
}

int main(int argc, char* argv[]) {
    int n, myid, numprocs, i;
    double PI25DT = 3.141592653589793238462643;
    double mypi, pi, h, sum, x;
    double starttime = 0.0, endwtime;
    int namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(processor_name, &namelen);

    fprintf(stdout, "Process %d of %d is on %s\n", myid, numprocs, processor_name);
    fflush(stdout);
    n = 10000;
    if(myid == 0)
      starttime = MPI_Wtime();
    MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
    h = 1.0/(double)n;
    sum = 0.0;
    for(i = myid+1; i <= n; i += numprocs){
      x = h * ((double)i - 0.5);
      sum += f(x);
    }
    mypi = h * sum;
    MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    if(myid == 0) {
      endwtime = MPI_Wtime();
      printf("pi is approximately %.16f, Error is %.16f\n", pi, fabs(pi-PI25DT));
      printf("wall clock time = %f\n", endwtime-starttime);
      fflush(stdout);
    }
    MPI_Finalize();
    return 0;
}

這裡本質上利用的是積分求pi,1/(1+x^2)的積分是arctanx,通過這種方式來實現。最後通過規約操作中的求和將四個執行緒的內容相加就可以了,也就是將其中的操作基本上分成四等分,然後來求解。

小結

  • 通訊子中的所有程序必須呼叫群集通訊歷程。如果有意個程序沒有呼叫,會產生奇奇怪怪的錯誤。
  • 一個程序一旦結束了群集操作就從群集例程中返回。
  • 每個群集歷程,也就是前面的群集函數都有阻塞的功能

MPI入門到此ending。完結撒花,感謝陪伴。

何當共剪西窗燭,卻話巴山夜雨時。

江湖再會,哈哈哈。