Spark讀取elasticsearch資料指南

2022-06-08 21:00:51

最近要在 Spark job 中通過 Spark SQL 的方式讀取 Elasticsearch 資料,踩了一些坑,總結於此。

環境說明

  • Spark job 的編寫語言為 Scala,scala-library 的版本為 2.11.8。

  • Spark 相關依賴包的版本為 2.3.2,如 spark-core、spark-sql。

  • Elasticsearch 資料

    schema

    {
      "settings": {
        "number_of_replicas": 1
      },
      "mappings": {
        "label": {
          "properties": {
            "docId": {
              "type": "keyword"
            },
            "labels": {
              "type": "nested",
              "properties": {
                "id": {
                  "type": "long"
                },
                "label": {
                  "type": "keyword"
                }
              }
            },
            "itemId": {
              "type": "long"
            }
          }
        }
      }
    }
    

    sample data

    {
      "took" : 141,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 17370929,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "aen-label-v1",
            "_type" : "label",
            "_id" : "123_ITEM",
            "_score" : 1.0,
            "_source" : {
              "docId" : "123_ITEM",
              "labels" : [
                {
                  "id" : 7378,
                  "label" : "1kg"
                }
              ],
              "itemId" : 123
            }
          },
          {
            "_index" : "aen-label-v1",
            "_type" : "label",
            "_id" : "456_ITEM",
            "_score" : 1.0,
            "_source" : {
              "docId" : "456_ITEM",
              "labels" : [
                {
                  "id" : 7378,
                  "label" : "2kg"
                }
              ],
              "itemId" : 456
            }
          }
        ]
      }
    }
    

準備工作

既然要用 Spark SQL,當然少不了其對應的依賴,

dependencies {
  implementation 'org.apache.spark:spark-core_2.11:2.3.2'
  implementation 'org.apache.spark:spark-sql_2.11:2.3.2'
}

對於 ES 的相關庫,如同 官網 所說,要在 Spark 中存取 ES,需要將 elasticsearch-hadoop 依賴包加入到 Spark job 執行的類路徑中,具體而言就是新增到 Spark job 工程的依賴中,公司的 nexus 中當前最新的版本為 7.15.0,且目前我們是使用 gradle 管理依賴,故新增依賴的程式碼如下,

dependencies {
  implementation 'org.elasticsearch:elasticsearch-hadoop:7.15.0'
}

本地測試

對於 Spark,基於資源管理器的不同,可以在兩種模式下執行:本地模式和叢集模式,可通過 --master 引數來指定資源管理器的方式。本地模式時,不依賴額外的 Spark 叢集,Spark 將在同一臺機器上執行所有內容,非常方便用於本地測試,對於 Spark SQL,只需要在建立 SparkSession 時採用 local 的模式即可,

class MyUtils extends Serializable {
  def esHost() = s"es.sherlockyb.club"
  
  // local mode
  def getLocalSparkSession: SparkSession = SparkSession.builder()
    .master("local")
    .getOrCreate()
  
  // cluster mode
  def getSparkSession: SparkSession = SparkSession.builder()
    .enableHiveSupport()
    .config("spark.sql.broadcastTimeout", "3600")
    .getOrCreate()
}

測試程式碼

object LocalTest extends LazyLogging {
  def main(args: Array[String]): Unit = {
    new LocalTest().run()
  }
}

class LocalTest {
  def run(): Unit = {
    val myUtils = new MyUtils
    val spark = myUtils.getLocalSparkSession
    import spark.implicits._

    var start = System.currentTimeMillis()
    val attributeId = 7378L
    val labelNames = Array("aen-label-retail", "aen-label-seller")
    spark.read
      .format("es")
      .option("es.nodes", myUtils.esHost())
      .option("es.port", "9200")
      .option("es.nodes.wan.only", value = true)
      .option("es.resource", Joiner.on(",").join(java.util.Arrays.asList(labelNames:_*)) + "/label")
      .option("es.scroll.size", 2000)
      .load()
      .createOrReplaceTempView("temp_labels")
    
    val sqlDf = spark.sql("select itemId, labels from temp_labels where itemId in (123, 456)")
    val newDf = sqlDf
      .map(row => {
        val labels = row.getAs[Seq[Row]]("labels")
        val labelValue = labels.find(p => p.getAs[Long]("id") == attributeId).map(p => p.getAs[String]("label"))

        (row.getAs[Long]("itemId"), attributeId, labelValue.orNull)
      })
      .withColumn("final_result", lit("PASS"))
      .toDF("itemId", "attributeId", "label", "final_result")

    val finalDf = newDf.toDF("itemId", "attributeId", "label", "result")
    finalDf.printSchema()
    finalDf.show()
    
    var emptyDf = newDf
      .filter(col("label").isNotNull)
      .toDF("itemId", "attributeId", "label", "result")
    emptyDf = emptyDf.union(finalDf)
    emptyDf.printSchema()
    emptyDf.show()

    emptyDf.filter(col("itemId") === 6238081929L and col("label").notEqual(col("result")))
      .show()

    val attributeTypeIds = Array.fill(3)(100)
    val attributeTypeIdsStr = Joiner.on(",").join(java.util.Arrays.asList(attributeTypeIds:_*))
    println(attributeTypeIdsStr)


    import scala.collection.JavaConverters._
    emptyDf = emptyDf.filter(!col("itemId").isin(trainItemIds.asScala.map(Long2long).toList:_*))
    emptyDf.show(false)
  }
}

