絮叨兩句:
博主是一名資料分析實習生,利用部落格記錄自己所學的知識,也希望能幫助到正在學習的同學們
人的一生中會遇到各種各樣的困難和折磨,逃避是解決不了問題的,唯有以樂觀的精神去迎接生活的挑戰
少年易老學難成,一寸光陰不可輕。
最喜歡的一句話:今日事,今日畢
終於到了標籤開發的環節,九九八十一難,最後的終點也是起點,大家繼續加油
↓↓↓↓↓↓↓↓↓↓點選下方連結,就可以獲取POM檔案和前期所要準備的工作!必點↓↓↓↓↓↓↓↓↓↓
企業級360°全方位人物誌:標籤開發[匹配標籤](前期準備工作)①
樣例類:HBaseMeta 與 TagRule 提前定義好樣例類,為了後面方便使用資料
HBase資料來源source,直接讀取Hbase資料會很慢,使用提前準備好的工具類讀取Hbase資料
/**
* 匹配標籤:性別標籤開發
*/
object Gender_Tag {
def main(args: Array[String]): Unit = {
//1.建立sparksession 物件 用於讀取Mysql和Hbase資料庫
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Gender_Tag").getOrCreate()
val sparkContext: SparkContext = spark.sparkContext
sparkContext.setLogLevel("WARN")
//-------------------------------------------------------------------------
//2.連線Mysql 用於讀取四級標籤與五級標籤
val url: String ="jdbc:mysql://bd001:3306/tags_new?characterEncoding=UTF-8"
val table: String ="tbl_basic_tag"//要讀取那張表
val properties: Properties = new Properties()
properties.put("user","root")//mysql資料庫使用者名稱
properties.put("password","123456")//mysql資料庫密碼
//--------------------------------------------------------------------------
//2.1讀取MySQL資料 測試讀取myslq資料是否成功
val mysqlConnect: DataFrame = spark.read.jdbc(url, table, properties)
//mysqlConnect.show()
/**
*
* +---+----+--------+--------------------+--------+-----+---+-------------------+-------------------+-----+------+
* | id|name|industry| rule|business|level|pid| ctime| utime|state|remark|
* +---+----+--------+--------------------+--------+-----+---+-------------------+-------------------+-----+------+
* | 1| 電商| null| null| null| 1| -1|2019-10-25 23:31:36|2019-10-25 23:31:36| null| null|
* | 2| 某電商| null| null| null| 2| 1|2019-10-25 23:31:36|2019-10-25 23:31:36| null| null|
* | 3|人口屬性| null| null| null| 3| 2|2019-10-25 23:31:36|2019-10-25 23:31:36| null| null|
* | 4| 性別| null|inType=HBase##zkH...| null| 4| 3|2019-10-26 15:47:32|2019-10-26 15:47:32| null| null|
* | 5| 男| null| 1| null| 5| 4|2019-10-26 15:49:33|2019-10-26 15:49:33| null| null|
* | 6| 女| null| 2| null| 5| 4|2019-10-26 15:49:44|2019-10-26 15:49:44| null| null|
* | 7| 職業| null|inType=HBase##zkH...| null| 4| 3|2019-10-27 02:56:58|2019-10-27 02:57:02| null| null|
* | 8| 學生| null| 1| null| 5| 7| null| null| null| null|
* | 9| 公務員| null| 2| null| 5| 7| null| null| null| null|
* | 10| 軍人| null| 3| null| 5| 7| null| null| null| null|
* | 11| 警察| null| 4| null| 5| 7| null| null| null| null|
* | 12| 教師| null| 5| null| 5| 7| null| null| null| null|
* | 13| 白領| null| 6| null| 5| 7| null| null| null| null|
* | 14| 年齡段| null|inType=HBase##zkH...| null| 4| 3| null| null| null| null|
* | 15| 50後| null| 19500101-19591231| null| 5| 14| null| null| null| null|
* | 16| 60後| null| 19600101-19691231| null| 5| 14| null| null| null| null|
* | 17| 70後| null| 19700101-19791231| null| 5| 14| null| null| null| null|
* | 18| 80後| null| 19800101-19891231| null| 5| 14| null| null| null| null|
* | 19| 90後| null| 19900101-19991231| null| 5| 14| null| null| null| null|
* | 20| 00後| null| 20000101-20091231| null| 5| 14| null| null| null| null|
* +---+----+--------+--------------------+--------+-----+---+-------------------+-------------------+-----+------+
* only showing top 20 rows
*/
//--------------------------------------------------------------------------------
//引入隱式轉換
import spark.implicits._
//引入SparkSql內建函數
import org.apache.spark.sql.functions._
//引入java和scala 相互轉換
import scala.collection.JavaConverters._
//3.讀取四級標籤 開始讀取四級標籤
val four_Tag: Dataset[Row] = mysqlConnect.select('id, 'rule).where("name='性別'")
//four_Tag.show(false)
/**
* +---+-------------------------------------------------------------------------------------------------------------+
* |id |rule |
* +---+-------------------------------------------------------------------------------------------------------------+
* |4 |inType=HBase##zkHosts=192.168.10.20##zkPort=2181##hbaseTable=tbl_users##family=detail##selectFields=id,gender|
* +---+-------------------------------------------------------------------------------------------------------------+
*/
//3.1 獲取四級標籤Id 五級標籤的Pid是四級標籤的Id
val four_Id: Int = four_Tag.map(row => {
val id: Int = row.getAs("id").toString.toInt
id
}).collectAsList().get(0)
//println("四級標籤Id:",four_Id)
/**
* (四級標籤Id:,4)
*/
//3.2 獲取四級標籤rule 用於進行處理之後讀取Hbase資料
val four_Map_Tag = four_Tag.map(row => {
row.getAs("rule").toString
.split("##")
.map(kv => {
val kvalue: Array[String] = kv.split("=")
(kvalue(0), kvalue(1))
})
}).collectAsList().get(0).toMap
//println("四級標籤rule:",four_Map_Tag)
/**
* (四級標籤rule:,Map(selectFields -> id,gender, inType -> HBase, zkHosts -> 192.168.10.20, zkPort -> 2181, hbaseTable -> tbl_users, family -> detail))
*/
//3.2.1 將map封裝成樣例類 目的是為了更方便的獲取值
val hBaseMetaCaseClass = mapToHbaseCaseClass(four_Map_Tag)
// println("將Map轉換成樣例類之後",hBaseMetaCaseClass)
/**
* (將Map轉換成樣例類之後,HBaseMeta(HBase,192.168.10.20,2181,tbl_users,detail,id,gender,))
*/
//4.讀取五級標籤
val five_Tag: Dataset[Row] = mysqlConnect.select('id, 'rule).where("pid=" + four_Id)
// five_Tag.show()
/**
* +---+----+
* | id|rule|
* +---+----+
* | 5| 1|
* | 6| 2|
* +---+----+
*/
//5.讀取Hbase資料
val tbl_users: DataFrame = spark.read.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
.option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
.option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
.option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
.option(HBaseMeta.HBASETABLE, hBaseMetaCaseClass.hbaseTable)
.option(HBaseMeta.SELECTFIELDS, hBaseMetaCaseClass.selectFields)
.load()
// tbl_users.show()
/**
* +---+------+
* | id|gender|
* +---+------+
* | 1| 2|
* | 10| 2|
* |100| 2|
* |101| 1|
* |102| 2|
* |103| 1|
* |104| 1|
* |105| 2|
* |106| 1|
* |107| 1|
* |108| 1|
* |109| 1|
* | 11| 2|
* |110| 2|
* |111| 1|
* |112| 2|
* |113| 1|
* |114| 1|
* |115| 1|
* |116| 2|
* +---+------+
* only showing top 20 rows
*/
//-----------------------上面程式碼已經將四級標籤與五級標籤的資料拿了出來,通過四級標籤的結果讀取Hbase的資料----------------------
//6.將五級標籤與tbl_users[使用者表]進行匹配
val five_List_Tag: List[TagRule] = five_Tag.map(row => {
val id = row.getAs("id").toString.toInt
val rule = row.getAs("rule").toString
TagRule(rule = rule, id = id)
}).collectAsList().asScala.toList
// println("five_List_Tag:",five_List_Tag)
/**
* (five_List_Tag:,List(TagRule(5,1), TagRule(6,2)))
*/
/**
* 寫一個UDF函數用於將habse的資料與五級標籤進行匹配
*/
val userDefinedFunction = udf((parameter: String) => {
var id=0
for (elem <- five_List_Tag) {
if (elem.rule == parameter) {
id=elem.id
}
}
id
})
//得到最終的結果標籤
val new_Tag: DataFrame = tbl_users.select('id.as("userId"), userDefinedFunction('gender).as("tagsId"))
new_Tag.show()
/**
* +------+------+
* |userId|tagsId|
* +------+------+
* | 1| 6|
* | 10| 6|
* | 100| 6|
* | 101| 5|
* | 102| 6|
* | 103| 5|
* | 104| 5|
* | 105| 6|
* | 106| 5|
* | 107| 5|
* | 108| 5|
* | 109| 5|
* | 11| 6|
* | 110| 6|
* | 111| 5|
* | 112| 6|
* | 113| 5|
* | 114| 5|
* | 115| 5|
* | 116| 6|
* +------+------+
* only showing top 20 rows
*/
//7.讀取Hbase的歷史資料,將新資料與老資料合併
// 考慮,hbase中最終標籤表裡已經有資料了,直接將新的資料寫入,會發生什麼問題? 答:會覆蓋
//考慮,現在已經通過追加的方式解決了覆蓋的問題,如相同的程式多跑幾次會發生什麼問題? 答會重複
//重複問題的解決辦法,在追加資料之後,程序一次去重操作就可以了
val old_Tag: DataFrame = spark.read.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
.option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
.option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
.option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
.option(HBaseMeta.HBASETABLE, "test")
.option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
.load()
// old_Tag.show()
/**
* 還沒有寫入資料,
* +------+------+
* |userId|tagsId|
* +------+------+
* +------+------+
*/
//7.1開始合併資料
if (old_Tag.count()==0){
//證明還沒有資料直接將資料寫入
new_Tag.write.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
.option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
.option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
.option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
.option(HBaseMeta.HBASETABLE, "test")
.option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
.save()
}else{
val append_Tag: UserDefinedFunction = udf((old_T: String, new_T: String) => {
println(old_T,new_T)
if (old_T=="") {
new_T
} else if (new_T=="") {
old_T
} else if (old_T=="" && new_T=="") {
""
}else{
val all_T = old_T + "," + new_T
//進行去重
val all_TAG = all_T.split(",").distinct.mkString(",")
all_TAG
}
})
val old_Append_new: DataFrame = old_Tag.join(new_Tag,old_Tag.col("userId")===new_Tag.col("userId"))
.select(
when(old_Tag.col("userId").isNotNull, old_Tag.col("userId"))
.when(new_Tag.col("userId").isNotNull, new_Tag.col("userId")).as("userId"),
append_Tag(old_Tag.col("tagsId"), new_Tag.col("tagsId")).as("tagsId"))
//8.將最終結果寫入到Hbase中 old_Append_new.write.format("cn.itcast.userprofile.up24.tools.HBaseDataSource")
.option(HBaseMeta.ZKHOSTS, hBaseMetaCaseClass.zkHosts)
.option(HBaseMeta.ZKPORT, hBaseMetaCaseClass.zkPort)
.option(HBaseMeta.FAMILY, hBaseMetaCaseClass.family)
.option(HBaseMeta.HBASETABLE, "test")
.option(HBaseMeta.SELECTFIELDS, "userId,tagsId")
.save()
}
}
def mapToHbaseCaseClass(four_Map_Tag: Map[String, String]) = {
HBaseMeta(
inType = four_Map_Tag.getOrElse(HBaseMeta.INTYPE,""),
zkHosts = four_Map_Tag.getOrElse(HBaseMeta.ZKHOSTS,""),
zkPort = four_Map_Tag.getOrElse(HBaseMeta.ZKPORT,""),
hbaseTable = four_Map_Tag.getOrElse(HBaseMeta.HBASETABLE,""),
family = four_Map_Tag.getOrElse(HBaseMeta.FAMILY,""),
selectFields = four_Map_Tag.getOrElse(HBaseMeta.SELECTFIELDS,""),
rowKey = four_Map_Tag.getOrElse(HBaseMeta.ROWKEY,""))
}
}
開發流程:
本篇部落格主要為大家提供了匹配型性別標籤如何進行開發的一個步驟流程。如有不懂得地方可以私信我,然後幫你講解
如有什麼不對的地方,還請幫忙糾正錯誤!