3

I am trying to connect to an Amazon Kinesis stream from Google Dataproc, but I only get empty RDDs (Kinesis stream returning empty records on Google Dataproc with Spark 1.6.1 and Hadoop 2.7.2).

Command: spark-submit --verbose --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.2 demo_kinesis_streaming.py --awsAccessKeyId XXXXX  --awsSecretKey XXXX 

Verbose logs: https://gist.github.com/sshrestha-datalicious/e3fc8ebb4916f27735a97e9fcc42136c

More details:
Spark 1.6.1
Hadoop 2.7.2
Assembly used: /usr/lib/spark/lib/spark-assembly-1.6.1-hadoop2.7.2.jar

Surprisingly, the same job works when I download the assembly containing Spark 1.6.1 built against Hadoop 2.6.0 and use it with the following command.

Command: SPARK_HOME=/opt/spark-1.6.1-bin-hadoop2.6 spark-submit --verbose --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.2 demo_kinesis_streaming.py --awsAccessKeyId XXXXX  --awsSecretKey XXXX 

I am not sure whether there is a version conflict between the two Hadoop versions and the Kinesis ASL, or whether it has something to do with Google Dataproc's custom setup.

Any help would be appreciated.

Thanks
Suren

+0

Could you also provide the contents of 'demo_kinesis_streaming.py'? Looking at your logs, I see 'spark.submit.deployMode -> client' and 'spark.master -> local[*]', which means that for some reason your spark-submit is not picking up the cluster's actual Spark settings (assuming you are running spark-submit on the Dataproc cluster); perhaps something is overriding spark.master to local? –

+0

From this seemingly [related SO question](http://stackoverflow.com/questions/26941844/apache-spark-kinesis-sample-not-working) and this [AWS forum question](https://forums.aws.amazon.com/message.jspa?messageID=705666), it looks like empty results may be related to not having enough executors running to receive and process the data. If you somehow ended up with local[2], it may not be able to handle your stream; likewise, on a Dataproc cluster you need to make sure the cluster is large enough to hold enough executors. –

+0

Hi @DennisHuo, thanks for your response. Here is the link to [demo_kinesis_streaming.py](https://gist.github.com/sshrestha-datalicious/c5efcdcb014da69d361768646f898788). Regarding your comment about 'spark.master' being 'local[*]': I tested the script by 'ssh'-ing into the Dataproc master node and doing a 'spark-submit'. Thanks, Suren –

Answer

2

Our team was in the same situation, and we managed to solve it:

We were running in the same environment:

  • Spark on a Dataproc image version 1 cluster (Spark 1.6.1 with Hadoop 2.7)
  • A simple Spark Streaming Kinesis script that boils down to:

    # Run the script as 
    # spark-submit \ 
    # --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1\ 
    # demo_kinesis_streaming.py\ 
    # --awsAccessKeyId FOO\ 
    # --awsSecretKey BAR\ 
    # ... 
    
    import argparse 
    
    from pyspark import SparkContext 
    from pyspark.streaming import StreamingContext 
    from pyspark.storagelevel import StorageLevel 
    
    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream 
    
    ap = argparse.ArgumentParser() 
    ap.add_argument('--awsAccessKeyId', required=True) 
    ap.add_argument('--awsSecretKey', required=True) 
    ap.add_argument('--stream_name') 
    ap.add_argument('--region') 
    ap.add_argument('--app_name') 
    ap = ap.parse_args() 
    
    kinesis_application_name = ap.app_name 
    kinesis_stream_name = ap.stream_name 
    kinesis_region = ap.region 
    kinesis_endpoint_url = 'https://kinesis.{}.amazonaws.com'.format(ap.region) 
    
    spark_context = SparkContext(appName=kinesis_application_name) 
    streamingContext = StreamingContext(spark_context, 60) 
    
    kinesisStream = KinesisUtils.createStream(
        ssc=streamingContext, 
        kinesisAppName=kinesis_application_name, 
        streamName=kinesis_stream_name, 
        endpointUrl=kinesis_endpoint_url, 
        regionName=kinesis_region, 
        initialPositionInStream=InitialPositionInStream.TRIM_HORIZON, 
        checkpointInterval=60, 
        storageLevel=StorageLevel.MEMORY_AND_DISK_2, 
        awsAccessKeyId=ap.awsAccessKeyId, 
        awsSecretKey=ap.awsSecretKey 
    ) 
    
    kinesisStream.pprint() 
    
    streamingContext.start() 
    streamingContext.awaitTermination() 
    
  • The code was tested and worked on AWS EMR and in a local environment with the same Spark 1.6.1 / Hadoop 2.7 setup.

  • On Dataproc the script returned empty RDDs without printing any errors, even though there was data in the Kinesis stream.
  • We tested it on Dataproc in the following environments, and none of them worked:
    1. Submitting the job via the gcloud command (an illustrative command is sketched after this list);
    2. ssh-ing into the cluster master node and running it in yarn-client mode;
    3. ssh-ing into the cluster master node and running it as local[*].
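
For reference, the gcloud submission in step 1 looked roughly like the sketch below. This is illustrative only: the cluster name is a placeholder, the exact flags may differ between gcloud versions, and spark-submit's --packages behaviour is expressed here via the spark.jars.packages property.

    gcloud dataproc jobs submit pyspark demo_kinesis_streaming.py \
        --cluster my-cluster \
        --properties spark.jars.packages=org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 \
        -- --awsAccessKeyId FOO --awsSecretKey BAR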

Once we enabled verbose logging by updating /etc/spark/conf/log4j.properties with the following values:

    log4j.rootCategory=DEBUG, console 
    log4j.appender.console=org.apache.log4j.ConsoleAppender 
    log4j.appender.console.target=System.err 
    log4j.appender.console.layout=org.apache.log4j.PatternLayout 
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n 
    log4j.logger.org.eclipse.jetty=ERROR 
    log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR 
    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=DEBUG 
    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=DEBUG 
    log4j.logger.org.apache.spark=DEBUG 
    log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=DEBUG 
    log4j.logger.org.spark-project.jetty.server.handler.ContextHandler=DEBUG 
    log4j.logger.org.apache=DEBUG 
    log4j.logger.com.amazonaws=DEBUG 

we noticed something odd in the logs (note that spark-streaming-kinesis-asl_2.10:1.6.1 depends on aws-sdk-java/1.9.37, yet somehow aws-sdk-java/1.7.4 was being used, as suggested by the user agent):

16/07/10 06:30:16 DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer: PROCESS task encountered execution exception: 
    java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.model.GetRecordsResult.getMillisBehindLatest()Ljava/lang/Long; 
     at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
     at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.checkAndSubmitNextTask(ShardConsumer.java:137) 
     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.consumeShard(ShardConsumer.java:126) 
     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:334) 
     at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174) 

    Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.model.GetRecordsResult.getMillisBehindLatest()Ljava/lang/Long; 
     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:119) 
     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48) 
     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

    content-length:282 
    content-type:application/x-amz-json-1.1 
    host:kinesis.ap-southeast-2.amazonaws.com 
    user-agent:SparkDemo,amazon-kinesis-client-library-java-1.4.0, aws-sdk-java/1.7.4 Linux/3.16.0-4-amd64 OpenJDK_64-Bit_Server_VM/25.91-b14/1.8.0_91 
    x-amz-date:20160710T063016Z 
    x-amz-target:Kinesis_20131202.GetRecords 

It appears that Dataproc built its Spark against a much older AWS SDK dependency, and it blows up when combined with code that requires a much newer version of the AWS SDK, although we are not sure exactly which module caused the error.
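
If you are debugging a similar conflict, a quick way to see which JAR a given AWS SDK class is actually loaded from at runtime is to ask the JVM through PySpark. This is only a diagnostic sketch (the class name is taken from the stack trace above, and it uses the private _jvm gateway):

    # Diagnostic sketch: print the JAR that the conflicting AWS SDK class
    # is loaded from on the driver.
    from pyspark import SparkContext

    sc = SparkContext(appName='aws-sdk-classpath-check')

    # Load the class named in the NoSuchMethodError via the gateway JVM and
    # print the code source (i.e. the JAR) it came from.
    clazz = sc._jvm.java.lang.Class.forName(
        'com.amazonaws.services.kinesis.model.GetRecordsResult')
    print(clazz.getProtectionDomain().getCodeSource().getLocation())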

Update: Based on @DennisHuo's comment, this behaviour is caused by Hadoop's leaky classpath: https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-project/pom.xml#L650

To make things worse, AWS KCL 1.4.0 (used by Spark 1.6.1) will suppress any runtime error silently instead of throwing a RuntimeException, which caused a lot of headaches while debugging.


In the end, our solution was to build our own org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 with all of its com.amazonaws.* classes shaded.

Build the JAR with the following POM (updating spark/extra/kinesis-asl/pom.xml) and ship the new JAR with the --jars flag.

<?xml version="1.0" encoding="UTF-8"?> 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <parent> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-parent_2.10</artifactId> 
    <version>1.6.1</version> 
    <relativePath>../../pom.xml</relativePath> 
    </parent> 

    <!-- Kinesis integration is not included by default due to ASL-licensed code. --> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming-kinesis-asl_2.10</artifactId> 
    <packaging>jar</packaging> 
    <name>Spark Kinesis Integration</name> 

    <properties> 
    <sbt.project.name>streaming-kinesis-asl</sbt.project.name> 
    </properties> 

    <dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_${scala.binary.version}</artifactId> 
     <version>${project.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_${scala.binary.version}</artifactId> 
     <version>${project.version}</version> 
     <type>test-jar</type> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_${scala.binary.version}</artifactId> 
     <version>${project.version}</version> 
     <type>test-jar</type> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>com.amazonaws</groupId> 
     <artifactId>amazon-kinesis-client</artifactId> 
     <version>${aws.kinesis.client.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>com.amazonaws</groupId> 
     <artifactId>amazon-kinesis-producer</artifactId> 
     <version>${aws.kinesis.producer.version}</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.mockito</groupId> 
     <artifactId>mockito-core</artifactId> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.scalacheck</groupId> 
     <artifactId>scalacheck_${scala.binary.version}</artifactId> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-test-tags_${scala.binary.version}</artifactId> 
    </dependency> 
    </dependencies> 

    <build> 
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> 
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> 

    <plugins> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-shade-plugin</artifactId> 
      <configuration> 
      <shadedArtifactAttached>false</shadedArtifactAttached> 

      <artifactSet> 
       <includes> 
       <!-- At a minimum we must include this to force effective pom generation --> 
       <include>org.spark-project.spark:unused</include> 
       <include>com.amazonaws:*</include> 
       </includes> 
      </artifactSet> 

      <relocations> 
       <relocation> 
       <pattern>com.amazonaws</pattern> 
       <shadedPattern>foo.bar.YO.com.amazonaws</shadedPattern> 
       <includes> 
        <include>com.amazonaws.**</include> 
       </includes> 
       </relocation> 
      </relocations> 

      </configuration> 
      <executions> 
      <execution> 
       <phase>package</phase> 
       <goals> 
       <goal>shade</goal> 
       </goals> 
      </execution> 
      </executions> 
     </plugin> 
    </plugins> 
    </build> 
</project> 
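
As a usage sketch, the job is then submitted with the shaded JAR instead of --packages (the JAR path below is a placeholder for wherever your shaded build lands):

    spark-submit --verbose \
        --jars /path/to/shaded/spark-streaming-kinesis-asl_2.10-1.6.1.jar \
        demo_kinesis_streaming.py \
        --awsAccessKeyId FOO \
        --awsSecretKey BAR
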
+0

@DennisHuo, can you confirm that Dataproc ships an older version of 'aws-java-sdk'? If so, is there any chance of shading it by default? Thanks. –

+0

I just double-checked, and the Dataproc-specific code definitely doesn't pull in any older 'aws-java-sdk' dependency. It comes from the [hadoop-mapreduce build itself](https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-project/pom.xml#L650), apparently mostly for [S3A](https://github.com/apache/hadoop/blob/c58a59f7081d55dd2108545ebf9ee48cf43ca944/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java). Unfortunately, shading it in core Hadoop would be somewhat risky, since others will have come to depend on the leaky classpath behavior. –

+0

Based on the Hadoop project commit [that updated the aws-java-sdk version](https://github.com/apache/hadoop/commit/f7b0f292e722fa819900f455a070be1d7bf97072), there must have been some subtle changes to the S3 interfaces. However, checking the Javadoc, it does appear that the newer version is at least backwards-compatible, even if [some methods have been deprecated](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/model/ObjectMetadata.html#getServerSideEncryption--). –