導讀 本文將分享 Apache Kyuubi 在愛奇藝的一些實踐和落地。
文章將圍繞下面三點展開:
(資料圖片)
1. 愛奇藝 Spark Thrift Server 服務演進
2. Spark SQL 平臺化適配
3. Spark SQL 服務優化
分享嘉賓|王震 愛奇藝 高級研發工程師
編輯整理|馬慧 匯豐達
出品社區|DataFun
01
愛奇藝 Spark Thrift Server 服務演進
1. 愛奇藝統一 SQL 網關 Pilot 服務架構
首先分享愛奇藝的統一 SQL 網關 Pilot 服務,Pilot 作為統一的 SQL 服務,上層是通過 JDBC 的協議對接了愛奇藝的數據開發平臺、即席查詢的平臺以及業務的取數服務。服務端通過 SQL 解析的模塊對 SQL 進行分析,對異常 SQL 進行攔截,并且智能的選擇合適的執行策略和路由策略。結合服務發現提供高可用。在配置中心可以配置標簽化的配置,并基于運行歷史提供統計和審計視圖。底層支持了 Trino、Spark、Hive 還有 Clickhouse 等引擎。其中我們使用 Kyuubi 來作為 Spark Thrift Server 服務。
2. 愛奇藝 Spark Thrift Server 服務演進
在愛奇藝,Spark Thrift Server 服務經歷了三個階段:
(1) 最開始 我們使用的是 Spark 原生的 Thrift Server 服務,支持簡單的 Ad-hoc 查詢。
(2) 后來 我們引入了 Kyuubi 的版本,支持 Ad-hoc 查詢以及少量的 ETL 任務。
(3)Kyuubi 在 的版本中做了一些架構優化,我們是在 Apache Kyuubi 進入到 Apache 之后,也升級到了 的版本,支持了 Ad-hoc 查詢,慢慢有大量的 ETL 任務。并且使用 Spark SQL 代替了 Hive,成為主要的離線處理引擎,同時也支持了離線數據湖的分析操作。
下面通過對比這 3 個服務,分享一下 Spark Thrift Server 服務的演進過程。
第一階段:Spark 原生的 Thrift Server 服務的架構
這是 Spark 原生的 Thrift Server 服務的架構,通過 啟動常駐的 Spark 服務 ,在 Driver 端去暴露 Thrift Server 的服務接口來接收并執行 SQL 請求,是 基于一個 SparkContext 的多線程的應用場景, 它的服務特性: 基于 HiveServer2 協議,可以很好地兼容 Hive 相關的生態。因為是一個常駐的服務,可以有效減少 Spark 啟動的開銷, 可以快速響應請求。
不過由于使用的是同一個 SparkContext,所以 不支持多租戶,資源不隔離,多個任務之間會相互影響,搶占資源之類的。 并且服務啟動后配置是固定的,在 啟動后資源的配置都不可以修改。
第二階段:引入 Kyuubi 的 的版本
后面我們引入了 Kyuubi 的 的版本,對 Thrift Server 進行了增強, 允許不同的連接使用不同的 SparkContext ,同時也支持 同一個用戶級別的 SparkContext 共享。
也是 基于 HiveServer2 協議的,支持了多租戶。 同時在服務啟動后可以 動態傳入一些資源配置, 調整 SparkContext 的資源,并且 不同的鏈接可以使用不同 SparkContext 進行 Executor 層面的資源隔離。
不過由于它是 Yarn-Client 模式運行 的,所有 SparkContext 其實是共享在一個 Spark Driver 的進程之內的, 所以對 Driver 端的壓力還是挺大的,也容易達到性能的瓶頸。
第三階段:升級 Kyuubi 的版本
Kyuubi 的版本對架構進行了優化,主要是 對 Server 端和引擎端進行了解耦操作。
Server 端主要是做引擎的啟動以及 SQL 轉發,同時支持引擎的共享策略, 比如支持用戶級別共享,允許同一個用戶的不同連接使用同一個 Kyuubi Engine,從而實現上下文共享,并且節省資源。
服務特點: 低耦合,Kyuubi 和 Engine 進行解耦 不同的任務使用不同的引擎,這樣就可以達到完全的資源隔離。
同時支持 各種共享策略:User 級別、Group 級別、還有 Server 級別的。使用共享引擎主要能夠節省資源,避免反復啟停引擎,可以快速響應請求。
三個服務的一個簡單的對比
可以看到 Kyuubi 版本中支持多租戶、有很好的并發支持。
資源隔離性方面: Spark Thrift Server 主要依賴于 Spark 原生的 Pools。Kyuubi 共用同一個 Driver 維護多個 SparkContext。Kyuubi 支持獨立和復用引擎。
資源的使用方面: Spark Thrift Server 是常駐的 SparkContext 資源。Kyuubi 版本是可以使用獨立的 SparkContext,也可以復用。Kyuubi 版本也可以支持獨立的引擎以及各種共享引擎,它的 獨立引擎是在執行完成之后釋放資源的,共享引擎是在超時之后釋放。
02
Spark SQL 平臺化適配
接下來分享 Spark SQL 平臺化適配,將從四個方面進行介紹:
① 標簽化配置;
② SQL 事件審計;
③ Spark SQL血緣采集;
④ 服務監控。
1. 標簽化配置
我們在使用中發現有很多任務有共同的特性。
① ETL 任務: 它的任務特點是運行時間比較長,數據量比較大,并且因為運行長需要穩定運行。
② 即席查詢的任務: 快速響應、數據量比較小,同時穩定性要求沒有那么高。
③ 對于同一個用戶和業務內部: 又有一些相似性,數據量級比較接近、用戶開發習慣比較接近。
所以我們 對這些任務進行一些標簽化的配置。 比如離線 ETL,我們配置一個 Connection 級別的獨立引擎,即席查詢使用 User 級別的引擎。不同的用戶和業務根據不同的需求配置一些資源,另外用戶開發習慣不同,可能有一些 Hive 兼容性的配置等。
用戶任務在請求的時候只需要帶上一些標簽,比如帶上一個 ETL 任務,這樣就會使用獨立的引擎,或者是一個即席查詢任務,就會用 User 引擎級別。
Kyuubi 中是提供了一個 SessionConfAdvisor 接口,允許用戶對 Session 注入一些配置,進而可以用來實現上述的標簽化配置的功能。
2. SQL 事件審計
SQL 事件審計是基于 Kyuubi 的 Event 體系進行實現的。
Kyuubi 在 Server 和 Engine 中都暴露了很多事件: 在 Server 中暴露的 Server 的啟停事件、Session 的打開\關閉事件,Operator 的執行事件。Engine 中也記錄了 Session、Operation、還有血緣等事件。
Kyuubi 中通過 Event Handler 對這些事件進行處理。在 Server 端,我們實現了一個 ES 的 Event Handler, 將 Server 的事件直接吐到 ES 中。
引擎端是用 Kyuubi 的 SparkHistory Logging EventHandler, 將 Event 記錄到了 Spark History 的 HDFS 文件 里面。
這樣我們就可以對 SQL 執行事件進行分析統計,比如可以統計出來 SQL 執行的總量、失敗量,還有 SQL 的資源使用情況。 SQL 的執行事件里面還包括了一些執行的錯誤信息,可以使用正則匹配規則建立 錯誤信息的規則庫, 來進行錯誤提示實現 輔助運維。
我們還可以基于 SQL 的一些運行歷史,對任務進行 HBO 的優化,推斷出更合適的配置。
同時 Spark UI 上面其實 Kyuubi 也通過吐出的事件渲染出了 Kyuubi 的 Tab 頁,方便了 Kyuubi 任務的分析和排障。
Kyuubi 中是可以實現自定義的 Event Handler 對事件進行處理。
3. Spark 血緣采集
這個是血緣采集的過程:
先通過 Spark Catalyst 對 SQL 解析成執行計劃。
用 SparkAtlas Connector 的插件, 通過 QueryExecutionListener 攔截到執行計劃,再把 它解析出血緣信息,再通過 Atlas 的客戶端給它發送到 Atlas 里面。
Kyuubi 中也帶了 Spark 的血緣插件,也支持列級別的血緣, 大家有興趣也可以去了解一下。
4. 服務監控
Kyuubi 服務也暴露了很多的監控指標, 服務端暴露了:GC、Memory、 Connection、Operation 等監控的指標,內部會有很多 Reporter 發送到不同的服務里面。
我們 通過自定義的 Reporter Service,把這些指標直接吐到內部的監控平臺上,再配置一些告警和視圖。
同時也開發了撥測任務,使用 JDBC 連接提交一些測試的任務到各個 Kyuubi Server 上執行,判斷任務能否正常執行。另外,定時調用 Kyuubi Server 的 Rest API 進行健康檢查。
03
Spark SQL 服務優化
1. 小文件優化
下面分享一下 Spark SQL 的服務優化,先看一下 Spark SQL 的小文件優化。
為什么產生小文件?
左邊是讀一個表,直接寫入到另外一個表里面的操作,右邊是兩個表 Join 寫入到另外一個表中。看一下左邊,假如它讀取的分區數是 100 個,寫入也是 100 個分區,最終將會寫入 100 個文件。
右邊假設兩個表的并行度也是 100,經過 Join 之后會產生 Shuffle 操作變成 200 個分區,可能就寫入 200 個文件。 所以最終寫入的文件數是跟最終寫入 Stage 的 Task 數是相關的。
如何解決小文件?
我們怎么解決小文件? 需要在寫入操作之前進行一個 Repartition 的操作,來控制寫入的 Stage 的分區數,進而控制寫入的小文件數。
左邊示例,我們插入了一個 Repartition 10 的算子,最終寫入的 Stage 的 Partition 數就是 10,最終可能也就生成 10 個文件。
右邊示例也是類似的一個操作。
Repartition 的兩個問題
① 第一個:Repartition 這個數我們怎么確定?
② 第二個:就是動態分區寫入的一個情況下,假設某一些分區的數據量比較大,又比較容易導致分區傾斜的情況。
Repartition 數量如何確定?AQE 動態合并小分區
下面看一下具體怎么解決這兩個問題。第一個 Repartition 數量如何確定?
首先我們可以把 Repartition 數設置為一個較大的值,并且因此觸發了 Shuffle 操作。再借助于 Spark 3 的 AQE 動態合并小分區的功能,自動的根據配置的大小將小分區進行合并,分區數控制在合理的范圍。這樣就不需要特意的去配置 Repartition 的數量。
動態分區寫入導致超大文件-添加隨機數
動態分區如果有 一些分區的數量比較大,進行 Repartition 操作之后,會導致數據傾斜。
對于這種情況,我們通過添加隨機數,將分區進行拆分。示例中我們加入了值為 4 的隨機數,相當于把每個分區拆成了 4 份,再對分區以及隨機數字段進行 Repartition 操作。這樣可以看到分區傾斜有一個很好的效果。
不過這樣也會存在一個問題, 如果隨機數控制的不合理,它又容易再次導致小文件的問題。
Rebalance 平衡分區( + )
在 Spark 里面,引入了 Rebalance 的特性,可以自動平衡分區, 分區過大的時候會進行拆分,分區過小的時候進行合并,這樣就可以有效地控制小文件和數據傾斜。
圖中上面兩個大分區,在進行 Rebalance 之后,會自動地分成兩個分區,小分區又會自動合并成大分區,這樣整體的效果就比較平衡。
右邊是經過 Rebalance 的寫入的執行計劃,上面是合并了小分區,下面是拆了一下數據傾斜的分區,寫入都是控制在了比較合適的范圍,基本上解決了小文件的問題。
Kyuubi 的相關特性是在 Kyuubi Extension 的插件里面,使用的時候只需要把這個插件放到 Spark_HOME 的 Jars 里面,同時配上 KyuubiSparkSQLExtension ,然后再配置一下 AQE 的一些配置,基本上就可以自動地優化小文件。
2. Z-order 優化簡介
下面分享 Kyuubi 的另一個比較優秀的特性,就是 Z-order 優化。
簡單介紹一下 Z-order 優化, Z-order 排序實際上是將多維數據映射到一維上的排序算法。
這個圖片是來自 Databricks 的技術博客,例子可以看到,假設這有一個二維數據 X、Y,左邊是一個 線性排序 :X = 0,Y = 1、2、3、4 進行排序,并且 4 個數據點存放在一個存儲單元里面。Parquet 這一類的文件格式在查詢的時候會根據文件的元信息,做 Data Skipping 操作,比如文件塊里面的最大最小值來做過濾。左邊我們要執行上面 SQL,過濾 X = 2 或者 Y = 2 的數據,第一個數據塊 X = 0,Y 的最大最小值就是 0 ~ 3 之間,所以它會被選中,會被掃描,第二個文件塊,用 X = 0,Y 的最小值是 3,會直接跳過。如此以來就會掃描到 9 個文件塊,并且查詢了 21 個無效數據點。
右邊這個是 Z-order 的排序,可以看到它就類似一個 z 型曲線排序。同樣也是 4 個點放在一個文件塊里面。第一個文件塊,如果我們也執行上面查詢 X = 2 或者 Y = 2,第一個文件塊 X、Y 最大最小值就是 0 ~ 1 之間,所以會被過濾掉。第二個文件塊 X,Y 的最大最小值是 2~3 之間,所以它就會被選中,第三個會被跳過。如此以來它會掃描到 7 個文件塊,13 個無效的數據點, 跟左邊線性對比可以看到它的掃描的文件塊數其實是減小了,并且無效的數據會少很多。
這樣 經過 Z-order 排序之后,會提高加速查詢的速度。 并且由于它多維映射到一維上, 相當于多維數據上更加臨近了,相似性更高一些,所以它的壓縮效果會提高很多。
這邊是 Z-order 的一個實現,最關鍵是將多維數據計算出 Z-value 值,再通過 Z-value 值進行排序。
這是維基百科上面的兩張圖介紹。通過一個二進制位交叉的算法,計算 Z-value 值,比如 X、Y 都變成了二進制的數,進行位交錯計算出來 Z-value 的值,通過這個值進行排序。右邊就是經過算法之后的排序效果。可以看到第一個點 000000,第二個點000001,第三個點再到 000010,再到 000011。是一個 Z 型的曲線。
這個特性在 Kyuubi 中也是放在 KyuubiExtensionSpark 的插件里面,我們只需要把它復制到 Spark Jars 里面,并且在 SparkConf 里面配置 Kyuubi Spark SQL Extension。
對存量的數據只需執行一下 Optimize 的命令, 就可以對存量數據進行重排序然后再寫入的操作。
對于增量數據,我們可以在表里面加上 、 兩個Properties, 在增量寫入的時候,它就會自動進行 Z-order 的優化寫入。
04
問答環節
Q1:Kyuubi 怎么實現持久化的 Spark Context?
A1:Kyuubi 中會啟動一個獨立的引擎,并且支持一些共享策略。比如像 User 級別共享引擎,很多人使用同一個 Hadoop 用戶進行提交,將會使用同一個引擎,引擎具有一個超時時間,在空閑一段時間后才會退出。比如你提交一個任務,執行完之后連接會斷掉,但是引擎是沒有退出的,還常駐在那里,其他的人一提交就會馬上連到引擎上面。
Q2:什么叫撥測?
A2:撥測就是對服務的健康檢測。比如每 5 分鐘提交一個 JDBC 請求,保證服務可用。每 1 分鐘去調一下它的 Rest API,確定服務是存活的,連續失敗了幾次就可以確定服務不可用,自動地對它進行一個重啟。
今天的分享就到這里,謝謝大家。
關鍵詞: