專案需求需要空間計算能力,開始選型Sedona(GeoSpark)來完成,
需求需要每一條資料在滿足某條件的情況下,去查詢某張表進行空間匹配,找到離這個點(point)最近的一條道路(lineString)
第一個方案: 使用sedona來使用臨近道路的判斷
由於sedona本質還是使用spark的能力,所以遵循spark的開發規則,不能在rdd.map
裡面幹活,sedona也不支援批次查,只能一條一條匹配。 虛擬碼如下
val spatial_sql =
"""
| select
| ST_GeomFromWKT(geom) geom, name, adcode
| from ods.ods_third_party_road_data
|""".stripMargin
val third_party_road_df = spark.sql(spatial_sql).toDF()
aoi_day_s_df.rdd.collect().par.map(row => {
val tmp_location = row.getAs[String]("poi_location")
val near_street = spatialQueryStreet(third_party_road_df, city_code, tmp_location)
println(near_street)
...
)
def spatialQueryStreet(third_party_road_df:DataFrame, city_code:String, location: String): String = {
val frame = third_party_road_df.where("adcode = '%s'".format(city_code)).toDF()
val tp_road_spatial_rdd = Adapter.toSpatialRdd(frame, "geom")
tp_road_spatial_rdd.buildIndex(IndexType.RTREE, false)
val geometryFactory = new GeometryFactory()
val x = location.substring(location.indexOf("(") + 1, location.indexOf(" "))
val y = location.substring(location.indexOf(" ") + 1, location.indexOf(")"))
val pointObject = geometryFactory.createPoint(new Coordinate(x.toDouble, y.toDouble))
val usingIndex = true
val result = KNNQuery.SpatialKnnQuery(tp_road_spatial_rdd, pointObject, 1, usingIndex)
if (result.isEmpty) {
return ""
} else {
val dst = result.get(0)
//System.out.println("==== dst.getUserData: " + dst.getUserData.toString)
val strings = dst.getUserData.toString.split("\t")
val near_street = strings(0)
//System.out.println("==== near_street: " + near_street)
near_street
}
結果效率不高,因為每條資料都要匹配,sedona又不能在rdd.map
中使用,所以必須先collect().map
,這就不能利用到spark多節點並行的特性; 2. 每條資料都基於third_party_road_df
建立了空間索引來查,效率更低了(如果只有一條資料還勉強可以接受)
方案2: 改sedona為JTS來處理,jts直接建立rtree,可以在rdd.map
中處理,而且建立速度也更快一些,效率更高了
虛擬碼如下
poi_build_aoi_aoi_day_s_df.rdd.map(row => {
val tmp_location = row.getAs[String]("poi_location")
val rtree = createRtree(model_list)
near_street = spatialQueryStreet(rtree, tmp_location)
println(near_street)
...
)
def createRtree(third_party_road_list: Array[ThirdPartyModel]): STRtree = {
val rtree = new STRtree()
for (model <- third_party_road_list) {
val geom = model.geometry
geom.setUserData(model.name)
rtree.insert(geom.getEnvelopeInternal, model.geometry)
}
rtree.build()
rtree
}
def spatialQueryStreet(rtree: STRtree, location: String): String = {
if (rtree == null) {
""
}
val geometryFactory = new GeometryFactory()
val x = location.substring(location.indexOf("(") + 1, location.indexOf(" "))
val y = location.substring(location.indexOf(" ") + 1, location.indexOf(")"))
val pointObject = geometryFactory.createPoint(new Coordinate(x.toDouble, y.toDouble))
val result = rtree.nearestNeighbour(pointObject.getEnvelopeInternal, pointObject, new GeometryItemDistance())
val name = result.asInstanceOf[Geometry].getUserData.asInstanceOf[String]
println(s"nearestNeighbour name: $name")
name
}
通過這次修改,由原來跑3個小時(甚至更多)的任務在15分鐘內就跑完了
PS: 經嘗試rtree
不能通過廣播變數傳送出去,會報序列化異常
其實還可以再優化一下,上面每條資料還是建立了一次rtree
, 可以改為mapPartition
,然後只建一次rtree
, 資料量大時效果更佳
aoi_day_s_df.rdd.mapPartitions(iterator => {
// rtree 放到iterator.map 外面建立,搞一次就ok了,更快(不過我沒有試驗,應該是百分百可行的)
val rtree = createRtree(model_list)
val seq = iterator.map(row => {
val tmp_location = row.getAs[String]("poi_location")
near_street = spatialQueryStreet(rtree, tmp_location)
println(near_street)
...
)
seq
)