知識點

Spark SQL Data Sources

Spark SQL 通過 DataFrameReader 類支援讀取各種型別的資料來源,比如 Parquet、ORC、JSON、CSV 等格式的檔案,Hive table,以及其他 database。而 Elasticsearch 只不過是眾多資料來源中的一種,DataFrameReader 通過 format(...) 指定資料來源格式,通過 option(...) 客製化對應資料來源下的設定,最後通過 load() 載入生成 DataFrame,也就是 Dataset[Row] 的型別別名。有了 DataFrame,就可以建立一個臨時表,然後就能以 SQL 的方式讀取資料。

在 Spark 1.5 以前,Elasticsearch 在 format(...) 中對應的 source 名需要是全包名 org.elasticsearch.spark.sql,而在 Spark 1.5 以及之後的版本,source 名稱簡化為 es

Spark SQL 中 DataFrame 常用 API

  • df.printSchema(),列印 schema
  • df.show(),檢視資料列表,預設是 truncate 前 20 條,傳 false 時列出全部資料。
  • df.createOrReplaceTempView("view_name"),構建臨時表檢視,方便後續 SQL 操作。
  • df.withColumn(),新增新列或替換現有列。
    • df.withColumn("final_result", lit("PASS")) ,通過 lit 新增常數列。
  • df.filter(col("label").isNotNull),用指定的條件過濾行。
  • df.dropDuplicates("itemId","attributeId"),按指定列對行去重,返回新的資料集。
  • df.union(otherDf),將兩個 DataFrame 的記錄合併且不去重,相當於 union all。
  • df.toDF("itemId", "attributeId", "label", "final_result"),為 df 各列指定一個有意義的名稱。

Scala 與 Java 型別對映

  • scala.Long -> long
  • Array[T] -> T[]

Scala 與 Java 型別轉換

import scala.collection.JavaConverters._
newDf = df.filter(!col("itemId").isin(trainItemIds.asScala.map(Long2long).toList:_*))

Scala 中的 : _*

:_*type ascription 的一個特例,它會告訴編譯器將序列型別的單個引數視為變引數序列,即 varargs。應用例子,

val indices = Array("aen-label", "aen-label-seller")
Joiner.on(",").join(java.util.Arrays.asList(indices:_*))

踩的坑

es.nodes.wan.only

該設定項表示聯結器是否用於 WAN 上的雲或受限環境如 AWS 中的 Elasticsearch 範例,預設為 false,而公司的 Elasticsearch 叢集是在 AWS 上的,endpoint 只能在內網存取,因而剛開始測試時,遇到如下報錯,

Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available
	at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:159)
	at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:223)
	at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:73)
	at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:72)
	at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:44)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:340)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:723)

通過 option("es.nodes.wan.only", value = true) 將設定項設定為 true 後恢復正常。

importing spark.implicits._

在遍歷 DataFrame 時遇到如下編譯錯誤,

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._

在處理 DataFrame 之前需要加上 importing spark.implicits._,用於將常見的 Scala 物件轉換為 DataFrame,通常在獲取 SparkSession 後立馬 import。

Spark SQL 讀取 hive 表中 array 型別時,對於 Scala 語言,得到的型別是 WrappedArray 而不是 Array

當我們通過 createOrReplaceTempView("temp_labels") 構建一個臨時表檢視後,就可以通過 SQL 像操作 hive 表那樣讀取資料。例如讀取指定的列,

val sqlDf = spark.sql("select itemId, labels from temp_labels where itemId in (123, 456)")

通過 sqlDf.printSchema() 可以看到 sqlDf 的 schema 長這樣,

root
 |-- itemId: long (nullable = true)
 |-- labels: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- label: string (nullable = true)

labels 是包含 struct 的陣列,於是從 row 中將 labels 列讀出時想嘗試轉換為 Array,

val newDf = sqlDf.map(
  row => {
    val labels = row.getAs[Array[Row]]("labels")
    val labelValue = labels.find(p => p.getAs[Long]("id") == attributeId).map(p => p.getAs[String]("label"))

    (row.getAs[Long]("itemId"), attributeId, labelValue.orNull)
  }
)

結果報錯如下,

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lorg.apache.spark.sql.Row;

可以看到 Spark SQL 在讀取表中陣列列時,是用的 scala.collection.mutable.WrappedArray 來儲存結果的,看其類定義可知,它是間接實現 Seq 介面的,所以也可用 row.getAs[Seq[Row]]("labels") 來讀取。這裡需要注意的是,Array[T] 雖然在 Scala 原始碼定義中是 class,但其對標的 Java 型別是原生陣列 T[]

