2016-03-06 54 views
1

我試圖在集羣模式下運行EMR 4.3上Kinesis Streaming word-count示例的版本失敗。具體而言,即使我可以訪問流的元數據,也不會從Kinesis讀取消息。Kinesis流式處理示例在EMR(EMR 4.3,Spark 1.6)上無法工作在集羣模式下

此相同的代碼不相同的EMR集羣上運行的客戶端模式(即「本地[*]」),但是當我試圖做到這一點在集羣模式下室壁運動接收機的第一份工作是停留: spark ui jobs

,我什麼也看不到星火UI的流頁: spark ui streaming

起初我以爲這是資源/線程數量的問題,但基於配置和我在紗線看到和Spark UI,這似乎不是這種情況(請參閱下面的所有相關配置)。

我正在尋找爲什麼應用程序無法從Kinesis中讀取的任何指針,或者在配置或設置中建議進行更改以使其在羣集模式下工作。


配置和安裝細節

相關的Kinesis流有一個單一的碎片。

我用下面的配置在EMR集羣設置:

[{"classification":"capacity-scheduler", 

"properties":{"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"}}, 

{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}}, 

{"classification":"spark-defaults","properties":{ 

"spark.executor.instances":"0", 
"spark.dynamicAllocation.enabled":"true"}}] 

這是怎麼我的環境設置看起來像火花UI: saprkui env

我想要的代碼運行:

val appName = "ks_"+DateTime.now().toString(formatter); 
val sparkConf = new SparkConf().setAppName(appName) 


val sc = new SparkContext(sparkConf); 
val batchIntervalInSec = 5 
val batchInterval = Seconds(batchIntervalInSec)  

val ssc = new StreamingContext(sc, batchInterval) 
ssc.checkpoint("/checkpoint") 
val kinesisClient = new AmazonKinesisClient(credentials) 
kinesisClient.setEndpoint(endpointUrl) 
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size 

val numStreams = numShards 

val kinesisCheckpointInterval = Seconds(batchIntervalInSec-1) 
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() 


val kinesisStreams = (0 until numStreams).map { i => 
    KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, 
    InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_ONLY) 
} 

val unionStreams = ssc.union(kinesisStreams) 

val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) 


val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) 

wordCounts.print() 
ssc.start() 
ssc.awaitTermination(); 

這是火花命令我運行:

spark-submit --deploy-mode cluster 
--class com.komoona.spark.kmn_spark_scala.KinesisStream 
--master yarn --conf spark.executor.cores=4 
--conf spark.executor.instances=2 
--conf spark.streaming.blockInterval=1000ms 
--jars /home/hadoop/lib/spark-streaming-kinesis-asl-assembly_2.10-1.6.0.jar,/home/hadoop/lib/amazon-kinesis-client-1.6.1.jar, 
test_app_full.jar 

編輯: 我注意到,在具有2個執行人配置(如在命令行中指定)儘管只有一個執行器和驅動器被示出在火花UI運行: executors

這會是問題的來源?任何想法可能導致這種情況?

回答

0

你的配置中有0個火花執行器 - 我相信你必須增加它。另外,查看日誌中是否有錯誤。

+0

我現在是否指定遺囑執行人的數量作爲命令行參數所看到的,所以忽略我的建議的一部分。 – Leandro

0

經過幾天的頭痛,我再次檢查了我的kinesis配置,並將聚合模式更改爲關閉,它解決了整個問題。 kinesis聚合模式對我來說在本地工作, 但在羣集模式下不emr

+0

您是如何更改聚合模式的? – user1158559

+1

如果您正在使用logstash(如我們所做的那樣),您可以在logstach配置中進行更改,這樣可以解決此問題:https://issues.apache.org/jira/browse/SPARK-14421。但經過幾個月大規模(每小時100GB)的火花和驅動工作後,我的重寫是遠離kinesis,它的工作規模很小,但一旦你開始向許多木材生產者裝載它,我們就會遇到很多延遲問題日誌和許多日誌的到達時間不止一次到達 –

+0

Thankyou對於Kinesis的頭像。我找不到任何文檔,但通過任何配置選項來更改此聚合模式。你有鏈接到這樣的事情? 我最終回滾到Spark 1.4.1的工作位置,但只給了我實際的有效負載但沒有元數據。 – user1158559

0

這對我有效。你也有機會look-

Spark not able to fetch events from Amazon Kinesis

TL; DR

There are 2 versions of the foreachRDD available

unionStreams.foreachRDD 
unionStreams.foreachRDD ((rdd:RDD[Array[Byte]], time: Time) 

For some reason the first one is not able to get me the results but changing to the second one fetches me the results as expected. Yet to explore the reason.

1

我曾與星火+室壁運動+ EMR(多個版本測試)聚合室壁運動流同樣的問題...原來即使Kinesis庫是使用protobuf-java-2.6.1(由於KCL依賴關係而產生的要求)明確構建的,EMR羣集的配置方式仍然會在實踐中使用protobuf-java-2.5.0

我一直沒能看近得弄清楚爲什麼發生這種情況,但我的快速和骯髒的解決辦法是刪除/usr/lib/spark/jars/protobuf-java-2.5.0.jar,並在同一個位置(主節點),與我自己protobuf-java-2.6.1更換。我把一個版本在S3中,有一個引導作用,以aws s3 cp2.6.1罐子倒在/usr/lib/spark/jars正確的位置,然後將以下添加到您的火花提交(更換Scala和火花酌情版本):

--packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0,\ com.google.protobuf:protobuf-java-2.6.1

比我更聰明的人可能會發現一個比這更正確的解決方案,但我還沒有看到從刪除protobuf-java-2.5.0的任何明顯的副作用,但這並不意味着它們不存在。

您可以檢查,如果這是你用--master local[*]代替--master yarn,並期待在日誌以下運行(在你的火花主站)有同樣的問題:

17/01/31 19:24:13 ERROR Worker: Worker.run caught exception, sleeping for 1000 milli seconds! java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: com.google.protobuf.LazyStringList.getUnmodifiableView()Lcom/google/protobuf/LazyStringList;

TL; DR -

protobuf-java-2.6.1.jar更換protobuf-java-2.5.0.jar/usr/lib/spark/jars/的火花主

添加(更換Scala和火花版本)

--packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0,\ com.google.protobuf:protobuf-java-2.6.1 到您的火花提交命令行