I'm having trouble building and running the word count example for Spark Streaming. I'm trying to deploy the same file that ships with the Spark examples, but I want to build and deploy this particular example as a standalone application.

My file looks like this:

package test; 

import scala.Tuple2; 
import com.google.common.collect.Lists; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.api.java.StorageLevels; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

import java.util.regex.Pattern; 

public final class JavaNetworkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
            System.exit(1);
        }

        // Create the context with a 1 second batch size
        SparkConf sparkConf = new SparkConf()
            .setAppName("JavaNetworkWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
            Durations.seconds(1));

        // Create a JavaReceiverInputDStream on the target ip:port and count the
        // words in the input stream of \n-delimited text (e.g. generated by 'nc').
        // Note that skipping replication in the storage level is fine only when
        // running locally; replication is necessary in a distributed scenario
        // for fault tolerance.
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0],
            Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
        JavaDStream<String> words = lines
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String x) {
                    return Lists.newArrayList(SPACE.split(x));
                }
            });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}

My POM looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 

<groupId>io.tester</groupId> 
<artifactId>streamer</artifactId> 
<version>0.0.1-SNAPSHOT</version> 
<packaging>jar</packaging> 

<name>streamer</name> 
<url>http://maven.apache.org</url> 

<properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <scala.binary.version>2.11</scala.binary.version> 
    <spark.version>1.4.1</spark.version> 
    <java.version>1.7</java.version> 
</properties> 

<dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_${scala.binary.version}</artifactId> 
     <version>${spark.version}</version> 
     <scope>provided</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_${scala.binary.version}</artifactId> 
     <version>${spark.version}</version> 
     <scope>provided</scope> 
    </dependency> 
</dependencies> 



<build> 
    <pluginManagement> 
     <plugins> 
      <plugin> 
       <artifactId>maven-assembly-plugin</artifactId> 
       <configuration> 
        <archive> 
         <manifest> 
          <mainClass>test.JavaNetworkWordCount</mainClass> 
         </manifest> 
        </archive> 
        <descriptorRefs> 
         <descriptorRef>jar-with-dependencies</descriptorRef> 
        </descriptorRefs> 
       </configuration> 
      </plugin> 
     </plugins> 
    </pluginManagement> 
</build> 

The error I get is:

java.lang.NoClassDefFoundError: com/google/common/collect/Lists 

I looked through the jar I build with Maven. It should have the dependencies attached, but it doesn't appear to contain any of them. I build it via mvn assembly:single. What am I doing wrong?
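
For reference, this is roughly how I checked it (using the assembly plugin's default jar-with-dependencies classifier for the jar name):

jar tf target/streamer-0.0.1-SNAPSHOT-jar-with-dependencies.jar | grep com/google/common/collect/Lists 
# prints nothing here, even though Guava should have been bundled 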

Could you show how you are submitting the job... with spark-submit? –

./bin/spark-submit --class io.tester.JavaNetworkWordCount --master local[2] /tmp/streamer-0.0.1-SNAPSHOT.jar localhost 9999 – james

Answers

As the maven-assembly-plugin documentation says:

If your project wants to package your artifact in an uber-jar, the assembly plugin provides only basic support. For more control, use the Maven Shade Plugin.

You could try the maven-shade-plugin instead. Try replacing the maven-assembly-plugin's plugin tag with:

<plugin> 
    <groupId>org.apache.maven.plugins</groupId> 
    <artifactId>maven-shade-plugin</artifactId> 
    <version>2.3</version> 
    <executions> 
     <!-- Run the shade goal on the package phase --> 
     <execution> 
      <phase>package</phase> 
      <goals> 
       <goal>shade</goal> 
      </goals> 
      <configuration> 
       <transformers> 
        <!-- add Main-Class to the manifest file --> 
        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> 
         <mainClass>test.JavaNetworkWordCount</mainClass> 
        </transformer> 
       </transformers> 
      </configuration> 
     </execution> 
    </executions> 
</plugin> 

That should create a fat jar containing all of your dependencies.
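
For completeness, a rough sketch of how you would build and submit with this setup (the jar path and main class are taken from your POM; adjust them to your layout):

# package the shaded jar; the shade goal is bound to the package phase above 
mvn clean package 

# by default the shade plugin replaces the main artifact with the shaded jar 
./bin/spark-submit --class test.JavaNetworkWordCount --master "local[2]" \ 
  target/streamer-0.0.1-SNAPSHOT.jar localhost 9999 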

Just tried this. It didn't work. I also tried the assembly plugin; that didn't work for me either. The jar-with-dependencies jar gets built, but it doesn't have the dependencies in it. Just my classes. Could it have something to do with Scala? Also, is there a way to submit to Spark without an executable jar? Thanks in advance for the help – james

I figured it out. I had two problems. One, I hadn't noticed the provided scope on my dependencies (a silly cut-and-paste error). The second problem was that one of the dependencies was signed, and I needed to explicitly exclude the signature files. Here is what my final POM looks like, in case anyone else has this problem:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 

<groupId>io.tester</groupId> 
<artifactId>streamer</artifactId> 
<version>0.0.1-SNAPSHOT</version> 
<packaging>jar</packaging> 

<name>streamer</name> 
<url>http://maven.apache.org</url> 

<properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <scala.binary.version>2.11</scala.binary.version> 
    <spark.version>1.4.1</spark.version> 
    <java.version>1.7</java.version> 
</properties> 

<dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_${scala.binary.version}</artifactId> 
     <version>${spark.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_${scala.binary.version}</artifactId> 
     <version>${spark.version}</version> 
    </dependency> 
</dependencies> 



<build> 
    <plugins> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-shade-plugin</artifactId> 
      <version>2.3</version> 
      <executions> 
       <!-- Run shade goal on package phase --> 
       <execution> 
        <phase>package</phase> 
        <goals> 
         <goal>shade</goal> 
        </goals> 
        <configuration> 
         <filters> 
          <filter> 
           <artifact>*:*</artifact> 
           <excludes> 
            <exclude>META-INF/*.SF</exclude> 
            <exclude>META-INF/*.DSA</exclude> 
            <exclude>META-INF/*.RSA</exclude> 
           </excludes> 
          </filter> 
         </filters> 
         <transformers> 
          <!-- add Main-Class to manifest file --> 
          <transformer 
           implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> 
           <mainClass>io.tester.streamer.JavaNetworkWordCount</mainClass> 
          </transformer> 
         </transformers> 
        </configuration> 
       </execution> 
      </executions> 
     </plugin> 
    </plugins> 
</build>
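
And a quick sanity check of the result (a sketch; the Guava class is the one from the original NoClassDefFoundError, and the main class matches the <mainClass> entry above):

# the shaded jar should now bundle Guava and carry no signature files 
jar tf target/streamer-0.0.1-SNAPSHOT.jar | grep com/google/common/collect/Lists 
jar tf target/streamer-0.0.1-SNAPSHOT.jar | grep 'META-INF/.*\.SF'  # should print nothing 

# feed the stream from one terminal ... 
nc -lk 9999 

# ... and submit from another 
./bin/spark-submit --class io.tester.streamer.JavaNetworkWordCount --master "local[2]" \ 
  target/streamer-0.0.1-SNAPSHOT.jar localhost 9999 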