Monday, May 21, 2018

阿里鐵軍 - Assessing Corporate Values



Chapter 2 of 阿里鐵軍 covers the company's vision, training, and performance reviews. Let's first look at how Alibaba's values evolved, starting from the original "Dugu Nine Swords" (獨孤九劍):
  • Innovation
  • Passion
  • Openness
  • Teaching and learning from each other
  • Pulling together as a team
  • Quality
  • Focus
  • Service and respect
  • Simplicity

which were later condensed into the "Six Vein Spirit Sword" (六脈神劍):
  • Customer first
  • Teamwork
  • Embrace change
  • Integrity
  • Passion
  • Dedication

At first glance these look no different from any other company's values: aren't they just slogans? Most companies shout their values and leave it at that; at best they make sure every employee can recite them. But what does it take to actually live by them? Alibaba's answer is to build them into the review system: values and business results each count for 50% of an employee's evaluation. What made me most curious is the values half. How do you actually grade whether an employee lives up to these values? An essay-writing contest? XD

Curious, I looked it up online. Each value comes with concrete behavioral criteria and levels, and to claim a level you have to back it up with specific examples (in both the self-review and the manager's review). Of course every policy invites workarounds and any KPI can be gamed, but for most people this makes the values concrete and actionable, rather than some vague exercise where you give yourself whatever score feels right, or where your manager's gut feeling becomes your score.


The hiring principles are also summarized below (they really do love coining names and slogans XD).




I also found the slide deck below, which is quite well done.






Thursday, May 10, 2018

Spark Tips Series - How to handle java.lang.NegativeArraySizeException when reading an oversized file?


We all know that a single file should be neither too big nor too small, but sometimes you just don't get to choose. When a single file is too large, Spark may spit out an error like the following (one possible workaround is sketched after the stack trace):


[WARN] BlockManager   : Putting block rdd_12_0 failed due to exception java.lang.NegativeArraySizeException.
[WARN] BlockManager   : Block rdd_12_0 could not be removed as it was not found on disk or in memory
[ERROR] Executor          : Exception in task 0.0 in stage 3.0 (TID 259)
java.lang.NegativeArraySizeException
        at org.apache.spark.unsafe.types.UTF8String.concatWs(UTF8String.java:901)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:234)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:86)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:139)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:378)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1109)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748) 
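
One common way to work around this, assuming the trigger really is a single oversized input file, is to make Spark cut the input into smaller partitions: lower spark.sql.files.maxPartitionBytes for splittable formats, or repartition explicitly right after reading when the format is not splittable (e.g. gzip). This is only a minimal sketch of that idea, not a guaranteed fix; the path and column name below are made up for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SmallerPartitions {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("smaller-partitions")
                // ask file-based sources to cut splittable input into ~64 MB partitions
                .config("spark.sql.files.maxPartitionBytes", 64L * 1024 * 1024)
                .getOrCreate();

        // hypothetical input path; a non-splittable file (e.g. .json.gz) still lands in
        // a single partition, so spread the data out explicitly before the heavy work
        Dataset<Row> df = spark.read().json("s3a://my-bucket/big-input.json")
                .repartition(200);

        df.groupBy("siteId").count().show();   // "siteId" is just an example column
        spark.stop();
    }
}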

Wednesday, May 9, 2018

Spark Tips Series - Replacing null with 0 after a left join


With Spark's left outer join, rows that have no match on the right side come back as NULL, as shown in the figure below:


Now, what happens if I try to compute CTR = click / impression? Spark throws an error straight at you, and it is not at all obvious what went wrong...

org.apache.spark.sql.AnalysisException: Resolved attribute(s) 'impressionCount,'clickCount missing from viewCount#965L,(impressionCount / total)#582,dsType#1189,rid#14,impressionCount#534L,recommendCount#1198L,clickCount#1441L,siteId#16 in operator 'Project [siteId#16, rid#14, impressionCount#534L, (impressionCount / total)#582, viewCount#965L, dsType#1189, recommendCount#1198L, clickCount#1441L, ('clickCount / 'impressionCount) AS CTR#2022]. Attribute(s) with the same name appear in the operation: impressionCount,clickCount. Please check if the right attribute(s) are used.;;
'Project [siteId#16, rid#14, impressionCount#534L, (impressionCount / total)#582, viewCount#965L, dsType#1189, recommendCount#1198L, clickCount#1441L, ('clickCount / 'impressionCount) AS CTR#2022]
+- AnalysisBarrier
      +- LogicalRDD [siteId#16, rid#14, impressionCount#534L, (impressionCount / total)#582, viewCount#965L, dsType#1189, recommendCount#1198L, clickCount#1441L], false


    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:289)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3295)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:1307)
    at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2192)
    at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2159)


The actual cause is those NULL values; the trick below, which fills them with zero, takes care of it.

// broadcast the smaller side, then fill null viewCount with 0 after the left outer join
Dataset<Row> join1 = impression.join(broadcast(view), col, "left_outer")
                               .na()
                               .fill(0, new String[] {"viewCount"});

This fills the null values with whatever value you want (0 here), and that solves it~
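
With the nulls filled in, the CTR calculation goes through. A small follow-up sketch, assuming the clickCount and impressionCount columns from the error message above have been filled the same way:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;

// continues from join1 above; also guard against impressionCount == 0
Dataset<Row> withCtr = join1.withColumn("CTR",
        when(col("impressionCount").equalTo(0), 0.0)
             .otherwise(col("clickCount").divide(col("impressionCount"))));
withCtr.show();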


Thursday, May 3, 2018

Speeding up Spark writes to cloud storage (AWS S3 / Azure Blob / Google GCS)



Have you ever had the experience where the Spark job is clearly finished and the files are all written, yet the driver program still won't stop, as if it were hung there?

Many technical details like this you simply don't learn about until you run into them. I only discovered this trick because the following conditions all held at the same time:

1. Using cloud storage


To save cost, we don't run our own HDFS cluster; instead, both the data to be analyzed and the results live in cloud storage (AWS S3 / Azure Blob / Google GCS). This greatly reduces operational cost and improves durability, but the downside is losing the speed advantage of data locality, and pulling files down from the cloud for every analysis also takes quite a while. In other words, we trade runtime performance for cost and reliability.
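
In practice this just means pointing Spark directly at cloud storage URIs. A minimal sketch of that setup with the s3a connector; the bucket, paths, and hard-coded credentials below are made up for illustration, and in a real deployment the credentials would come from IAM roles or environment variables instead.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CloudStorageJob {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("cloud-storage-job")
                .getOrCreate();

        // S3A credentials (hypothetical placeholders)
        spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", "<ACCESS_KEY>");
        spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", "<SECRET_KEY>");

        // read input straight from the bucket and write the result back to it
        Dataset<Row> logs = spark.read().parquet("s3a://my-bucket/logs/2018-05-01/");
        logs.groupBy("siteId").count()
            .write().mode("overwrite")
            .parquet("s3a://my-bucket/reports/site-counts/");

        spark.stop();
    }
}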