判斷 Column 是否為 null 時,需要用 is nullis not null,而不是 === !==

對於錯誤的用法,filter 並不會生效,就像下面這樣

newDf.filter(col("label") !== null)

這一點和 hive 表以及 MySQL 表判斷欄位是否為 null,是保持一致的,應該像下面這樣,

newDf.filter(col("label").isNotNull)

最終程式碼

import com.google.common.base.Joiner
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

object TestMain extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val myUtils = new MyUtils
    new TestApp(myUtils).run()
  }
}

class TestApp(myUtils: MyUtils) extends Serializable with LazyLogging {  
  def esDf(spark: SparkSession, indices: Array[String]): DataFrame = {
    spark.read
      .format("es")
      .option("es.nodes", myUtils.esHost())
      .option("es.port", "9200")
      .option("es.nodes.wan.only", value = true)
      .option("es.resource", Joiner.on(",").join(java.util.Arrays.asList(indices:_*)) + "/label")
      .option("es.scroll.size", 2000)
      .load()
  }
  
  def run(): Unit = {
    val spark = myUtils.getSparkSession
    import spark.implicits._
    
    val esTempView = "es_label"
    val labelNames = Array("aen-label-retail", "aen-label-seller")
    esDf(spark, labelNames).createOrReplaceTempView(esTempView)
    
    val labelDf = getLabelDf(spark, itemIdsStr, attributeTypeIds, esTempView)
    println("debug log")
    labelDf.printSchema()
    labelDf.show()
    labelDf.createOrReplaceTempView("final_labels")
    
    val data = spark.sql(
      s"""
      |select cc.*, pp.final_result, pp.label, null as remark
      |from temp_request cc
      |left join final_labels pp
      |on cc.itemid = pp.itemId
      |and cc.attributetypeid = pp.attributeId
      |where cc.profile = '$jobId'
      |""".stripMargin)

    data.distinct().write.mode(SaveMode.Overwrite)
    .option("compression", "gzip")
    .json(s"s3://sherlockyb-test/check-precision/job_id=$jobId")
  }
  
  def getLabelDf(spark: SparkSession, itemIdsStr: String, attributeTypeIds: Array[String], esTempView: String): DataFrame = {
    import spark.implicits._

    val sqlDf = spark.sql(s"select itemId, labels from $esTempView where itemId in ($itemIdsStr)")
    val emptyDf = spark.emptyDataFrame
    var labelDf = emptyDf
    attributeTypeIds.foreach(attributeTypeId => {
      val attributeDf = sqlDf
        .map(row => {
          val labels = row.getAs[Seq[Row]]("labels")
          val labelValue = labels.find(p => p.getAs[Long]("id") == attributeTypeId.toLong).map(p => p.getAs[String]("label"))

          (row.getAs[Long]("itemId"), attributeTypeId.toLong, labelValue.orNull)
        })
        .withColumn("final_result", lit("PASS"))
        .toDF("itemId", "attributeId", "label", "final_result")
        .filter(col("label").isNotNull)
      if (labelDf == emptyDf) {
        labelDf = attributeDf
      } else {
        labelDf = labelDf.union(attributeDf)
      }
    })

    labelDf.dropDuplicates("itemId","attributeId")
  }
}

補充:提交 spark job

將 job 工程打包為 Jar,上傳到 AWS 的 s3,比如 s3://sherlockyb-test/1.0.0/artifacts/spark/ 目錄下,然後通過 Genie 提交 spark job 到 Spark 叢集執行。Genie 是 Netflix 研發的聯合作業執行引擎,提供 REST-full API 來執行各種巨量資料作業,如 Hadoop、Pig、Hive、Spark、Presto、Sqoop 等。

def run_spark(job_name, spark_jar_name, spark_class_name, arg_str, spark_param=''):
    import pygenie

    pygenie.conf.DEFAULT_GENIE_URL = "genie.sherlockyb.club"

    job = pygenie.jobs.GenieJob() \
        .genie_username('sherlockyb') \
        .job_name(job_name) \
        .job_version('0.0.1') \
        .metadata(teamId='team_account') \
        .metadata(teamCredential='team_password')

    job.cluster_tags(['type:yarn-kerberos', 'sched:default'])
    job.command_tags(['type:spark-submit-kerberos', 'ver:2.3.2'])
    job.command_arguments(
        f"--class {spark_class_name} {spark_param} "
        f"s3a://sherlockyb-test/1.0.0/artifacts/spark/{spark_jar_name} "
        f"{arg_str}"
    )

    # Submit the job to Genie
    running_job = job.execute()
    running_job.wait()
    
    return running_job.status

首發連結: https://www.yangbing.club/2022/06/03/Spark-reading-elasticsearch-guide/
許可協定: 除特殊宣告外,本博文均採用 CC BY-NC-SA 3.0 CN 許可協定,轉載請註明出處!