Our team was in a similar situation and we managed to solve it:

We are running in the same environment:
- Spark on Dataproc image version 1: Spark 1.6.1 with Hadoop 2.7

A simple Spark Streaming Kinesis script boils down to:
# Run the script as:
# spark-submit \
#     --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 \
#     demo_kinesis_streaming.py \
#     --awsAccessKeyId FOO \
#     --awsSecretKey BAR \
#     ...
import argparse
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
ap = argparse.ArgumentParser()
ap.add_argument('--awsAccessKeyId', required=True)
ap.add_argument('--awsSecretKey', required=True)
ap.add_argument('--stream_name')
ap.add_argument('--region')
ap.add_argument('--app_name')
ap = ap.parse_args()
kinesis_application_name = ap.app_name
kinesis_stream_name = ap.stream_name
kinesis_region = ap.region
kinesis_endpoint_url = 'https://kinesis.{}.amazonaws.com'.format(ap.region)
spark_context = SparkContext(appName=kinesis_application_name)
streamingContext = StreamingContext(spark_context, 60)
kinesisStream = KinesisUtils.createStream(
    ssc=streamingContext,
    kinesisAppName=kinesis_application_name,
    streamName=kinesis_stream_name,
    endpointUrl=kinesis_endpoint_url,
    regionName=kinesis_region,
    initialPositionInStream=InitialPositionInStream.TRIM_HORIZON,
    checkpointInterval=60,
    storageLevel=StorageLevel.MEMORY_AND_DISK_2,
    awsAccessKeyId=ap.awsAccessKeyId,
    awsSecretKey=ap.awsSecretKey
)
kinesisStream.pprint()
streamingContext.start()
streamingContext.awaitTermination()
The code was tested on AWS EMR and on a local environment with the same Spark 1.6.1 / Hadoop 2.7 setup, and it works on both.
- On Dataproc, the script returns empty RDDs without printing any errors, even though there is data in the Kinesis stream.
- We tested it on Dataproc in the following envs, and none of them worked:
  - submitting the job via the `gcloud` command (a sketch follows this list);
  - `ssh`-ing into the cluster master node and running in yarn-client mode;
  - `ssh`-ing into the cluster master node and running as `local[*]`.
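For reference, the `gcloud` submission looked roughly like the sketch below. This is a hedged reconstruction, not our exact command: the cluster name, stream name, region and credentials are placeholders.

# A hedged sketch, not the exact command: cluster/stream/region/keys are placeholders
gcloud dataproc jobs submit pyspark demo_kinesis_streaming.py \
    --cluster my-cluster \
    --properties spark.jars.packages=org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 \
    -- \
    --awsAccessKeyId FOO \
    --awsSecretKey BAR \
    --stream_name my-stream \
    --region ap-southeast-2 \
    --app_name SparkDemo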
Once verbose logging was enabled by updating /etc/spark/conf/log4j.properties with the following values:
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=DEBUG
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=DEBUG
log4j.logger.org.apache.spark=DEBUG
log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=DEBUG
log4j.logger.org.spark-project.jetty.server.handler.ContextHandler=DEBUG
log4j.logger.org.apache=DEBUG
log4j.logger.com.amazonaws=DEBUG
We noticed something weird in the logs (note that spark-streaming-kinesis-asl_2.10:1.6.1 declares aws-sdk-java/1.9.37 as a dependency, yet somehow aws-sdk-java/1.7.4 is being used, as the user-agent below suggests):
16/07/10 06:30:16 DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer: PROCESS task encountered execution exception:
java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.model.GetRecordsResult.getMillisBehindLatest()Ljava/lang/Long;
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.checkAndSubmitNextTask(ShardConsumer.java:137)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.consumeShard(ShardConsumer.java:126)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:334)
at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)
Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.model.GetRecordsResult.getMillisBehindLatest()Ljava/lang/Long;
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:119)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
content-length:282
content-type:application/x-amz-json-1.1
host:kinesis.ap-southeast-2.amazonaws.com
user-agent:SparkDemo,amazon-kinesis-client-library-java-1.4.0, aws-sdk-java/1.7.4 Linux/3.16.0-4-amd64 OpenJDK_64-Bit_Server_VM/25.91-b14/1.8.0_91
x-amz-date:20160710T063016Z
x-amz-target:Kinesis_20131202.GetRecords
It appears that Dataproc builds its own Spark against a much older AWS SDK, and it blows up when combined with code that requires a much newer version of the AWS SDK, although we are not sure exactly which module causes this error.
UPDATE: Per @DennisHuo's comment, this behaviour is caused by Hadoop's leaky classpath: https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-project/pom.xml#L650
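A quick way to confirm the leak on a cluster node is to inspect what Hadoop actually puts on the classpath. A hedged sketch (the exact jar locations vary per Dataproc image version):

# List any AWS SDK jars that Hadoop injects into the classpath (paths are illustrative)
hadoop classpath | tr ':' '\n' | grep -i aws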
To make things worse, AWS KCL 1.4.0 (used by Spark 1.6.1) silently suppresses any runtime error instead of throwing a RuntimeException, which caused a lot of headache while debugging.

Eventually our solution was to build our own org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 with all of its com.amazonaws.* classes shaded.

Build the JAR with the following POM (which updates spark/extras/kinesis-asl/pom.xml) and ship the new JAR with the --jars flag of spark-submit:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.10</artifactId>
    <version>1.6.1</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <!-- Kinesis integration is not included by default due to ASL-licensed code. -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
  <packaging>jar</packaging>
  <name>Spark Kinesis Integration</name>

  <properties>
    <sbt.project.name>streaming-kinesis-asl</sbt.project.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>amazon-kinesis-client</artifactId>
      <version>${aws.kinesis.client.version}</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>amazon-kinesis-producer</artifactId>
      <version>${aws.kinesis.producer.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
    </dependency>
  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>
            <includes>
              <!-- At a minimum we must include this to force effective pom generation -->
              <include>org.spark-project.spark:unused</include>
              <include>com.amazonaws:*</include>
            </includes>
          </artifactSet>
          <relocations>
            <relocation>
              <pattern>com.amazonaws</pattern>
              <shadedPattern>foo.bar.YO.com.amazonaws</shadedPattern>
              <includes>
                <include>com.amazonaws.**</include>
              </includes>
            </relocation>
          </relocations>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
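With that POM in place, our build-and-submit flow looked roughly like the sketch below. It is a hedged reconstruction: the module path and jar name follow the Spark 1.6.1 source layout, and your checkout location will differ.

# Build the shaded kinesis-asl artifact from inside a Spark 1.6.1 source checkout
cd spark-1.6.1/extras/kinesis-asl
mvn clean package -DskipTests
# Ship the shaded JAR explicitly instead of pulling the stock one via --packages
spark-submit \
    --jars target/spark-streaming-kinesis-asl_2.10-1.6.1.jar \
    demo_kinesis_streaming.py \
    --awsAccessKeyId FOO \
    --awsSecretKey BAR \
    ...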
Could you also provide the contents of 'demo_kinesis_streaming.py'? Looking at your logs I see: 'spark.submit.deployMode -> client spark.master -> local[*]', which means that for some reason your spark-submit isn't using the cluster's actual Spark settings (assuming you're running spark-submit on the Dataproc cluster); perhaps something is overriding spark.master to local? –
From this seemingly [related SO question](http://stackoverflow.com/questions/26941844/apache-spark-kinesis-sample-not-working) and this [AWS forum question](https://forums.aws.amazon.com/message.jspa?messageID=705666), it looks like the empty results could be related to not having enough executors running to receive and process the data. If you somehow ended up with local[2], then it may not be able to process your stream; likewise, in a Dataproc cluster you need to make sure the cluster is large enough to hold enough executors as well. –
Hi @DennisHuo, thanks for the response. Here is the link to [demo_kinesis_streaming.py](https://gist.github.com/sshrestha-datalicious/c5efcdcb014da69d361768646f898788). Regarding your comment about 'spark.master' being 'local[*]': the logs were from testing the script by `ssh`-ing into the Dataproc master node and doing a `spark-submit`. Thanks, Suren –