MIT6.824 LAB1

2020-10-12 11:00:09

以下內容搬運自本人個人網站:

在個人網站閱讀體驗更佳!

我的個人網站

mit6.824 系列學習

img

首先貼一下課程官網,方便大家查閱.

mit6.824

lab1實現:

首先 在 這裡看lab1的要求,我們先執行幾個官網給出的命令搭建實驗環境

$ git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824
$ cd 6.824
$ ls
Makefile src
$

這樣就能直接拉取到所需的程式碼

大致目錄如下:

然後繼續看官網:

We supply you with a simple sequential mapreduce implementation in src/main/mrsequential.go. It runs the maps and reduces one at a time, in a single process. We also provide you with a couple of MapReduce applications: word-count in mrapps/wc.go, and a text indexer in mrapps/indexer.go. You can run word count sequentially as follows:

大致是提供一個MapReduce application demo 你可以照著run一下.

$ cd ~/6.824
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt
$ more mr-out-0
A 509
ABOUT 2
ACT 8
...

於是我在goland試了一下:

報錯貼了Google:原來是Windows環境做不了這個lab啊

稍加思索—>發現異常–>放棄實驗跑去linux繼續苟

然後就會輸出它給你的文字的單詞統計(這也是我做的第一個hadoop框架有用的demo.hadoop核心程式碼也是mapreduce.

clone出來的倉庫中只有一個資料夾是和第一個lab mapreduce相關的

mr資料夾!蕪湖

關於mapreduce系統框架:

MapReduce系統是由一個master程序和多個worker行程群組成,master和worker之間是通過RPC(Remote Procedure Call)進行通訊,master程序負責給多個worker分配任務,記錄任務完成狀態,並且需要處理worker奔潰或者超時執行等問題,worker需要處理相應的任務,處理完畢傳送報告給master,再請求下一個任務。

master結構:

type Flag struct {
	processing bool
	finished   bool
}

type Master struct {
	FileNames      []string
	MapFlags       []Flag
	ReduceFlags    []Flag
	MapTaskCnts    []int
	ReduceTaskCnts []int
	MapAllDone     bool
	ReduceALLDone  bool
	MapNum         int
	ReduceNum      int
	Mut            sync.Mutex
}

  • FileNames : pg*.txt這八個檔名
  • MapFlags :對應的狀態
  • ReduceFlags:同狀態
  • MapTaskCnts:記錄map當前的任務序列號,如果map任務發生timeout,HandleTimeout這個函數對map任務進行的processing標誌清0,重新分配,當前任務序列號在上個任務號中加1,如果之前發生timeout的任務來報告完成,由於小於當前任務號,HandleWorkerReport函數可以無需記錄,直接退出.
  • ReduceTaskcnts:同上
  • MapAllDone:任務全部完成,變成true
  • ReduceAllDone:Reduce任務全部完成為true
  • MapNum:Map任務數
  • ReduceNum:任務數
  • Mut:排它鎖

Worker結構:

type TaskState int

const (
	MapState    TaskState = 0
	ReduceState TaskState = 1
	StopState   TaskState = 2
	WaitState   TaskState = 3
)

type WorkerTask struct {
	MapID          int
	ReduceID       int
	ReduceNum      int
	MapNum         int
	MapTaskCnt     int
	ReduceTaskCnt  int
	State          TaskState
	FileName       string
	MapFunction    func(string, string) []KeyValue
	ReduceFunction func(string, []string) string
}
  • MapID和ReduceID:Map任務ID和Reduce任務ID
  • MapNum和ReduceNum:Map的任務總數和Reduce任務總數
  • MapTaskCnt和ReduceTaskCnt:Map任務序列號和Reduce序列號
    State:任務有四種狀態,分別是MapState,ReduceState,StopState和WaitState,MapState表示當前需要處理Map任務,ReduceState表示當前需要處理Reduce任務,WaitState表示當前沒有需要處理的任務,開始睡眠等待,StopState代表任務已全部完成,可以退出。
  • FileName:表示Map任務需要的檔名
  • MapFunction和ReduceFunction:任務根據State需要進行的Map函數或者Reduce函數

Master介面:

建立Master:

func MakeMaster(files []string, nReduce int) *Master {
	m := Master{FileNames: files,
		MapFlags:       make([]Flag, len(files), len(files)),
		ReduceFlags:    make([]Flag, nReduce, nReduce),
		MapNum:         len(files),
		ReduceNum:      nReduce,
		MapAllDone:     false,
		ReduceALLDone:  false,
		MapTaskCnts:    make([]int, len(files)),
		ReduceTaskCnts: make([]int, nReduce),
	}
	m.server()
	args, reply := NoArgs{}, NoReply{}
	go m.HandleTimeOut(&args, &reply)
	return &m
}

生成worker task:

func (m *Master) CreateWorkerTask(args *NoArgs, workerTask *WorkerTask) error {
	m.Mut.Lock()
	defer m.Mut.Unlock()
	if !m.MapAllDone {
		for idx := 0; idx < m.MapNum; idx++ {
			if !m.MapFlags[idx].processing && !m.MapFlags[idx].finished {
				workerTask.ReduceNum = m.ReduceNum
				workerTask.MapNum = m.MapNum
				workerTask.State = MapState
				workerTask.MapID = idx
				workerTask.FileName = m.FileNames[idx]
				m.MapTaskCnts[idx]++
				workerTask.MapTaskCnt = m.MapTaskCnts[idx]
				m.MapFlags[idx].processing = true
				return nil
			}
		}
		workerTask.State = WaitState
		return nil
	}
	if !m.ReduceALLDone {
		for idx := 0; idx < m.ReduceNum; idx++ {
			if !m.ReduceFlags[idx].processing && !m.ReduceFlags[idx].finished {
				workerTask.State = ReduceState
				workerTask.ReduceNum = m.ReduceNum
				workerTask.MapNum = m.MapNum
				workerTask.ReduceID = idx
				m.ReduceTaskCnts[idx]++
				workerTask.ReduceTaskCnt = m.ReduceTaskCnts[idx]
				m.ReduceFlags[idx].processing = true
				return nil
			}
		}
		workerTask.State = WaitState
		return nil
	}
	workerTask.State = StopState
	return nil
}

函數首先會獲得互斥鎖,然後判斷MapAllDone是否為false,為false進入迴圈遍歷,如果某個任務的processing狀態和finished狀態都為false,說明這個任務可以需要被處理,可以分配,講設定引數寫入到輸出引數中,並標誌master中當前任務的狀態processing為true以及序列號。如果沒有任務需要處理,說明map有些任務正在處理,有些已完成。進入等待階段。判斷ReduceALLDone與前面類似。不加以敘述。

處理worker report

func (m *Master) HandleWorkerReport(wr *WorkerReportArgs, task *NoReply) error {
	m.Mut.Lock()
	defer m.Mut.Unlock()
	if wr.IsSuccess {
		if wr.State == MapState {
			if wr.MapTaskCnt == m.MapTaskCnts[wr.MapID] {
				m.MapFlags[wr.MapID].finished = true
				m.MapFlags[wr.MapID].processing = false
			}
		} else {
			if wr.ReduceTaskCnt == m.ReduceTaskCnts[wr.ReduceID] {
				m.ReduceFlags[wr.ReduceID].finished = true
				m.ReduceFlags[wr.ReduceID].processing = false
			}
		}
	} else {
		if wr.State == MapState {
			if m.MapFlags[wr.MapID].finished == false {
				m.MapFlags[wr.MapID].processing = false
			}
		} else {
			if m.ReduceFlags[wr.ReduceID].finished == false {
				m.ReduceFlags[wr.ReduceID].processing = false
			}
		}
	}
	for id := 0; id < m.MapNum; id++ {
		if !m.MapFlags[id].finished {
			break
		} else {
			if id == m.MapNum-1 {
				m.MapAllDone = true
			}
		}
	}
	for id := 0; id < m.ReduceNum; id++ {
		if !m.ReduceFlags[id].finished {
			break
		} else {
			if id == m.ReduceNum-1 {
				m.ReduceALLDone = true
			}
		}
	}
	return nil
}

輸入引數有一個標示位,表示任務是否成功,成功判斷任務狀態以及序列號,如果序列號與master對應上,可以表明這個任務成功,如果對不上,說明這個是個timeout任務,無需處理.如果任務標誌位為false,進入錯誤處理,判斷任務是否完成,因為可能是timeout任務標誌位為false,未完成讓processing置0,CreateWorkerTask可以重新分配。最後判斷Map任務和Reduce任務是否相應全部完成,全部完成可以設定MapALLDone和ReduceALLDone為true。

處理timeout:

func (m *Master) HandleTimeOut(args *NoArgs, reply *NoReply) error {
	for {
		m.Mut.Lock()
		if m.MapAllDone && m.ReduceALLDone {
			m.Mut.Unlock()
			break
		}
		time.Sleep(30 * time.Millisecond)
		if !m.MapAllDone {
			for idx := 0; idx < m.MapNum; idx++ {
				if m.MapFlags[idx].finished == false {
					m.MapFlags[idx].processing = false
				}
			}
		} else {
			for idx := 0; idx < m.ReduceNum; idx++ {
				if m.ReduceFlags[idx].finished == false {
					m.ReduceFlags[idx].processing = false
				}
			}
		}
		m.Mut.Unlock()
		time.Sleep(2000 * time.Millisecond)
	}
	return nil
}

處理timeout很簡單,先判斷MapAllDone和ReduceAllDone是不是都為true,都為true都退出即可.然後判斷m任務有無完成,沒有完成任務的processing清為0,就可以讓createWorkerTask重新分配沒有完成的任務.最後釋放鎖,睡眠2S,可以看到Handletimeout函數是以2S為間隔,2s沒有完成任務的視為timeout.

Work介面

生成Work

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	wt := WorkerTask{
		MapFunction:    mapf,
		ReduceFunction: reducef,
	}
	for {
		wt.GetWorkerTask()
		if wt.State == MapState {
			wt.DoMapWork()
		} else if wt.State == ReduceState {
			wt.DoReduceWork()
		} else if wt.State == StopState {
			break
		} else if wt.State == WaitState {
			time.Sleep(300 * time.Millisecond)
		}
	}
	return
}

