2018年5月3日 星期四

加速 Spark 寫到雲端儲存空間(AWS S3/ Azure blob/ Google GCS )的速度



不知道大家有沒有遇到過明明 Spark 程式都結束了,檔案也寫完了,Driver program 確還不會停止,感覺就當在那邊的經驗?

很多技術細節沒有遇到還真得不知道會有這種設計和改善的方法,會發現這個密技是因為下面幾個條件同時成立才注意到的:

1. 使用雲端儲存空間


為了節省成本,我們並沒有架設自己的HDFS Server,取而代之的都是把要分析的資料和結果儲存在雲端儲存空間( AWS S3/ Azure blob/ Google GCS)。這個選擇會大大降低維運成本和提高檔案的保存可靠度,不過缺點就是失去data locality 的速度優勢,而且每次分析都從雲端拉檔案下來也會花不少時間,所以就是以時間效能換取成本和可靠度。



2. Partition 數量切太大


partition 的數量跟平行化的數量是有一定的相關性,但是對於檔案應該要切幾份這個問題在網路上都沒有一個明確的答案,有的是以cpu core 的倍數去判斷,有的是已切割完後檔案大小為考量,後來找到一篇文章 Understanding Spark Partitioning,他的判斷標準感覺還蠻有道理的,不過就是要從UI觀察:

So there is trade off between number of partitions. Below is recommended guideline –      Usually between 100 and 10K partitions depending upon cluster size and data.  
  • Lower bound – 2 X number of cores in cluster available to application     
  • Upper bound – task should take 100+ ms time to execute.If it is taking less time than your partitioned data is too small and your application might be spending more time in scheduling the tasks.
所以為了比較到底partition 對於效能的影響,我就開始做實驗把partition 數量直接從 2048 / 1024 / 512 一路往下降,並且透過UI來觀察。

當partition數量越多,檔案就會被切分越多份,其實運算起來真得會快很多,但是大到一個數量就不明顯了,而且隨著檔案數量被切分越多,副作用就開始產生了,也就是寫檔案回雲端儲存空間就變的沒有效率。


3. 網路速度變慢


其實壓死駱駝的最後一根稻草是網路速度,某一天機房網路連到GCS的速度明顯變慢,明明所有運算都已經完成了,就連存檔階段(把csv 檔存到GCS)也結束了,但是不知道為什麼spark 工作一直沒有結束,感覺就當機卡在那邊,從下圖就可以看到當天的狀況:


可以看到寫檔花了很多時間,但是寫檔完程式卻沒有結束,仍然不知道在等啥?
透過gsutil 我發現到一個奇妙的現象,雖然寫檔結束了,但是檔案才開始默默的增加?

⮀ gsutil ls -lh "gs://spark-data/output/Word2Vec/daily/" |wc -l
      90
⮀ gsutil ls -lh "gs://spark-data/output/Word2Vec/daily/" |wc -l
      91
⮀ gsutil ls -lh "gs://spark-data/output/Word2Vec/daily/" |wc -l
      92
⮀ gsutil ls -lh "gs://spark-data/output/Word2Vec/daily/" |wc -l
      94
⮀ gsutil ls -lh "gs://spark-data/output/Word2Vec/daily/" |wc -l
     153

因為我的資料 partition 成 1024 份,那代表要等它搬完1024份,想到就昏倒,再進一步觀察才發現 Spark 會在目標目錄下產生一個 _temporary的目錄,先把檔案存在那邊,等到儲存完畢,再把它搬出來....囧 於是當partition 切太大,然後網路速度又慢的狀態下,最後那段卡住的時間就是花在 driver program 搬檔案的時間。

解決方案:

這時候有什麼可能的解決方案呢?
  1. 選擇離台灣機房比較近的雲端空間 (阿都選google 台灣了...)
  2. 降低 partition 的數量(實用,但是仍需要做實驗選擇數量)
  3. 改變spark 那種寫檔案到tmp 的怪異現象

結果還真得有3這種設定 - A Spark 2.0.0 Cluster Takes a Longer Time to Append Data

If you find that a cluster using Spark 2.0.0 version takes a longer time to append data to an existing dataset and in particular, all of Spark jobs have finished, but your command has not finished, it is because driver node is moving the output files of tasks from the job temporary directory to the final destination one-by-one, which is slow with cloud storage. To resolve this issue, set mapreduce.fileoutputcommitter.algorithm.version to 2. Note that this issue does not affect overwriting a dataset or writing data to a new location.

所以之後只要再寫檔案前,增加下面這個option就可以加快寫檔的速度:

write.option("mapreduce.fileoutputcommitter.algorithm.version", "2")


不過根據官網使用這個功能伴隨著資料丟失的風險:

DirectParquetOutputCommitter is removed from Spark 2.0 due to the chance of data loss. Unfortunately until we have improved consistency from S3a we have to work with the workarounds. Things are improving with Hadoop 2.8


其它相關文章:
[1] Apache Spark Performance Tuning – Degree of Parallelism
[2] Apache Spark - Level of Parallelism
[3] SPARK-10063 - Remove DirectParquetOutputCommitter
[4] Apache Spark and Amazon S3 — Gotchas and best practices




張貼留言