匹配標籤開發:性別標籤

2020-09-29 15:00:40

絮叨兩句:
博主是一名資料分析實習生,利用部落格記錄自己所學的知識,也希望能幫助到正在學習的同學們
人的一生中會遇到各種各樣的困難和折磨,逃避是解決不了問題的,唯有以樂觀的精神去迎接生活的挑戰
少年易老學難成,一寸光陰不可輕。
最喜歡的一句話:今日事,今日畢


性別標籤開發

終於到了標籤開發的環節,九九八十一難,最後的終點也是起點,大家繼續加油

開發準備工作

↓↓↓↓↓↓↓↓↓↓點選下方連結,就可以獲取POM檔案和前期所要準備的工作!必點↓↓↓↓↓↓↓↓↓↓
企業級360°全方位人物誌:標籤開發[匹配標籤](前期準備工作)①
樣例類:HBaseMeta 與 TagRule 提前定義好樣例類,為了後面方便使用資料
在這裡插入圖片描述

HBase資料來源source,直接讀取Hbase資料會很慢,使用提前準備好的工具類讀取Hbase資料
在這裡插入圖片描述


/**
 * 匹配標籤:性別標籤開發
 */
object Gender_Tag {

  def main(args: Array[String]): Unit = {
    //1.建立sparksession 物件  用於讀取Mysql和Hbase資料庫
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Gender_Tag").getOrCreate()
    val sparkContext: SparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")
//-------------------------------------------------------------------------



    //2.連線Mysql  用於讀取四級標籤與五級標籤
    val url: String ="jdbc:mysql://bd001:3306/tags_new?characterEncoding=UTF-8"
    val table: String ="tbl_basic_tag"//要讀取那張表
    val properties: Properties = new Properties()
    properties.put("user","root")//mysql資料庫使用者名稱
    properties.put("password","123456")//mysql資料庫密碼
//--------------------------------------------------------------------------



    //2.1讀取MySQL資料  測試讀取myslq資料是否成功
    val mysqlConnect: DataFrame = spark.read.jdbc(url, table, properties)
   //mysqlConnect.show()
 /**
     *
     * +---+----+--------+--------------------+--------+-----+---+-------------------+-------------------+-----+------+
     * | id|name|industry|                rule|business|level|pid|              ctime|              utime|state|remark|
     * +---+----+--------+--------------------+--------+-----+---+-------------------+-------------------+-----+------+
     * |  1|  電商|    null|                null|    null|    1| -1|2019-10-25 23:31:36|2019-10-25 23:31:36| null|  null|
     * |  2| 某電商|    null|                null|    null|    2|  1|2019-10-25 23:31:36|2019-10-25 23:31:36| null|  null|
     * |  3|人口屬性|    null|                null|    null|    3|  2|2019-10-25 23:31:36|2019-10-25 23:31:36| null|  null|
     * |  4|  性別|    null|inType=HBase##zkH...|    null|    4|  3|2019-10-26 15:47:32|2019-10-26 15:47:32| null|  null|
     * |  5|   男|    null|                   1|    null|    5|  4|2019-10-26 15:49:33|2019-10-26 15:49:33| null|  null|
     * |  6|   女|    null|                   2|    null|    5|  4|2019-10-26 15:49:44|2019-10-26 15:49:44| null|  null|
     * |  7|  職業|    null|inType=HBase##zkH...|    null|    4|  3|2019-10-27 02:56:58|2019-10-27 02:57:02| null|  null|
     * |  8|  學生|    null|                   1|    null|    5|  7|               null|               null| null|  null|
     * |  9| 公務員|    null|                   2|    null|    5|  7|               null|               null| null|  null|
     * | 10|  軍人|    null|                   3|    null|    5|  7|               null|               null| null|  null|
     * | 11|  警察|    null|                   4|    null|    5|  7|               null|               null| null|  null|
     * | 12|  教師|    null|                   5|    null|    5|  7|               null|               null| null|  null|
     * | 13|  白領|    null|                   6|    null|    5|  7|               null|               null| null|  null|
     * | 14| 年齡段|    null|inType=HBase##zkH...|    null|    4|  3|               null|               null| null|  null|
     * | 15| 50後|    null|   19500101-19591231|    null|    5| 14|               null|               null| null|  null|
     * | 16| 60後|    null|   19600101-19691231|    null|    5| 14|               null|               null| null|  null|
     * | 17| 70後|    null|   19700101-19791231|    null|    5| 14|               null|               null| null|  null|
     * | 18| 80後|    null|   19800101-19891231|    null|    5| 14|               null|               null| null|  null|
     * | 19| 90後|    null|   19900101-19991231|    null|    5| 14|               null|               null| null|  null|
     * | 20| 00後|    null|   20000101-20091231|    null|    5| 14|               null|               null| null|  null|
     * +---+----+--------+--------------------+--------+-----+---+-------------------+-------------------+-----+------+
     * only showing top 20 rows
     */
//--------------------------------------------------------------------------------



    //引入隱式轉換
    import  spark.implicits._
    //引入SparkSql內建函數
    import org.apache.spark.sql.functions._
    //引入java和scala 相互轉換
    import scala.collection.JavaConverters._

    //3.讀取四級標籤   開始讀取四級標籤
    val four_Tag: Dataset[Row] = mysqlConnect.select('id, 'rule).where("name='性別'")
   //four_Tag.show(false)
 /**
     * +---+-------------------------------------------------------------------------------------------------------------+
     * |id |rule                                                                                                         |
     * +---+-------------------------------------------------------------------------------------------------------------+
     * |4  |inType=HBase##zkHosts=192.168.10.20##zkPort=2181##hbaseTable=tbl_users##family=detail##selectFields=id,gender|
     * +---+-------------------------------------------------------------------------------------------------------------+
     */
      //3.1 獲取四級標籤Id    五級標籤的Pid是四級標籤的Id

    val four_Id: Int = four_Tag.map(row => {
      val id: Int = row.getAs("id").toString.toInt
      id
    }).collectAsList().get(0)
	//println("四級標籤Id:",four_Id)
    /**
     * (四級標籤Id:,4)
     */
 //3.2 獲取四級標籤rule  用於進行處理之後讀取Hbase資料
    val four_Map_Tag = four_Tag.map(row => {
      row.getAs("rule").toString
        .split("##")
        .map(kv => {
          val kvalue: Array[String] = kv.split("=")
          (kvalue(0), kvalue(1))
        })
    }).collectAsList().get(0).toMap

	//println("四級標籤rule:",four_Map_Tag)
    /**
     * (四級標籤rule:,Map(selectFields -> id,gender, inType -> HBase, zkHosts -> 192.168.10.20, zkPort -> 2181, hbaseTable -> tbl_users, family -> detail))
     */
 //3.2.1 將map封裝成樣例類 目的是為了更方便的獲取值
    val hBaseMetaCaseClass = mapToHbaseCaseClass(four_Map_Tag)
//    println("將Map轉換成樣例類之後",hBaseMetaCaseClass)
    /**
     * (將Map轉換成樣例類之後,HBaseMeta(HBase,192.168.10.20,2181,tbl_users,detail,id,gender,))
     */

  //4.讀取五級標籤
    val five_Tag: Dataset[Row] = mysqlConnect.select('id, 'rule).where("pid=" + four_Id)
//    five_Tag.show()
    /**
     * +---+----+
     * | id|rule|
     * +---+----+
     * |  5|   1|
     * |  6|   2|
     * +---+----+
     */
    //5.讀取Hbase資料
    val tbl_users: DataFrame = spark.read.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
      .option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
      .option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
      .option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
      .option(HBaseMeta.HBASETABLE, hBaseMetaCaseClass.hbaseTable)
      .option(HBaseMeta.SELECTFIELDS, hBaseMetaCaseClass.selectFields)
      .load()
//    tbl_users.show()

    /**
     * +---+------+
     * | id|gender|
     * +---+------+
     * |  1|     2|
     * | 10|     2|
     * |100|     2|
     * |101|     1|
     * |102|     2|
     * |103|     1|
     * |104|     1|
     * |105|     2|
     * |106|     1|
     * |107|     1|
     * |108|     1|
     * |109|     1|
     * | 11|     2|
     * |110|     2|
     * |111|     1|
     * |112|     2|
     * |113|     1|
     * |114|     1|
     * |115|     1|
     * |116|     2|
     * +---+------+
     * only showing top 20 rows
     */
//-----------------------上面程式碼已經將四級標籤與五級標籤的資料拿了出來,通過四級標籤的結果讀取Hbase的資料----------------------

 //6.將五級標籤與tbl_users[使用者表]進行匹配
 val five_List_Tag: List[TagRule] = five_Tag.map(row => {
      val id = row.getAs("id").toString.toInt
      val rule = row.getAs("rule").toString
      TagRule(rule = rule, id = id)
    }).collectAsList().asScala.toList
   // println("five_List_Tag:",five_List_Tag)
    /**
     * (five_List_Tag:,List(TagRule(5,1), TagRule(6,2)))
     */

    /**
     * 寫一個UDF函數用於將habse的資料與五級標籤進行匹配

     */
     val userDefinedFunction = udf((parameter: String) => {
       var id=0
       for (elem <- five_List_Tag) {
         if (elem.rule == parameter) {
           id=elem.id
           }
         }
       id
     })
    //得到最終的結果標籤
    val new_Tag: DataFrame = tbl_users.select('id.as("userId"), userDefinedFunction('gender).as("tagsId"))
    new_Tag.show()
    /**
     * +------+------+
     * |userId|tagsId|
     * +------+------+
     * |     1|     6|
     * |    10|     6|
     * |   100|     6|
     * |   101|     5|
     * |   102|     6|
     * |   103|     5|
     * |   104|     5|
     * |   105|     6|
     * |   106|     5|
     * |   107|     5|
     * |   108|     5|
     * |   109|     5|
     * |    11|     6|
     * |   110|     6|
     * |   111|     5|
     * |   112|     6|
     * |   113|     5|
     * |   114|     5|
     * |   115|     5|
     * |   116|     6|
     * +------+------+
     * only showing top 20 rows
     */
 //7.讀取Hbase的歷史資料,將新資料與老資料合併
    // 考慮,hbase中最終標籤表裡已經有資料了,直接將新的資料寫入,會發生什麼問題? 答:會覆蓋
    //考慮,現在已經通過追加的方式解決了覆蓋的問題,如相同的程式多跑幾次會發生什麼問題? 答會重複
    //重複問題的解決辦法,在追加資料之後,程序一次去重操作就可以了
    val old_Tag: DataFrame = spark.read.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
      .option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
      .option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
      .option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
      .option(HBaseMeta.HBASETABLE, "test")
      .option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
      .load()
  //    old_Tag.show()
    /**
     * 還沒有寫入資料,
     * +------+------+
     * |userId|tagsId|
     * +------+------+
     * +------+------+
     */
    //7.1開始合併資料
    if (old_Tag.count()==0){
    //證明還沒有資料直接將資料寫入
    new_Tag.write.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
      .option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
      .option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
      .option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
      .option(HBaseMeta.HBASETABLE, "test")
      .option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
      .save()

    }else{
      val append_Tag: UserDefinedFunction = udf((old_T: String, new_T: String) => {
        println(old_T,new_T)
        if (old_T=="") {
          new_T
        } else if (new_T=="") {
          old_T
        } else if (old_T=="" && new_T=="") {
          ""
        }else{
          val all_T = old_T + "," + new_T
          //進行去重
          val all_TAG = all_T.split(",").distinct.mkString(",")
          all_TAG
        }

      })
      val old_Append_new: DataFrame = old_Tag.join(new_Tag,old_Tag.col("userId")===new_Tag.col("userId"))
        .select(
          when(old_Tag.col("userId").isNotNull, old_Tag.col("userId"))
            .when(new_Tag.col("userId").isNotNull, new_Tag.col("userId")).as("userId"),
          append_Tag(old_Tag.col("tagsId"), new_Tag.col("tagsId")).as("tagsId"))
  
//8.將最終結果寫入到Hbase中            old_Append_new.write.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
        .option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
        .option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
        .option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
        .option(HBaseMeta.HBASETABLE, "test")
        .option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
        .save()
    }
}
  def mapToHbaseCaseClass(four_Map_Tag: Map[String, String]) = {
    HBaseMeta(
      inType = four_Map_Tag.getOrElse(HBaseMeta.INTYPE,""),
      zkHosts = four_Map_Tag.getOrElse(HBaseMeta.ZKHOSTS,""),
      zkPort = four_Map_Tag.getOrElse(HBaseMeta.ZKPORT,""),
      hbaseTable = four_Map_Tag.getOrElse(HBaseMeta.HBASETABLE,""),
      family = four_Map_Tag.getOrElse(HBaseMeta.FAMILY,""),
      selectFields = four_Map_Tag.getOrElse(HBaseMeta.SELECTFIELDS,""),
      rowKey = four_Map_Tag.getOrElse(HBaseMeta.ROWKEY,""))
  }
}

總結

開發流程:

  1. 建立sparksession 物件 用於讀取Mysql和Hbase資料庫
  2. 連線Mysql 用於讀取四級標籤與五級標籤
  3. 讀取四級標籤
  4. 讀取五級標籤
  5. 讀取Hbase資料 根據第3步處理好的資料用來讀取Hbase資料
  6. 將五級標籤與tbl_users[使用者表]進行匹配
  7. 讀取Hbase的歷史資料,將新資料與老資料合併
  8. 將最終資料寫入到Hbase

本篇部落格主要為大家提供了匹配型性別標籤如何進行開發的一個步驟流程。如有不懂得地方可以私信我,然後幫你講解

	如有什麼不對的地方,還請幫忙糾正錯誤!

在這裡插入圖片描述