mrworker會呼叫worker函數,傳入map函數和reduce函數,根據函數引數建立一個worker,然後進入迴圈,呼叫GetWorkerTask函數,這個函數會呼叫Master.CreateWorkerTask函數,並傳入兩個引數,得到任務分配後,講相應的引數和狀態賦值給worker。worker就可以根據狀態進入處理相應任務或者睡眠,或者退出。

Map Work

func (wt *WorkerTask) DoMapWork() {
	file, err := os.Open(wt.FileName)
	content, err := ioutil.ReadAll(file)
	file.Close()
	kvs := wt.MapFunction(wt.FileName, string(content))
	intermediate := make([][]KeyValue, wt.ReduceNum, wt.ReduceNum)
	for _, kv := range kvs {
		idx := ihash(kv.Key) % wt.ReduceNum
		intermediate[idx] = append(intermediate[idx], kv)
	}
	for idx := 0; idx < wt.ReduceNum; idx++ {
		intermediateFileName := fmt.Sprintf("mr-%d-%d", wt.MapID, idx)
		file, err = os.Create(intermediateFileName)
		data, _ := json.Marshal(intermediate[idx])
		_, err = file.Write(data)
		file.Close()
	}
	wt.ReportWorkerTask(nil)
}

func (wt *WorkerTask) ReportWorkerTask(err error) {
	wra := WorkerReportArgs{
		MapID:     wt.MapID,
		ReduceID:  wt.ReduceID,
		State:     wt.State,
		IsSuccess: true,
	}
	if wt.State == MapState {
		wra.MapTaskCnt = wt.MapTaskCnt
	} else {
		wra.ReduceTaskCnt = wt.ReduceTaskCnt
	}
	wrr := NoReply{}
	if err != nil {
		wra.IsSuccess = false
	}
	call("Master.HandleWorkerReport", &wra, &wrr)
}

