Thursday, May 10, 2018

Spark Tips Series - What to do when reading an oversized file throws java.lang.NegativeArraySizeException?


We all know that a single file should be neither too large nor too small, but sometimes you simply have no choice. When a single file is too large, the system may spit out the following error:


[WARN] BlockManager   : Putting block rdd_12_0 failed due to exception java.lang.NegativeArraySizeException.
[WARN] BlockManager   : Block rdd_12_0 could not be removed as it was not found on disk or in memory
[ERROR] Executor          : Exception in task 0.0 in stage 3.0 (TID 259)
java.lang.NegativeArraySizeException
        at org.apache.spark.unsafe.types.UTF8String.concatWs(UTF8String.java:901)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:234)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:86)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:139)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:378)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1109)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748) 
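
The NegativeArraySizeException here means a single task tried to build an in-memory buffer larger than a Java array can hold (roughly 2 GB). A minimal workaround sketch, assuming the oversized file ends up in very few partitions (for example a single non-splittable gzip file): spread the rows over more partitions right after reading, before caching or aggregating. The path and partition count below are hypothetical, purely for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// hypothetical input: one huge file that would otherwise land in a single partition
Dataset<Row> raw = spark.read().text("s3a://my-bucket/huge-input.log");

// spread the rows over more partitions so no single task has to buffer
// an array past the JVM's ~2 GB limit while caching or aggregating
Dataset<Row> resized = raw.repartition(200);

resized.cache();
resized.count();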

Wednesday, May 9, 2018

Spark Tips Series - Replacing NULL with 0 after a left join


When you use Spark's left outer join, rows with no matching data usually show up as NULL, as in the figure below:


Now, what happens if I try to compute CTR = click / impression? Spark just throws an error at you, and it's not at all obvious what went wrong...
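
Reconstructed from the Project expression and the Dataset.withColumn frame in the error below, the attempted computation presumably looked roughly like this (the DataFrame name joined is hypothetical and stands for the left-join result):

// CTR = click / impression; this is the call that triggers the AnalysisException below
Dataset<Row> withCtr = joined.withColumn("CTR", col("clickCount").divide(col("impressionCount")));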

org.apache.spark.sql.AnalysisException: Resolved attribute(s) 'impressionCount,'clickCount missing from viewCount#965L,(impressionCount / total)#582,dsType#1189,rid#14,impressionCount#534L,recommendCount#1198L,clickCount#1441L,siteId#16 in operator 'Project [siteId#16, rid#14, impressionCount#534L, (impressionCount / total)#582, viewCount#965L, dsType#1189, recommendCount#1198L, clickCount#1441L, ('clickCount / 'impressionCount) AS CTR#2022]. Attribute(s) with the same name appear in the operation: impressionCount,clickCount. Please check if the right attribute(s) are used.;;
'Project [siteId#16, rid#14, impressionCount#534L, (impressionCount / total)#582, viewCount#965L, dsType#1189, recommendCount#1198L, clickCount#1441L, ('clickCount / 'impressionCount) AS CTR#2022]
+- AnalysisBarrier
      +- LogicalRDD [siteId#16, rid#14, impressionCount#534L, (impressionCount / total)#582, viewCount#965L, dsType#1189, recommendCount#1198L, clickCount#1441L], false


    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:289)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3295)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:1307)
    at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2192)
    at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2159)


The actual cause is the NULL values; at this point you just need to fill them with zero using the trick below.

// needs: import static org.apache.spark.sql.functions.broadcast;
// left outer join, then replace NULLs in "viewCount" with 0
Dataset<Row> join1 = impression.join(broadcast(view), col, "left_outer")
                               .na()
                               .fill(0, new String[] {"viewCount"});

This replaces the NULL values with whatever value you choose, and the problem is solved.
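
With the NULLs filled in, the same withColumn("CTR", ...) call sketched above goes through, assuming the joined result carries the clickCount and impressionCount columns. Note also that Spark SQL returns NULL for division by zero, so rows with zero impressions end up with a NULL CTR rather than another error.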


Thursday, May 3, 2018

Speeding up Spark writes to cloud storage (AWS S3 / Azure Blob / Google GCS)



Have you ever seen a Spark job where the program has finished and the files are all written, yet the driver program still won't exit, as if it were just hanging there?

There are many technical details you really wouldn't know about, including this design and how to improve on it, until you run into them yourself. I only noticed this trick because the following conditions all held at the same time:

1. Using cloud storage


To save costs, we did not set up our own HDFS server; instead, both the data to be analyzed and the results are stored in cloud storage (AWS S3 / Azure Blob / Google GCS). This choice greatly reduces operational cost and improves file durability, but the downside is losing the speed advantage of data locality, and pulling the files down from the cloud for every analysis also takes quite a while. In short, we trade time and performance for cost and reliability.