Many of us have memorized the standard interview answers about shuffle without really understanding the process. In this post I walk through the shuffle phase by reading the source code, to deepen my own understanding. I am still a beginner and this post draws on a lot of other material, so please point out anything that is wrong.
Shuffle is the stage between the Map Task and the Reduce Task; in essence it is a cross-node, cross-process data transfer. Many online resources further break the MapReduce pipeline down into six finer-grained stages (roughly: collect, spill, and merge on the map side, then copy, merge/sort, and reduce on the reduce side).
Having read the source code, that breakdown makes a lot of sense. First, take a look at the shuffle diagram in the official documentation to get a general picture.
Let's start with the Map side. The entry point is the run method of MapTask (org/apache/hadoop/mapred/MapTask.java), which is executed whenever a map task starts.
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical; // proxy for this task attempt; used in many places below
if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
// start the task reporter, which runs a periodic reporting thread (status updates and heartbeat)
TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi); // important: initializes essentially all the resources the task needs
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter); // core code path, step into it
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
The umbilical object here is a bit hard to understand; honestly I have not fully figured it out either. Judging by the name it is a protocol, so here is its javadoc:
"Protocol that the task child process uses to contact its parent process. The parent is a daemon which polls the central master for new map or reduce tasks and runs them as child processes. All communication between the child and the parent is via this protocol."
It looks like an RPC. I am not entirely sure what the parent process is; my understanding is that in MRv1 it would be the TaskTracker, and in MRv2 (YARN) the ApplicationMaster. If that is wrong, please correct me.
Step into the runNewMapper method:
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes (create the task's context)
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper (instantiated via reflection)
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format (instantiated via reflection; it is what reads the input data)
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split (fetch the split metadata)
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE> // creates the RecordReader; the InputFormat reads data through it. This is a big topic in its own right and matters a lot at job-submit time
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) { // no reduce tasks: write output directly to disk
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else { // core code: create the collector, step into it
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
input.initialize(split, mapperContext);
mapper.run(mapperContext); // invokes the Mapper class we implemented ourselves
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
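For reference, mapper.run(mapperContext) drives the familiar setup/map/cleanup loop. The base org.apache.hadoop.mapreduce.Mapper.run method looks essentially like the sketch below (paraphrased from memory, so treat it as a sketch rather than the exact source), which is why overriding map() is normally all we do:
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperRunSketch<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);                    // one-time setup hook
    try {
      while (context.nextKeyValue()) { // the RecordReader feeds records in here
        map(context.getCurrentKey(), context.getCurrentValue(), context); // our map()
      }
    } finally {
      cleanup(context);                // one-time cleanup hook
    }
  }
}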
Next comes the collect phase. Step into NewOutputCollector to see how the Collector is created:
private class NewOutputCollector<K,V>
extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
private final MapOutputCollector<K,V> collector;
private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
private final int partitions;
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks(); // the number of partitions equals the number of reduce tasks
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value, // write (k, v) into the circular buffer, tagged with its partition
partitioner.getPartition(key, value, partitions));
}
@Override
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush(); // core method: flush the remaining data out
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}
}
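A quick aside on the partitioner: when there is more than one reduce task, getPartitionerClass() defaults to HashPartitioner, whose logic boils down to the sketch below (a sketch of the well-known default; a job can of course configure its own Partitioner):
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of the default HashPartitioner: mask off the sign bit of the key's hash,
// then take it modulo the number of reducers, so the same key always lands in the same partition.
public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
  @Override
  public int getPartition(K key, V value, int numPartitions) {
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}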
Step into createSortingCollector:
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE> // the map-side output collector
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
MapOutputCollector.Context context =
new MapOutputCollector.Context(this, job, reporter);
Class<?>[] collectorClasses = job.getClasses( // look up the configured collector classes
JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class); // in the end this is MapOutputBuffer by default
int remainingCollectors = collectorClasses.length;
Exception lastException = null;
for (Class clazz : collectorClasses) {
try {
if (!MapOutputCollector.class.isAssignableFrom(clazz)) { // check that clazz is (a subtype of) MapOutputCollector
throw new IOException("Invalid output collector class: " + clazz.getName() +
" (does not implement MapOutputCollector)");
}
Class<? extends MapOutputCollector> subclazz =
clazz.asSubclass(MapOutputCollector.class);
LOG.debug("Trying map output collector class: " + subclazz.getName());
MapOutputCollector<KEY, VALUE> collector =
ReflectionUtils.newInstance(subclazz, job); // instantiate the collector
collector.init(context); // initialize it; step into init
LOG.info("Map output collector class = " + collector.getClass().getName());
return collector;
} catch (Exception e) {
String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
if (--remainingCollectors > 0) {
msg += " (" + remainingCollectors + " more collector(s) to try)";
}
lastException = e;
LOG.warn(msg, e);
}
}
}
This init method is crucial: it involves not only the circular buffer but also the spill machinery.
// this method mainly performs some initialization of the collector object
public void init(MapOutputCollector.Context context
                 ) throws IOException, ClassNotFoundException {
job = context.getJobConf();
reporter = context.getReporter();
mapTask = context.getMapTask();
mapOutputFile = mapTask.getMapOutputFile();
sortPhase = mapTask.getSortPhase();
spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
//sanity checks
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); // spill threshold of the circular buffer, 0.8 by default
final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
MRJobConfig.DEFAULT_IO_SORT_MB); // circular buffer size, 100 MB by default
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
// sorting: QuickSort by default
// the sorter is applied before data is spilled from the circular buffer to disk,
// and it sorts the metadata (indexes) rather than the serialized records themselves
sorter = ReflectionUtils.newInstance(job.getClass(
MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
IndexedSorter.class), job);
// buffers and accounting
// initialize the famous circular buffer, which is essentially just a byte array
int maxMemUsage = sortmb << 20; // convert MB to bytes
// each (k, v) pair has four pieces of metadata (META): valstart, keystart, partition and vallen, all ints
// METASIZE is those 4 ints expressed in bytes, i.e. 4 * 4
maxMemUsage -= maxMemUsage % METASIZE; // align the buffer size to a multiple of METASIZE
kvbuffer = new byte[maxMemUsage]; // the buffer itself, in bytes (holds both records and metadata)
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer(); // view the byte-addressed kvbuffer as the int-addressed kvmeta
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
// maximum number of metadata entries kvmeta can hold
maxRec = kvmeta.capacity() / NMETA;
softLimit = (int)(kvbuffer.length * spillper); // threshold at which the buffer starts to spill
bufferRemaining = softLimit;
LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
LOG.info("soft limit at " + softLimit);
LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
// k/v serialization
comparator = job.getOutputKeyComparator();
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb); // keys are serialized into bb (the BlockingBuffer)
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb); // values are serialized into bb as well
// output counters
mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter =
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter = reporter
.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
// compression codec, reduces the amount of shuffled data
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
} else {
codec = null;
}
// combiner
// the combiner acts as a map-side reduce
final Counters.Counter combineInputCounter =
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
// the spill thread
spillInProgress = false;
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true); // it is a daemon thread
spillThread.setName("SpillThread");
spillLock.lock();
try {
spillThread.start(); // start the spill thread
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw new IOException("Spill thread failed to initialize",
sortSpillException);
}
}
From this class we can see how the circular buffer is initialized: 100 MB by default, with spilling starting when it is 80% full. In fact, "Collector" is just the macro-level name; concretely it is a MapOutputBuffer object. The init method also starts the spill thread, although on its first pass that thread blocks right away; we will come back to this later.
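To make those numbers concrete, here is a small worked example of the buffer accounting in init(), assuming the default 100 MB sort buffer and 0.8 spill percent (plain arithmetic, not Hadoop code):
// Worked example of the buffer sizing done in init(), under the default configuration.
public class SortBufferMath {
  public static void main(String[] args) {
    final int NMETA = 4;                            // ints of metadata per record
    final int METASIZE = NMETA * 4;                 // 16 bytes of metadata per record
    final int sortmb = 100;                         // mapreduce.task.io.sort.mb default
    final float spillper = 0.8f;                    // mapreduce.map.sort.spill.percent default
    int maxMemUsage = sortmb << 20;                 // 100 MB = 104,857,600 bytes
    maxMemUsage -= maxMemUsage % METASIZE;          // align to a multiple of METASIZE (already aligned here)
    int softLimit = (int) (maxMemUsage * spillper); // spilling starts at 83,886,080 bytes (~80 MB)
    int maxRec = (maxMemUsage / 4) / NMETA;         // kvmeta.capacity() / NMETA = 6,553,600 metadata slots
    System.out.println("kvbuffer = " + maxMemUsage + " bytes, softLimit = " + softLimit
        + " bytes, up to " + maxRec + " metadata entries");
  }
}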
At this point all the preparation before the map starts is done. To see how it actually runs, debug into the write call inside our own mapper: it leads to the write method defined in NewOutputCollector, which in turn calls MapOutputBuffer's collect method:
public synchronized void collect(K key, V value, final int partition
) throws IOException {
reporter.progress();
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
+ keyClass.getName() + ", received "
+ key.getClass().getName());
}
if (value.getClass() != valClass) {
throw new IOException("Type mismatch in value from map: expected "
+ valClass.getName() + ", received "
+ value.getClass().getName());
}
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}
checkSpillException();
bufferRemaining -= METASIZE; // when collecting a new record, subtract the metadata length first, then check the budget
if (bufferRemaining <= 0) { // the threshold has been crossed
// start spill if the thread is not running and the soft limit has been
// reached
spillLock.lock();
try {
do {
// on the first spill, spillInProgress is false
if (!spillInProgress) {
final int kvbidx = 4 * kvindex; // in bytes
final int kvbend = 4 * kvend; // in bytes
// serialized, unspilled bytes always lie between kvindex and
// bufindex, crossing the equator. Note that any void space
// created by a reset must be included in "used" bytes
final int bUsed = distanceTo(kvbidx, bufindex); // bytes currently in use (serialized but not yet spilled)
final boolean bufsoftlimit = bUsed >= softLimit; // true means the soft limit has been exceeded
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished, reclaim space
resetSpill();
bufferRemaining = Math.min(
distanceTo(bufindex, kvbidx) - 2 * METASIZE,
softLimit - bUsed) - METASIZE; // recompute the write budget now that the finished spill's space is reclaimed (the arithmetic is subtle)
continue;
} else if (bufsoftlimit && kvindex != kvend) {
// spill records, if any collected; check latter, as it may
// be possible for metadata alignment to hit spill pcnt
startSpill(); // start spilling; internally this wakes the spill thread
final int avgRec = (int)
(mapOutputByteCounter.getCounter() /
mapOutputRecordCounter.getCounter());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
final int distkvi = distanceTo(bufindex, kvbidx);
final int newPos = (bufindex +
Math.max(2 * METASIZE - 1,
Math.min(distkvi / 2,
distkvi / (METASIZE + avgRec) * METASIZE)))
% kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
// metadata max
distanceTo(bufend, newPos),
Math.min(
// serialization max
distanceTo(newPos, serBound),
// soft limit
softLimit)) - 2 * METASIZE;
}
}
} while (false); // do { ... } while (false): the block runs once, and "continue" jumps to the (false) test, acting as an early exit
} finally {
spillLock.unlock();
}
}
// write directly into the buffer; no spill involved here
try {
// serialize key bytes into buffer
int keystart = bufindex;
keySerializer.serialize(key);
// if the key's bytes were split by bufvoid (wrapped around the end), shift the key
// so it sits in contiguous space, which keeps key comparison during sort simple
if (bufindex < keystart) {
// wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
// serialize value bytes into buffer
final int valstart = bufindex;
valSerializer.serialize(value);
// It's possible for records to have zero length, i.e. the serializer
// will perform no writes. To ensure that the boundary conditions are
// checked and that the kvindex invariant is maintained, perform a
// zero-length write into the buffer. The logic monitoring this could be
// moved into collect, but this is cleaner and inexpensive. For now, it
// is acceptable.
bb.write(b0, 0, 0);
// the record must be marked after the preceding write, as the metadata
// for this record are not yet written
int valend = bb.markRecord();
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid)); // add the record's size to the output byte counter
// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// advance kvindex
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition); // a record too large for the buffer is written straight to disk
mapOutputRecordCounter.increment(1);
return;
}
}
The most important call here is startSpill(). Step into it and you will find spillReady.signal(), which wakes the spill thread that has been blocked on spillReady.await(); spillReady is a Condition of the reentrant spillLock. From this point the spill thread starts doing real work. This is also where the concurrent writing and reading of the circular buffer comes in, which is fairly abstract; I plan to write a separate post about the circular buffer.
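To see the shape of that handshake without the surrounding buffer bookkeeping, here is a minimal sketch (my own simplification, not the Hadoop code itself) of how collect() and the spill thread cooperate through a ReentrantLock and its Conditions:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SpillHandshakeSketch {
  private final ReentrantLock spillLock = new ReentrantLock();
  private final Condition spillReady = spillLock.newCondition(); // signalled by startSpill()
  private final Condition spillDone = spillLock.newCondition();  // signalled when a spill finishes
  private boolean spillInProgress = false;

  // Called by collect() once the soft limit is reached.
  void startSpill() {
    spillLock.lock();
    try {
      spillInProgress = true;
      spillReady.signal(); // wake the spill thread parked in await()
    } finally {
      spillLock.unlock();
    }
  }

  // Simplified body of the spill thread's run() loop.
  void spillThreadLoop() throws InterruptedException {
    spillLock.lock();
    try {
      while (true) {
        while (!spillInProgress) {
          spillReady.await();        // parked here until startSpill() signals
        }
        spillLock.unlock();          // release the lock while doing the heavy work
        try {
          // sortAndSpill() would run here: sort the metadata and write one spill file
        } finally {
          spillLock.lock();
          spillInProgress = false;
          spillDone.signal();        // let collect() know the spilled region can be reclaimed
        }
      }
    } finally {
      spillLock.unlock();
    }
  }
}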
This code is the collect phase: essentially the map side writing each output (k, v) record, together with its metadata, into the MapOutputBuffer.
As mentioned, this code also wakes the spill thread. Looking at SpillThread's run method, the important call inside it is clearly sortAndSpill:
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
final long size = distanceTo(bufstart, bufend, bufvoid) +
partitions * APPROX_HEADER_LENGTH; // approximate length to be written out
FSDataOutputStream out = null;
FSDataOutputStream partitionOut = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);// by default output/spill{x}.out
out = rfs.create(filename);// create the spill file
final int mstart = kvend / NMETA;
final int mend = 1 + // kvend is a valid record
(kvstart >= kvend
? kvstart
: kvmeta.capacity() + kvstart) / NMETA;
// sort the metadata: first by partition, then by key within each partition
// a secondary sort, and only the metadata entries are reordered
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
int spindex = mstart;
final IndexRecord rec = new IndexRecord();
final InMemValBytes value = new InMemValBytes();
for (int i = 0; i < partitions; ++i) {// loop over the partitions
// the temporary spill output is written as an IFile
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
// write out all records belonging to the same partition
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
final int kvoff = offsetFor(spindex % maxRec);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else { // run the combiner to shrink the data before it hits disk
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
}
// close the writer
writer.close(); // the file is written to the local disk, not to HDFS
if (partitionOut != out) {
partitionOut.close();
partitionOut = null;
}
// record offsets
// record the information of the current partition i into the index record rec
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
// spillRec holds the per-partition info of this spill
spillRec.putIndex(rec, i);
writer = null;
} finally {
if (null != writer) writer.close();
}
}
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job); // write the in-memory index to disk
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
if (partitionOut != null) {
partitionOut.close();
}
}
}
Clearly a spill produces two kinds of temporary files. One holds the (k, v) data and lives by default at output/spill{x}.out; note that there is no obvious code here writing that data to disk, because it happens inside writer.close(). The other file that is explicitly written to disk is produced by spillRec.writeToFile(indexFilename, job) (once the in-memory index cache outgrows its limit), and it stores the index of every partition.
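To illustrate what that index buys us (an illustration of the idea, not Hadoop's SpillRecord/IndexRecord API): each partition's segment inside a spill file is described by a (startOffset, rawLength, partLength) triple, so a consumer can seek straight to the bytes of its own partition instead of scanning the whole spill{x}.out file:
// Illustration only: how per-partition index records let a reader locate its segment.
public class SpillIndexSketch {
  static class IndexRecord {
    long startOffset; // where partition i's segment begins inside spill{x}.out
    long rawLength;   // uncompressed length of the segment
    long partLength;  // on-disk (possibly compressed) length of the segment
  }

  // Given the index for all partitions, return the byte range that reducer `partition` must read.
  static long[] segmentFor(IndexRecord[] index, int partition) {
    IndexRecord rec = index[partition];
    return new long[] { rec.startOffset, rec.startOffset + rec.partLength };
  }
}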
While the SpillThread is busy with sortAndSpill, the map task keeps producing new (k, v) records and writing them into the MapOutputBuffer, so the circular buffer is being read and written by two threads at the same time. How are conflicts avoided? The answer is to write in opposite directions.
In the figure, the red arrow is the direction in which (k, v) data is written, the blue arrow is the direction in which metadata is written, the purple region is the reserved 20% that must not be written, and the green region is data already written and currently being read by the spill thread.
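The two write directions can be shown with a toy example (my own toy, not Hadoop code): starting from the equator, serialized records advance forward through the array while 16-byte metadata entries retreat backwards, both modulo the buffer length, so the two streams only meet when the buffer is nearly full:
// Toy illustration of writing a ring buffer from both ends at once.
public class RingDirectionsSketch {
  public static void main(String[] args) {
    final int len = 64;           // pretend the ring buffer is 64 bytes long
    final int METASIZE = 16;      // one metadata entry
    int equator = 0;
    int bufindex = equator;                               // record bytes grow forward
    int kvindexBytes = (equator - METASIZE + len) % len;  // metadata grows backward
    // append a 10-byte record and one metadata entry
    bufindex = (bufindex + 10) % len;
    kvindexBytes = (kvindexBytes - METASIZE + len) % len;
    System.out.println("records now end at " + bufindex
        + ", next metadata slot starts at " + kvindexBytes);
  }
}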
With that, the spill and sort stages are essentially complete. One question remains: what if the MapOutputBuffer still holds some data that never reached the spill threshold? Go back to the close method of NewOutputCollector: it calls MapOutputBuffer's flush method, which handles exactly this case.
The final stage of the map-side shuffle is Merge. There is quite a lot of code, so I will not paste it here; if you are interested, look at the mergeParts method of MapOutputBuffer. It is called from the flush method above, and its job is to merge the .out files and index files produced during the spill stage.
The goal of the Merge step is simple, but the process itself is genuinely complex. First, Merge scans the output directory, collecting the paths of the .out files into one array and the index files into another. You might wonder: if the data has to be read back into memory anyway, why flush it to disk in the first place? Isn't that wasted work? To be fair, this is a real weakness of MapReduce: it is very batch-oriented, and the disk I/O rules out workloads such as machine learning that need many iterations. But the step is still necessary. Memory used to be expensive and not everyone could afford a lot of it, so flushing all the (k, v) data and index data to disk was the safe way to avoid OOM; the data can be read back into memory afterwards because by then the big circular buffer is no longer in use and memory is plentiful again.
Merge also involves a merge algorithm, and it is not the simple textbook merge, because a partition holds more than one key. The source code here is quite involved and the comments are a little misleading: they mention a priority queue and a heap, which might make you think heap sort is used. Have a look if you are curious; it is not that important (and honestly I only half understand it myself). The basic k-way merge idea is sketched below.
For the Reduce side I will keep it brief and only look at the key points.
As before, the first step is the run method of ReduceTask, which is invoked automatically when the reduce logic starts.
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) { // the three phases of reduce
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
// start the task reporter, which runs a periodic reporting thread (status updates and heartbeat)
TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewReducer();
initialize(job, getJobID(), reporter, useNewApi);// core code: initialize the task
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
// instantiate the shuffle plugin
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
// run() performs the remote data fetching of the shuffle. Internally it starts a
// map-completion event fetcher thread to learn which map tasks have finished,
// plus (by default) 5 fetcher threads that pull the data; drilling all the way down, the core
// method is doShuffle, which comes in two flavors, in-memory and on-disk.
// When the shuffle memory is exceeded, data is merged to disk: the plugin's MergeManager starts a
// merge thread once the in-memory shuffle cache is about to exceed its limit.
// The event fetching looks like ordinary network I/O but is really an RPC: through the umbilical
// proxy it obtains the taskAttempt IDs of completed MapTasks and stores them in the scheduler,
// preparing for the shuffle that follows.
rIter = shuffleConsumerPlugin.run();
// free up the data structures
// a sorted set backed by a TreeSet
mapOutputFilesOnDisk.clear();
sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass); // run the reduce operation (the user-defined logic)
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
The key points of ReduceTask are fairly clear: initialize shuffleConsumerPlugin, the shuffle plugin, and then run it so that it pulls the data.
While the shuffle plugin is being initialized it sets up two components: a scheduler and a MergeManager, and the MergeManager turns out to be very important.
Next, look at the plugin's run method:
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
// TODO: This should not be necessary after HADOOP-8942
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
// start an event fetcher thread that learns which map tasks have completed
final EventFetcher<K,V> eventFetcher =
new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
// Start the map-output fetcher threads
// the fetcher threads pull this reducer's partition from the remote map outputs
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
// stop the scheduler
scheduler.close();
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
The key points are two kinds of threads: the event fetcher and the fetcher threads.
First, the event fetcher thread obtains information such as the TaskAttempt IDs of completed map tasks and stores it in the scheduler for later use by the shuffle, especially the sort. It is essentially an RPC; note that the EventFetcher is constructed with the umbilical proxy object as one of its parameters.
The fetcher threads, on the other hand, pull the data this reducer needs from each map task over HTTP (if you are curious about the difference between HTTP and RPC, look it up). The core method inside is doShuffle (you have to drill down quite a way to find it), and merge-sorting happens while copying. doShuffle has two implementations, one in-memory and one on-disk (and the merge likewise comes in the same two flavors). The reasoning is that the data pulled for the same key can be very large, in which case it has to go to disk; to reduce how often that happens, data is merged once the buffer threshold (64 KB by default) is reached (and merged on disk if it is too large). That merging work is handed to the shuffle plugin's MergeManager.
So the copy, merge, and sort phases overlap.
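The in-memory versus on-disk decision can be sketched roughly as follows (my own sketch of the policy, not the MergeManagerImpl API; the names and fractions are illustrative): each fetched map output either fits under a single-shuffle memory limit and stays in memory, or is streamed straight to disk, and once in-memory usage approaches its budget an in-memory merge is kicked off:
// Hedged sketch of the reduce-side shuffle memory policy; fields and fractions are illustrative.
public class ShuffleMemoryPolicySketch {
  final long memoryLimit;            // total memory budget for in-memory map outputs
  final long maxSingleShuffleLimit;  // largest single map output allowed to stay in memory
  final long mergeThreshold;         // start an in-memory merge beyond this usage
  long used = 0;

  ShuffleMemoryPolicySketch(long memoryLimit) {
    this.memoryLimit = memoryLimit;
    this.maxSingleShuffleLimit = (long) (memoryLimit * 0.25); // illustrative fraction
    this.mergeThreshold = (long) (memoryLimit * 0.66);        // illustrative fraction
  }

  /** Returns true if this map output should be shuffled into memory, false if onto disk. */
  boolean reserveInMemory(long requestedSize) {
    if (requestedSize > maxSingleShuffleLimit) {
      return false;                  // too big for memory: take the on-disk path
    }
    used += requestedSize;
    if (used > mergeThreshold) {
      startInMemoryMerge();          // hand the buffered outputs to the merge thread
    }
    return true;
  }

  void startInMemoryMerge() { /* wake the in-memory merger (left empty in this sketch) */ }
}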
That concludes the walkthrough of the shuffle source code.