我試圖在集羣模式下運行EMR 4.3上Kinesis Streaming word-count示例的版本失敗。具體而言,即使我可以訪問流的元數據,也不會從Kinesis讀取消息。Kinesis流式處理示例在EMR(EMR 4.3,Spark 1.6)上無法工作在集羣模式下
此相同的代碼不相同的EMR集羣上運行的客戶端模式(即「本地[*]」),但是當我試圖做到這一點在集羣模式下室壁運動接收機的第一份工作是停留:
起初我以爲這是資源/線程數量的問題,但基於配置和我在紗線看到和Spark UI,這似乎不是這種情況(請參閱下面的所有相關配置)。
我正在尋找爲什麼應用程序無法從Kinesis中讀取的任何指針,或者在配置或設置中建議進行更改以使其在羣集模式下工作。
配置和安裝細節
相關的Kinesis流有一個單一的碎片。
我用下面的配置在EMR集羣設置:
[{"classification":"capacity-scheduler",
"properties":{"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"}},
{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}},
{"classification":"spark-defaults","properties":{
"spark.executor.instances":"0",
"spark.dynamicAllocation.enabled":"true"}}]
我想要的代碼運行:
val appName = "ks_"+DateTime.now().toString(formatter);
val sparkConf = new SparkConf().setAppName(appName)
val sc = new SparkContext(sparkConf);
val batchIntervalInSec = 5
val batchInterval = Seconds(batchIntervalInSec)
val ssc = new StreamingContext(sc, batchInterval)
ssc.checkpoint("/checkpoint")
val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val numStreams = numShards
val kinesisCheckpointInterval = Seconds(batchIntervalInSec-1)
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val kinesisStreams = (0 until numStreams).map { i =>
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_ONLY)
}
val unionStreams = ssc.union(kinesisStreams)
val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination();
這是火花命令我運行:
spark-submit --deploy-mode cluster
--class com.komoona.spark.kmn_spark_scala.KinesisStream
--master yarn --conf spark.executor.cores=4
--conf spark.executor.instances=2
--conf spark.streaming.blockInterval=1000ms
--jars /home/hadoop/lib/spark-streaming-kinesis-asl-assembly_2.10-1.6.0.jar,/home/hadoop/lib/amazon-kinesis-client-1.6.1.jar,
test_app_full.jar
編輯: 我注意到,在具有2個執行人配置(如在命令行中指定)儘管只有一個執行器和驅動器被示出在火花UI運行:
這會是問題的來源?任何想法可能導致這種情況?
我現在是否指定遺囑執行人的數量作爲命令行參數所看到的,所以忽略我的建議的一部分。 – Leandro