hadoop實現求共同好友

2022-01-13 21:00:02

前言

在很多社交APP中,比如大家熟悉的QQ好友列表中,開啟對談框,經常可以看到下面有一欄共同好友的推薦列表,使用者通過這種方式,可以新增潛在的關聯好友
在這裡插入圖片描述
這種功能該如何實現呢?對redis比較瞭解的同學應該能很快想到,可以使用redis來實現這個功能。沒錯,redis確實是個不錯的可以實現這個功能的方案。

但redis的實現有一定的侷限性,因為redis儲存和資料和計算時需要耗費較多的記憶體資源,設想一下,像騰訊QQ這樣的規模,如果用這種方式做的話,估計Redis伺服器的投入成本將是一筆不小的開銷。

利用hadoop中的MapReduce同樣可以實現這個功能,該如何實現呢?

業務分析

下面是原始的資料檔案,第一欄可理解為本人,第二行為該使用者的好友列表,以逗號分割,比如A使用者的好友包括:B,C,D,F,E,O這幾個,後面的行依次類推

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

現在的需求是:通過原始的資料檔案,輸出該檔案中所有使用者中哪些人兩兩之間存在共同好友並輸出,格式如下:

A-B C,E
A-C	F,D
A-D	E,F
......

實現思路分析

步驟一:將原始資料拆分為如下格式

通過這一步,得到一組K/V,可以清晰的反映出一個使用者的所有好友

B:A			#B是A的好友
C:A			#C是A的好友
D:A			#D是A的好友
F:A
E:A
O:A

A:B
C:B
E:B
K:B

F:C
A:C
D:C
I:C

B:E
C:E
D:E
M:E
L:E

步驟二、對第一步的資料進一步處理成如下格式

從第一步格式完畢後的資料,可以很明顯看出並總結出一個規律,那就是左邊那些使用者的好友列表,以C使用者為例,可以看出C這個使用者有A,B,E三個好友,反過來講,ABE這三個使用者,他們有一個共同的好友A

其他的類推進行理解

C  A-B-E  #C是A和B和E的共同好友
D  A-C	  #D是A和B的共同好友
A  B-C	  #A是B和C的共同好友
B  A-E    #A是E和B的共同好友
......

步驟三、將步驟二中的資料調換位置

從步驟2中我們得知,C的好友有ABE,反過來說,ABE他們的共同好友有C,針對這種超過3個的,可以考慮下一步進行兩兩組合即可

A-B-E   C     #A、B、E有共同好友C
A-C     D     #A與C有共同好友D
B-C     A     #B與C有共同好友A
A-E     B     #A與E有共同好友B

步驟四、將步驟三得到的資料繼續拆分

步驟三中,像 : A-B-E C 這種資料,顯然需要進一步拆分,因為最終的結果是求取兩兩好友之間的共同好友,所以可以拆為: A-B C,A-E C,B-E C,為下一步資料組合做最後的準備

A-B  C
A-E  C
B-E  C
A-C  D
B-C  A
A-E  B
......

步驟五、將步驟四得到的資料合併

在使用MapReduce程式設計中我們知道,Map階段出去的資料,進入reduce方法中的資料都是key相同的,以第四步中的: A-E 這個key為例,就有2個,這樣通過 reduce方法最終輸出的結果就是: A-E C,B ,即A-E 這兩個使用者的共同好友為 C和B

A-B  C        #A,B共同好友有C
A-E  C,B      #A,E有共同好友 C,B
B-E  C        #B,E有共同好友 C
A-C  D        #A,C有共同好友 D
B-C  A        #B,C有共同好友 A
......

通過以上的資料分析,最終可以達到預期的效果,同時也可以看出,上面的步驟劃分到MapRedcue中,顯然一個MapReduce肯定是無法完成的,至少需要2個

下面是結合上面的步驟分析,得出需要兩個MapReduce的資料流程圖,參考這個圖來協助我們分析編寫程式碼邏輯做參考
在這裡插入圖片描述

