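This post is reposted from my personal website:
It reads better there!
First, here is the course website for reference.
Read the Lab 1 requirements here, then run the commands given on the course site to set up the lab environment: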
$ git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824
$ cd 6.824
$ ls
Makefile src
$
This pulls down all the code you need.
The directory layout looks roughly like this:
Continuing with the course site:
We supply you with a simple sequential mapreduce implementation in src/main/mrsequential.go. It runs the maps and reduces one at a time, in a single process. We also provide you with a couple of MapReduce applications: word-count in mrapps/wc.go, and a text indexer in mrapps/indexer.go. You can run word count sequentially as follows:
In short, you get a sequential MapReduce implementation plus a couple of demo applications that you can run as-is:
$ cd ~/6.824
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt
$ more mr-out-0
A 509
ABOUT 2
ACT 8
...
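The word-count application only has to supply two functions. As a rough sketch modeled on mrapps/wc.go (not copied from it; KeyValue is redeclared here so the file compiles on its own), Map emits a (word, "1") pair for every word and Reduce counts how many values arrived for a key:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue mirrors the pair type used by the lab's map functions.
type KeyValue struct {
	Key   string
	Value string
}

// Map splits contents into words (runs of letters) and emits (word, "1") for each one.
func Map(filename string, contents string) []KeyValue {
	isSep := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(contents, isSep)
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives every value emitted for one key; the count is just how many there are.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kva := Map("demo.txt", "A cat, a dog; A cat!")
	grouped := map[string][]string{}
	for _, kv := range kva {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	fmt.Println("A", Reduce("A", grouped["A"])) // prints "A 2"
}
```

Words are case-sensitive in this sketch, so "A" and "a" are counted separately, which is consistent with the uppercase "A 509" line in the sample output above.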
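So I tried it in GoLand:
It failed, and after googling the error I found out that this lab can't be done on Windows: Go plugins (-buildmode=plugin) are only supported on Linux, FreeBSD, and macOS.
After a moment's thought I gave up on Windows and moved to Linux to keep grinding.
On Linux the commands above print word counts for the supplied texts (word count was also the first useful demo I ran when I worked with Hadoop, whose core is MapReduce as well).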
Only one directory in the cloned repository matters for Lab 1 (MapReduce):
the mr directory! Woohoo!
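The MapReduce system consists of one master process and several worker processes that communicate over RPC (Remote Procedure Call). The master assigns tasks to workers, records which tasks have finished, and must cope with workers that crash or take too long. A worker runs the task it is given, reports back to the master when it is done, and then asks for the next task.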
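The lab skeleton wires master and workers together with Go's net/rpc package (the master registers itself and listens on a Unix socket). The sketch below shows the same request/reply shape in a single process; to keep it self-contained it serves over an in-memory net.Pipe instead of a socket, and TaskArgs, TaskReply and AssignTask are hypothetical names, not the lab's:

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"sync"
)

// TaskArgs and TaskReply are hypothetical request/reply types for this sketch.
type TaskArgs struct {
	WorkerID int
}

type TaskReply struct {
	FileName string
	MapID    int
}

// Master hands out one input file per request; a stand-in for the lab's master.
type Master struct {
	mu    sync.Mutex // net/rpc may run handlers concurrently
	files []string
	next  int
}

// AssignTask has the shape net/rpc requires: exported, (args, *reply) error.
func (m *Master) AssignTask(args *TaskArgs, reply *TaskReply) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.next >= len(m.files) {
		return fmt.Errorf("no tasks left")
	}
	reply.FileName = m.files[m.next]
	reply.MapID = m.next
	m.next++
	return nil
}

// newClient serves m on one end of an in-memory pipe and returns a client for the other end.
func newClient(m *Master) *rpc.Client {
	serverConn, clientConn := net.Pipe()
	srv := rpc.NewServer()
	if err := srv.Register(m); err != nil {
		panic(err)
	}
	go srv.ServeConn(serverConn)
	return rpc.NewClient(clientConn)
}

func main() {
	client := newClient(&Master{files: []string{"pg-a.txt", "pg-b.txt"}})
	defer client.Close()
	var reply TaskReply
	if err := client.Call("Master.AssignTask", &TaskArgs{WorkerID: 1}, &reply); err != nil {
		panic(err)
	}
	fmt.Println(reply.MapID, reply.FileName) // prints "0 pg-a.txt"
}
```

The method name used in Call is "TypeName.MethodName", which is why the worker code in this post calls "Master.CreateWorkerTask" and "Master.HandleWorkerReport".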
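Master struct:
// Flag tracks the scheduling state of one task.
type Flag struct {
	processing bool // assigned to a worker and not yet finished or timed out
	finished   bool // completed and reported successfully
}

type Master struct {
	FileNames      []string // input files; one map task per file
	MapFlags       []Flag
	ReduceFlags    []Flag
	MapTaskCnts    []int // assignment counter per map task, used as a sequence number
	ReduceTaskCnts []int // assignment counter per reduce task
	MapAllDone     bool
	ReduceALLDone  bool
	MapNum         int
	ReduceNum      int
	Mut            sync.Mutex // guards every field above
}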
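Worker struct:
type TaskState int

// States the master can hand back to a worker.
const (
	MapState    TaskState = 0 // run a map task
	ReduceState TaskState = 1 // run a reduce task
	StopState   TaskState = 2 // all work is done; exit
	WaitState   TaskState = 3 // nothing to assign right now; retry later
)

type WorkerTask struct {
	MapID          int
	ReduceID       int
	ReduceNum      int
	MapNum         int
	MapTaskCnt     int // sequence number of this map assignment
	ReduceTaskCnt  int // sequence number of this reduce assignment
	State          TaskState
	FileName       string
	MapFunction    func(string, string) []KeyValue // gob skips func fields, so these never cross the RPC
	ReduceFunction func(string, []string) string
}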
Master interface:
Creating the master:
func MakeMaster(files []string, nReduce int) *Master {
m := Master{FileNames: files,
MapFlags: make([]Flag, len(files), len(files)),
ReduceFlags: make([]Flag, nReduce, nReduce),
MapNum: len(files),
ReduceNum: nReduce,
MapAllDone: false,
ReduceALLDone: false,
MapTaskCnts: make([]int, len(files)),
ReduceTaskCnts: make([]int, nReduce),
}
m.server()
args, reply := NoArgs{}, NoReply{}
go m.HandleTimeOut(&args, &reply)
return &m
}
Assigning a worker task:
func (m *Master) CreateWorkerTask(args *NoArgs, workerTask *WorkerTask) error {
m.Mut.Lock()
defer m.Mut.Unlock()
if !m.MapAllDone {
for idx := 0; idx < m.MapNum; idx++ {
if !m.MapFlags[idx].processing && !m.MapFlags[idx].finished {
workerTask.ReduceNum = m.ReduceNum
workerTask.MapNum = m.MapNum
workerTask.State = MapState
workerTask.MapID = idx
workerTask.FileName = m.FileNames[idx]
m.MapTaskCnts[idx]++
workerTask.MapTaskCnt = m.MapTaskCnts[idx]
m.MapFlags[idx].processing = true
return nil
}
}
workerTask.State = WaitState
return nil
}
if !m.ReduceALLDone {
for idx := 0; idx < m.ReduceNum; idx++ {
if !m.ReduceFlags[idx].processing && !m.ReduceFlags[idx].finished {
workerTask.State = ReduceState
workerTask.ReduceNum = m.ReduceNum
workerTask.MapNum = m.MapNum
workerTask.ReduceID = idx
m.ReduceTaskCnts[idx]++
workerTask.ReduceTaskCnt = m.ReduceTaskCnts[idx]
m.ReduceFlags[idx].processing = true
return nil
}
}
workerTask.State = WaitState
return nil
}
workerTask.State = StopState
return nil
}
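The function first acquires the mutex and checks whether MapAllDone is false. If it is, it loops over the map tasks; a task whose processing and finished flags are both false still needs to be done and can be assigned. The task's parameters are written into the reply argument, and the master marks the task as processing and increments its sequence number (the assignment counter). If no map task can be assigned, every map task is either in progress or already finished, so the worker is told to wait. The ReduceALLDone branch works the same way and isn't repeated here. Once both phases are done, the worker is told to stop.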
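The two booleans encode three task states (idle, in progress, finished), and CreateWorkerTask has four possible outcomes (map, reduce, wait, stop). The sketch below isolates just that decision logic so it can be tried without RPC; pickIdle, allFinished and decide are hypothetical helper names, and unlike the post it recomputes "all finished" instead of caching it in MapAllDone/ReduceALLDone:

```go
package main

import "fmt"

// Flag mirrors the post's per-task flags.
type Flag struct {
	processing bool
	finished   bool
}

// pickIdle returns the first task that is neither in progress nor finished, or -1 if none.
// Like CreateWorkerTask, it marks the task as processing and bumps its assignment counter.
func pickIdle(flags []Flag, cnts []int) int {
	for i := range flags {
		if !flags[i].processing && !flags[i].finished {
			flags[i].processing = true
			cnts[i]++
			return i
		}
	}
	return -1
}

// allFinished reports whether every task in a phase is done.
func allFinished(flags []Flag) bool {
	for _, f := range flags {
		if !f.finished {
			return false
		}
	}
	return true
}

// decide returns the state for the next worker: map work first, then reduce, else wait or stop.
func decide(mapFlags, reduceFlags []Flag, mapCnts, reduceCnts []int) (string, int) {
	if !allFinished(mapFlags) {
		if i := pickIdle(mapFlags, mapCnts); i >= 0 {
			return "map", i
		}
		return "wait", -1 // remaining map tasks are all in progress
	}
	if !allFinished(reduceFlags) {
		if i := pickIdle(reduceFlags, reduceCnts); i >= 0 {
			return "reduce", i
		}
		return "wait", -1
	}
	return "stop", -1
}

func main() {
	mapFlags, reduceFlags := make([]Flag, 2), make([]Flag, 1)
	mapCnts, reduceCnts := make([]int, 2), make([]int, 1)
	fmt.Println(decide(mapFlags, reduceFlags, mapCnts, reduceCnts)) // map 0
	fmt.Println(decide(mapFlags, reduceFlags, mapCnts, reduceCnts)) // map 1
	fmt.Println(decide(mapFlags, reduceFlags, mapCnts, reduceCnts)) // wait -1
}
```

Reduce tasks are never handed out before every map task is finished, because each reduce task needs the intermediate file from every map task.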
Handling worker reports:
func (m *Master) HandleWorkerReport(wr *WorkerReportArgs, task *NoReply) error {
m.Mut.Lock()
defer m.Mut.Unlock()
if wr.IsSuccess {
if wr.State == MapState {
if wr.MapTaskCnt == m.MapTaskCnts[wr.MapID] {
m.MapFlags[wr.MapID].finished = true
m.MapFlags[wr.MapID].processing = false
}
} else {
if wr.ReduceTaskCnt == m.ReduceTaskCnts[wr.ReduceID] {
m.ReduceFlags[wr.ReduceID].finished = true
m.ReduceFlags[wr.ReduceID].processing = false
}
}
} else {
if wr.State == MapState {
if m.MapFlags[wr.MapID].finished == false {
m.MapFlags[wr.MapID].processing = false
}
} else {
if m.ReduceFlags[wr.ReduceID].finished == false {
m.ReduceFlags[wr.ReduceID].processing = false
}
}
}
for id := 0; id < m.MapNum; id++ {
if !m.MapFlags[id].finished {
break
} else {
if id == m.MapNum-1 {
m.MapAllDone = true
}
}
}
for id := 0; id < m.ReduceNum; id++ {
if !m.ReduceFlags[id].finished {
break
} else {
if id == m.ReduceNum-1 {
m.ReduceALLDone = true
}
}
}
return nil
}
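The report carries a flag saying whether the task succeeded. On success, the master looks at the task type and sequence number: if the sequence number matches the master's current counter, the report comes from the latest assignment and the task is marked finished. If it doesn't match, the report comes from an earlier attempt that timed out and was reassigned, so it is ignored. If the flag is false, the master takes the error path: since the report may come from a timed-out attempt, it first checks whether the task is already finished, and if not, clears processing so that CreateWorkerTask can reassign it. Finally, the master checks whether all map tasks and all reduce tasks are finished and sets MapAllDone and ReduceALLDone to true accordingly.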
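To see why the sequence number matters, here is a stripped-down sketch of the same report rules for a single task: worker A gets attempt 1, the timeout sweep frees the task, worker B gets attempt 2, and only B's report is allowed to finish the task. taskTable, assign and report are hypothetical names for this illustration:

```go
package main

import "fmt"

// taskTable is a hypothetical stand-in for the master's per-task bookkeeping.
type taskTable struct {
	cnt        []int // assignment counter per task (MapTaskCnts in the post)
	processing []bool
	finished   []bool
}

func newTaskTable(n int) *taskTable {
	return &taskTable{cnt: make([]int, n), processing: make([]bool, n), finished: make([]bool, n)}
}

// assign hands task id out and returns the sequence number the worker must echo back.
func (t *taskTable) assign(id int) int {
	t.cnt[id]++
	t.processing[id] = true
	return t.cnt[id]
}

// report applies the HandleWorkerReport rules to one task and
// returns true if this report marked the task finished.
func (t *taskTable) report(id, seq int, ok bool) bool {
	if ok {
		if seq == t.cnt[id] { // only the latest assignment counts
			t.finished[id] = true
			t.processing[id] = false
			return true
		}
		return false // stale attempt that was reassigned after a timeout
	}
	if !t.finished[id] {
		t.processing[id] = false // failed: let the task be reassigned
	}
	return false
}

func main() {
	t := newTaskTable(1)
	first := t.assign(0)    // worker A gets attempt 1
	t.processing[0] = false // the timeout sweep frees the task
	second := t.assign(0)   // worker B gets attempt 2
	fmt.Println(t.report(0, first, true), t.report(0, second, true)) // false true
}
```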
Handling timeouts:
func (m *Master) HandleTimeOut(args *NoArgs, reply *NoReply) error {
for {
m.Mut.Lock()
if m.MapAllDone && m.ReduceALLDone {
m.Mut.Unlock()
break
}
time.Sleep(30 * time.Millisecond)
if !m.MapAllDone {
for idx := 0; idx < m.MapNum; idx++ {
if m.MapFlags[idx].finished == false {
m.MapFlags[idx].processing = false
}
}
} else {
for idx := 0; idx < m.ReduceNum; idx++ {
if m.ReduceFlags[idx].finished == false {
m.ReduceFlags[idx].processing = false
}
}
}
m.Mut.Unlock()
time.Sleep(2000 * time.Millisecond)
}
return nil
}
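Timeout handling is simple. If MapAllDone and ReduceALLDone are both true, the loop exits. Otherwise, for the current phase, it clears the processing flag of every unfinished task so that CreateWorkerTask can hand it out again, then releases the lock and sleeps for 2 s. In other words, HandleTimeOut is a sweep that runs about every 2 s (plus a 30 ms sleep taken while holding the lock), and any task not finished by the next sweep is treated as timed out. Because the sweep ignores when each task was assigned, a task may be reassigned anywhere from a few milliseconds to about 2 s after it was handed out; reports from the superseded attempt are then ignored thanks to the sequence number check.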
Worker interface
Starting a worker:
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
wt := WorkerTask{
MapFunction: mapf,
ReduceFunction: reducef,
}
for {
wt.GetWorkerTask()
if wt.State == MapState {
wt.DoMapWork()
} else if wt.State == ReduceState {
wt.DoReduceWork()
} else if wt.State == StopState {
break
} else if wt.State == WaitState {
time.Sleep(300 * time.Millisecond)
}
}
return
}
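mrworker calls Worker, passing in the map and reduce functions. Worker builds a WorkerTask from them and enters a loop: it calls GetWorkerTask, which calls Master.CreateWorkerTask over RPC with the two arguments and copies the assigned parameters and state into the worker. Depending on the state, the worker then runs a map task, runs a reduce task, sleeps, or exits.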
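GetWorkerTask itself isn't shown in the post. One pitfall worth checking if it passes a reused struct (for example wt itself) as the RPC reply: net/rpc encodes with encoding/gob, which omits zero-valued fields, and decoding does not reset fields that were not sent. MapState and MapID 0 are both zero values, so a worker that last received WaitState could keep it. The sketch below, using a trimmed hypothetical taskReply type, demonstrates the effect with gob directly and shows the usual fix of decoding into a fresh value and copying afterwards:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type TaskState int

const (
	MapState  TaskState = 0
	WaitState TaskState = 3
)

// taskReply is a trimmed, hypothetical version of the WorkerTask reply.
type taskReply struct {
	MapID int
	State TaskState
}

// roundTrip gob-encodes src and decodes it into dst, as net/rpc does with a reply.
func roundTrip(src taskReply, dst *taskReply) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(src); err != nil {
		panic(err)
	}
	if err := gob.NewDecoder(&buf).Decode(dst); err != nil {
		panic(err)
	}
}

func main() {
	// Reused reply: the zero-valued fields are not transmitted, so the old values survive.
	reused := taskReply{MapID: 7, State: WaitState}
	roundTrip(taskReply{MapID: 0, State: MapState}, &reused)
	fmt.Println(reused.MapID, reused.State) // 7 3

	// Fresh reply: decoding into a zero value gives the intended result.
	var fresh taskReply
	roundTrip(taskReply{MapID: 0, State: MapState}, &fresh)
	fmt.Println(fresh.MapID, fresh.State) // 0 0
}
```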
Map Work
func (wt *WorkerTask) DoMapWork() {
	// Read the whole input file for this map task.
	file, err := os.Open(wt.FileName)
	if err != nil {
		wt.ReportWorkerTask(err)
		return
	}
	content, err := ioutil.ReadAll(file)
	file.Close()
	if err != nil {
		wt.ReportWorkerTask(err)
		return
	}
	kvs := wt.MapFunction(wt.FileName, string(content))
	// Partition the pairs into ReduceNum buckets by key hash.
	intermediate := make([][]KeyValue, wt.ReduceNum)
	for _, kv := range kvs {
		idx := ihash(kv.Key) % wt.ReduceNum
		intermediate[idx] = append(intermediate[idx], kv)
	}
	// Bucket idx is written to mr-<MapID>-<idx> as JSON.
	for idx := 0; idx < wt.ReduceNum; idx++ {
		intermediateFileName := fmt.Sprintf("mr-%d-%d", wt.MapID, idx)
		data, err := json.Marshal(intermediate[idx])
		if err == nil {
			err = ioutil.WriteFile(intermediateFileName, data, 0644)
		}
		if err != nil {
			wt.ReportWorkerTask(err)
			return
		}
	}
	wt.ReportWorkerTask(nil)
}
func (wt *WorkerTask) ReportWorkerTask(err error) {
wra := WorkerReportArgs{
MapID: wt.MapID,
ReduceID: wt.ReduceID,
State: wt.State,
IsSuccess: true,
}
if wt.State == MapState {
wra.MapTaskCnt = wt.MapTaskCnt
} else {
wra.ReduceTaskCnt = wt.ReduceTaskCnt
}
wrr := NoReply{}
if err != nil {
wra.IsSuccess = false
}
call("Master.HandleWorkerReport", &wra, &wrr)
}
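A map task reads its input file and calls MapFunction to produce key/value pairs. Each key is hashed to decide which of the ReduceNum buckets it belongs to, and each bucket is written to its own file named after the map task and the bucket (mr-<MapID>-<bucket>). The worker then calls ReportWorkerTask, passing nil to signal success; ReportWorkerTask in turn calls Master.HandleWorkerReport to report the outcome.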
Reduce Work
func (wt *WorkerTask) DoReduceWork() {
	// Collect every value for each key from all map outputs for this bucket.
	kvsReduce := make(map[string][]string)
	for idx := 0; idx < wt.MapNum; idx++ {
		filename := fmt.Sprintf("mr-%d-%d", idx, wt.ReduceID)
		file, err := os.Open(filename)
		if err != nil {
			wt.ReportWorkerTask(err)
			return
		}
		content, err := ioutil.ReadAll(file)
		file.Close()
		var kvs []KeyValue
		if err == nil {
			err = json.Unmarshal(content, &kvs)
		}
		if err != nil {
			wt.ReportWorkerTask(err)
			return
		}
		for _, kv := range kvs {
			// Appending to a missing key starts from a nil slice, so no explicit init is needed.
			kvsReduce[kv.Key] = append(kvsReduce[kv.Key], kv.Value)
		}
	}
	// Call ReduceFunction once per key and format one "key value" line each.
	ReduceResult := make([]string, 0, len(kvsReduce))
	for key, val := range kvsReduce {
		ReduceResult = append(ReduceResult, fmt.Sprintf("%v %v\n", key, wt.ReduceFunction(key, val)))
	}
	outFileName := fmt.Sprintf("mr-out-%d", wt.ReduceID)
	err := ioutil.WriteFile(outFileName, []byte(strings.Join(ReduceResult, "")), 0644)
	wt.ReportWorkerTask(err)
}
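A reduce task first reads every intermediate file for its bucket (one per map task) and groups the values by key, then calls ReduceFunction once per key and writes the results to mr-out-<ReduceID>.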
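That's more or less the whole MapReduce implementation. To sum up: the map phase turns each file into (word, 1) pairs and stores them in separate buckets, and the reduce phase aggregates each bucket to produce the final counts. Wrapping these two phases together is what MapReduce is.
For the MapReduce paper itself, you can read it here.
Since the paper dates from 2004, plenty of translations are available by now (who in their right mind reads the original, where you know every word but can't tell what the sentence is saying?).
Finally, a screenshot of the tests passing: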
—MIT6.824 lab1 end
-----------2020.10.11
@copyright ------------baijianruoliorz@Github--------------------------------