Map work就是讀取相應的檔案,呼叫MapFunction生成KeyValue對,然後根據雜湊函數得到要講當前key分配到哪一塊中,總共有ReduceNum塊,最後根據這麼塊生成對應map以及reduce塊的檔案。然後呼叫ReportWorkerTask報告成功,傳入nil表示成功。ReportWorkerTask內部會呼叫Master.HandleWorkerReport函數來彙報這一執行結果。

Reduce Work

func (wt *WorkerTask) DoReduceWork() {
	kvsReduce := make(map[string][]string)
	for idx := 0; idx < wt.MapNum; idx++ {
		filename := fmt.Sprintf("mr-%d-%d", idx, wt.ReduceID)
		file, err := os.Open(filename)
		content, err := ioutil.ReadAll(file)
		file.Close()
		kvs := make([]KeyValue, 0)
		err = json.Unmarshal(content, &kvs)
		for _, kv := range kvs {
			_, ok := kvsReduce[kv.Key]
			if !ok {
				kvsReduce[kv.Key] = make([]string, 0)
			}
			kvsReduce[kv.Key] = append(kvsReduce[kv.Key], kv.Value)
		}
	}
	ReduceResult := make([]string, 0)
	for key, val := range kvsReduce {
		ReduceResult = append(ReduceResult, fmt.Sprintf("%v %v\n", key, wt.ReduceFunction(key, val)))
	}
	outFileName := fmt.Sprintf("mr-out-%d", wt.ReduceID)
	err := ioutil.WriteFile(outFileName, []byte(strings.Join(ReduceResult, "")), 0644)
	wt.ReportWorkerTask(nil)
}

這裡首先讀取相同塊的所有檔案,需要對相同key的內容聚合在一起,然後迴圈呼叫ReduceFunction得到Reduce的結果,最後生成輸出.

END:

到這裡MapReduce實現的就差不多了,關於MapReduce,總結下來是:map對每個檔案生成單詞和單一數目,分在不同的區塊儲存,Reduce對不同區塊進行統計,得到最終結果.講這兩個過程直接包裝起來就是mapreduce.

關於MapReduce的論文,可以閱讀這裡.

當然由於是04年的論文,所以現在的翻譯資源已經很豐富了(正經人誰去讀原版那種單詞都認識合成一句話就不知道講什麼的東西呢.

最後放一張過test圖片:

​ —MIT6.824 lab1 end

​ -----------2020.10.11

@copyright ------------baijianruoliorz@Github--------------------------------