編碼實現

1、第一個map類

public class FirstMapper extends Mapper<LongWritable,Text,Text,Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String val = value.toString();
        String[] split = val.split(":");
        //A:B,C,D,F,E,O  拆分後,左邊是原使用者,右邊是好友
        String user = split[0];
        String friends = split[1];
        String[] friendLists = friends.split(",");
        //Map1 輸出的結果為 :
        /**
         * B A
         * C A
         * D A
         * F A
         * E A
         */
        for(String str :friendLists ){
            context.write(new Text(str),new Text(user));
        }
    }

}

2、第一個Reduce類

public class FirstReducer extends Reducer<Text,Text,Text,Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuffer stringBuffer = new StringBuffer();
        for (Text text : values){
            stringBuffer.append(text).append("-");
        }
        //最終寫出去的資料格式為: A-E B ......
        context.write(new Text(stringBuffer.toString()),key);
    }

}

3、第一個Job類

public class FirstJob {

    public static void main(String[] args) throws Exception {

        //1、獲取job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2、設定jar路徑
        job.setJarByClass(FirstJob.class);

        //3、關聯mapper 和 Reducer
        job.setMapperClass(FirstMapper.class);
        job.setReducerClass(FirstReducer.class);

        //4、設定 map輸出的 key/val 的型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //5、設定最終輸出的key / val 型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //6、設定最終的輸出路徑
        String inputPath = "F:\\網路硬碟\\csv\\friends.txt";
        String outPath = "F:\\網路硬碟\\csv\\friends1";

        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outPath));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

執行上面的Job程式碼,然後開啟執行完畢後的第一個階段的檔案,從內容格式上看,符合第一階段的輸出結果要求的, 即下面的這種資料格式
在這裡插入圖片描述
在這裡插入圖片描述

4、第二個map類

public class SecondMapper extends Mapper<LongWritable,Text,Text,Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // I-K-C-B-G-F-H-O-D-	A  階段1的檔案輸出格式
        /**
         * 最終輸出格式:
         * I-K A
         * I-C A
         * I-B A
         * ......
         */
        //需要將左邊的資料進行兩兩拆分,與V進行組合輸出
        String val = value.toString();
        String[] split = val.split("\t");

        String v2 = split[1];
        String[] allUsers = split[0].split("-");
        Arrays.sort(allUsers);

        for(int i=0;i<allUsers.length-1;i++){
            for(int j=i+1;j<allUsers.length;j++){
                context.write(new Text(allUsers[i] + "-" + allUsers[j]),new Text(v2));
            }
        }
    }
}

5、第二個Reducer類

public class SecondReducer extends Reducer<Text,Text,Text,Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //上一步輸出的結果:
        /**
         * A-B C
         * A-B D
         * A-E C
         * A-E D
         * ......
         */
        //只需要將相同的key的Val進行組合即可,即 : A-B C-D,A-E C-D
        StringBuffer stringBuffer = new StringBuffer();
        for (Text text :values ){
            stringBuffer.append(text.toString()).append("-");
        }
        context.write(key,new Text(stringBuffer.toString()));
    }

}

6、第二個Job類

public class SecondJob {
    public static void main(String[] args) throws Exception {

        //1、獲取job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2、設定jar路徑
        job.setJarByClass(SecondJob.class);

        //3、關聯mapper 和 Reducer
        job.setMapperClass(SecondMapper.class);
        job.setReducerClass(SecondReducer.class);

        //4、設定 map輸出的 key/val 的型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //5、設定最終輸出的key / val 型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //6、設定最終的輸出路徑
        String inputPath = "F:\\網路硬碟\\csv\\friends1\\part-r-00000";
        String outPath = "F:\\網路硬碟\\csv\\friends2";

        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outPath));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

執行上面的Job程式碼,檢視最終的輸出結果,可以看到,也是符合我們預期的業務的
在這裡插入圖片描述