2017-07-02

I'm having trouble creating a basic Spark Streaming application. How do I integrate Spark and Kafka using a direct stream?

For now, I'm trying it out on my local machine.

I have done the following setup:

- Set up ZooKeeper

- Set up Kafka (version: kafka_2.10-0.9.0.1)

- Created a topic using the following command:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

- Started a producer and a consumer in two different CMD terminals using the following commands:

Producer:

kafka-console-producer.bat --broker-list localhost:9092 --topic test

Consumer:

kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

Now the data I type in the producer terminal shows up in the consumer console.

Now I'm trying to integrate Kafka with Apache Spark Streaming.

Below is the sample code I adapted from the official documentation (Kafka & Spark Setup, Kafka & Spark Integration).

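import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class KafkaStreamingTry {
    static Map<String, Object> kafkaParams = new HashMap<>();

    public static void main(String[] args) throws InterruptedException, StreamingQueryException {
        // Create a local StreamingContext that uses all available cores and a batch interval of 10 seconds
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Sampleapp");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "0");
        kafkaParams.put("auto.offset.reset", "earliest"); // equivalent to --from-beginning?
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("test");

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

        System.out.println("Direct Stream created? ");
        stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    System.out.println("record key : " + record.key() + " value is : " + record.value());
                    return new Tuple2<>(record.key(), record.value());
                }
            });

        System.out.println("Reached the end.");
    }
}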

如果我運行這個程序,以下是日誌。

"C:\Program Files\Java\jdk1.8.0_11\bin\java" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2017.1.4\lib\idea_rt.jar=51332:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2017.1.4\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_11\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_11\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_11\jre\lib\rt.jar;C:\Users\driftking9987\IdeaProjects\sparktest\target\classes;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-streaming_2.10\2.1.0\spark-streaming_2.10-2.1.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-tags_2.10\2.1.0\spark-tags_2.10-2.1.0.jar;C:\Users\driftking9987\.m2\repository\org\scalatest\scalatest_2.10\2.2.6\scalatest_2.10-2.2.6.jar;C:\Users\driftking9987\.m2\repository\org\spark-project\spark\unused\1.0.0\unused-1.0.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-core_2.10\2.1.0\spark-core_2.10-2.1.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\avro\avro-mapred\1.7.7\avro-mapred-1.7.7-hadoop2.jar;C:\Users\driftking9987\.m2\repository\org\apache\avro\avro-ipc\1.7.7\avro-ipc-1.7.7.jar;C:\Users\driftking9987\.m2\repository\org\apache\avro\avro\1.7.7\avro-1.7.7.jar;C:\Users\driftking9987\.m2\repository\org\apache\avro\avro-ipc\1.7.7\avro-ipc-1.7.7-tests.jar;C:\Users\driftking9987\.m2\repository\org\codehaus\jackson\jackson-core-asl\1.9.13\jackson-core-asl-1.9.13.jar;C:\Users\driftking9987\.m2\repository\org\codehaus\jackson\jackson-mapper-asl\1.9.13\jackson-mapper-asl-1.9.13.jar;C:\Users\driftking9987\.m2\repository\com\twitter\chill_2.10\0.8.0\chill_2.10-0.8.0.jar;C:\Users\driftking9987\.m2\repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;C:\Users\driftking9987\.m2\repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;C:\Users\driftking9987\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;C:\Users\driftking9987\.m2\repository\com\twitter\chill-java\0.8.0\chill-java-0.8.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\xbean\xbean-asm5-shaded\4.4\xbean-asm5-shaded-4.4.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-client\2.2.0\hadoop-client-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-common\2.2.0\hadoop-common-2.2.0.jar;C:\Users\driftking9987\.m2\repository\commons-cli\c
ommons-cli\1.2\commons-cli-1.2.jar;C:\Users\driftking9987\.m2\repository\org\apache\commons\commons-math\2.1\commons-math-2.1.jar;C:\Users\driftking9987\.m2\repository\xmlenc\xmlenc\0.52\xmlenc-0.52.jar;C:\Users\driftking9987\.m2\repository\commons-io\commons-io\2.1\commons-io-2.1.jar;C:\Users\driftking9987\.m2\repository\commons-lang\commons-lang\2.5\commons-lang-2.5.jar;C:\Users\driftking9987\.m2\repository\commons-configuration\commons-configuration\1.6\commons-configuration-1.6.jar;C:\Users\driftking9987\.m2\repository\commons-collections\commons-collections\3.2.1\commons-collections-3.2.1.jar;C:\Users\driftking9987\.m2\repository\commons-digester\commons-digester\1.8\commons-digester-1.8.jar;C:\Users\driftking9987\.m2\repository\commons-beanutils\commons-beanutils\1.7.0\commons-beanutils-1.7.0.jar;C:\Users\driftking9987\.m2\repository\commons-beanutils\commons-beanutils-core\1.8.0\commons-beanutils-core-1.8.0.jar;C:\Users\driftking9987\.m2\repository\com\google\protobuf\protobuf-java\2.5.0\protobuf-java-2.5.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-auth\2.2.0\hadoop-auth-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\commons\commons-compress\1.4.1\commons-compress-1.4.1.jar;C:\Users\driftking9987\.m2\repository\org\tukaani\xz\1.0\xz-1.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-hdfs\2.2.0\hadoop-hdfs-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\mortbay\jetty\jetty-util\6.1.26\jetty-util-6.1.26.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-mapreduce-client-app\2.2.0\hadoop-mapreduce-client-app-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-mapreduce-client-common\2.2.0\hadoop-mapreduce-client-common-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-yarn-client\2.2.0\hadoop-yarn-client-2.2.0.jar;C:\Users\driftking9987\.m2\repository\com\google\inject\guice\3.0\guice-3.0.jar;C:\Users\driftking9987\.m2\repository\javax\inject\
javax.inject\1\javax.inject-1.jar;C:\Users\driftking9987\.m2\repository\aopalliance\aopalliance\1.0\aopalliance-1.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-yarn-server-common\2.2.0\hadoop-yarn-server-common-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-mapreduce-client-shuffle\2.2.0\hadoop-mapreduce-client-shuffle-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-yarn-api\2.2.0\hadoop-yarn-api-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-mapreduce-client-core\2.2.0\hadoop-mapreduce-client-core-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-yarn-common\2.2.0\hadoop-yarn-common-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-mapreduce-client-jobclient\2.2.0\hadoop-mapreduce-client-jobclient-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\hadoop\hadoop-annotations\2.2.0\hadoop-annotations-2.2.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-launcher_2.10\2.1.0\spark-launcher_2.10-2.1.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-network-common_2.10\2.1.0\spark-network-common_2.10-2.1.0.jar;C:\Users\driftking9987\.m2\repository\org\fusesource\leveldbjni\leveldbjni-all\1.8\leveldbjni-all-1.8.jar;C:\Users\driftking9987\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.6.5\jackson-annotations-2.6.5.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-network-shuffle_2.10\2.1.0\spark-network-shuffle_2.10-2.1.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-unsafe_2.10\2.1.0\spark-unsafe_2.10-2.1.0.jar;C:\Users\driftking9987\.m2\repository\net\java\dev\jets3t\jets3t\0.7.1\jets3t-0.7.1.jar;C:\Users\driftking9987\.m2\repository\commons-codec\commons-codec\1.3\commons-codec-1.3.jar;C:\Users\driftking9987\.m2\repository\commons-httpclient\commons-httpclient\3.1\commons-httpclient-3.1.jar;C:\Users\driftking9987\.m2\repository\org\apac
he\curator\curator-recipes\2.4.0\curator-recipes-2.4.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\curator\curator-framework\2.4.0\curator-framework-2.4.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\curator\curator-client\2.4.0\curator-client-2.4.0.jar;C:\Users\driftking9987\.m2\repository\com\google\guava\guava\14.0.1\guava-14.0.1.jar;C:\Users\driftking9987\.m2\repository\javax\servlet\javax.servlet-api\3.1.0\javax.servlet-api-3.1.0.jar;C:\Users\driftking9987\.m2\repository\org\apache\commons\commons-lang3\3.5\commons-lang3-3.5.jar;C:\Users\driftking9987\.m2\repository\org\apache\commons\commons-math3\3.4.1\commons-math3-3.4.1.jar;C:\Users\driftking9987\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\driftking9987\.m2\repository\org\slf4j\slf4j-api\1.7.16\slf4j-api-1.7.16.jar;C:\Users\driftking9987\.m2\repository\org\slf4j\jul-to-slf4j\1.7.16\jul-to-slf4j-1.7.16.jar;C:\Users\driftking9987\.m2\repository\org\slf4j\jcl-over-slf4j\1.7.16\jcl-over-slf4j-1.7.16.jar;C:\Users\driftking9987\.m2\repository\log4j\log4j\1.2.17\log4j-1.2.17.jar;C:\Users\driftking9987\.m2\repository\org\slf4j\slf4j-log4j12\1.7.16\slf4j-log4j12-1.7.16.jar;C:\Users\driftking9987\.m2\repository\com\ning\compress-lzf\1.0.3\compress-lzf-1.0.3.jar;C:\Users\driftking9987\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar;C:\Users\driftking9987\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\driftking9987\.m2\repository\org\roaringbitmap\RoaringBitmap\0.5.11\RoaringBitmap-0.5.11.jar;C:\Users\driftking9987\.m2\repository\commons-net\commons-net\2.2\commons-net-2.2.jar;C:\Users\driftking9987\.m2\repository\org\json4s\json4s-jackson_2.10\3.2.11\json4s-jackson_2.10-3.2.11.jar;C:\Users\driftking9987\.m2\repository\org\json4s\json4s-core_2.10\3.2.11\json4s-core_2.10-3.2.11.jar;C:\Users\driftking9987\.m2\repository\org\json4s\json4s-ast_2.10\3.2.11\json4s-ast_2.10-3.2.11.jar;C:\Users\driftking9987\.m2\repository\com\tho
ughtworks\paranamer\paranamer\2.6\paranamer-2.6.jar;C:\Users\driftking9987\.m2\repository\org\scala-lang\scalap\2.10.0\scalap-2.10.0.jar;C:\Users\driftking9987\.m2\repository\org\scala-lang\scala-compiler\2.10.0\scala-compiler-2.10.0.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\jersey\core\jersey-client\2.22.2\jersey-client-2.22.2.jar;C:\Users\driftking9987\.m2\repository\javax\ws\rs\javax.ws.rs-api\2.0.1\javax.ws.rs-api-2.0.1.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\hk2\hk2-api\2.4.0-b34\hk2-api-2.4.0-b34.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\hk2\hk2-utils\2.4.0-b34\hk2-utils-2.4.0-b34.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\hk2\external\aopalliance-repackaged\2.4.0-b34\aopalliance-repackaged-2.4.0-b34.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\hk2\external\javax.inject\2.4.0-b34\javax.inject-2.4.0-b34.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\hk2\hk2-locator\2.4.0-b34\hk2-locator-2.4.0-b34.jar;C:\Users\driftking9987\.m2\repository\org\javassist\javassist\3.18.1-GA\javassist-3.18.1-GA.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\jersey\core\jersey-common\2.22.2\jersey-common-2.22.2.jar;C:\Users\driftking9987\.m2\repository\javax\annotation\javax.annotation-api\1.2\javax.annotation-api-1.2.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\jersey\bundles\repackaged\jersey-guava\2.22.2\jersey-guava-2.22.2.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\hk2\osgi-resource-locator\1.0.1\osgi-resource-locator-1.0.1.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\jersey\core\jersey-server\2.22.2\jersey-server-2.22.2.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\jersey\media\jersey-media-jaxb\2.22.2\jersey-media-jaxb-2.22.2.jar;C:\Users\driftking9987\.m2\repository\javax\validation\validation-api\1.1.0.Final\validation-api-1.1.0.Final.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\jersey\containers\jersey-container-servlet\2.22.2\jersey-
container-servlet-2.22.2.jar;C:\Users\driftking9987\.m2\repository\org\glassfish\jersey\containers\jersey-container-servlet-core\2.22.2\jersey-container-servlet-core-2.22.2.jar;C:\Users\driftking9987\.m2\repository\io\netty\netty-all\4.0.42.Final\netty-all-4.0.42.Final.jar;C:\Users\driftking9987\.m2\repository\io\netty\netty\3.8.0.Final\netty-3.8.0.Final.jar;C:\Users\driftking9987\.m2\repository\com\clearspring\analytics\stream\2.7.0\stream-2.7.0.jar;C:\Users\driftking9987\.m2\repository\io\dropwizard\metrics\metrics-core\3.1.2\metrics-core-3.1.2.jar;C:\Users\driftking9987\.m2\repository\io\dropwizard\metrics\metrics-jvm\3.1.2\metrics-jvm-3.1.2.jar;C:\Users\driftking9987\.m2\repository\io\dropwizard\metrics\metrics-json\3.1.2\metrics-json-3.1.2.jar;C:\Users\driftking9987\.m2\repository\io\dropwizard\metrics\metrics-graphite\3.1.2\metrics-graphite-3.1.2.jar;C:\Users\driftking9987\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.6.5\jackson-databind-2.6.5.jar;C:\Users\driftking9987\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.6.5\jackson-core-2.6.5.jar;C:\Users\driftking9987\.m2\repository\com\fasterxml\jackson\module\jackson-module-scala_2.10\2.6.5\jackson-module-scala_2.10-2.6.5.jar;C:\Users\driftking9987\.m2\repository\org\scala-lang\scala-reflect\2.10.6\scala-reflect-2.10.6.jar;C:\Users\driftking9987\.m2\repository\com\fasterxml\jackson\module\jackson-module-paranamer\2.6.5\jackson-module-paranamer-2.6.5.jar;C:\Users\driftking9987\.m2\repository\org\apache\ivy\ivy\2.4.0\ivy-2.4.0.jar;C:\Users\driftking9987\.m2\repository\oro\oro\2.0.8\oro-2.0.8.jar;C:\Users\driftking9987\.m2\repository\net\razorvine\pyrolite\4.13\pyrolite-4.13.jar;C:\Users\driftking9987\.m2\repository\net\sf\py4j\py4j\0.10.4\py4j-0.10.4.jar;C:\Users\driftking9987\.m2\repository\org\apache\commons\commons-crypto\1.0.0\commons-crypto-1.0.0.jar;C:\Users\driftking9987\.m2\repository\org\scala-lang\scala-library\2.10.6\scala-library-2.10.6.jar;C:\Users\driftking9987\.m2\reposito
ry\org\apache\spark\spark-streaming-kafka-0-10_2.10\2.1.1\spark-streaming-kafka-0-10_2.10-2.1.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\kafka\kafka_2.10\0.10.2.1\kafka_2.10-0.10.2.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\kafka\kafka-clients\0.10.2.1\kafka-clients-0.10.2.1.jar;C:\Users\driftking9987\.m2\repository\net\sf\jopt-simple\jopt-simple\5.0.3\jopt-simple-5.0.3.jar;C:\Users\driftking9987\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\driftking9987\.m2\repository\com\101tec\zkclient\0.10\zkclient-0.10.jar;C:\Users\driftking9987\.m2\repository\org\apache\zookeeper\zookeeper\3.4.9\zookeeper-3.4.9.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-sql_2.10\2.1.1\spark-sql_2.10-2.1.1.jar;C:\Users\driftking9987\.m2\repository\com\univocity\univocity-parsers\2.2.1\univocity-parsers-2.2.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-sketch_2.10\2.1.1\spark-sketch_2.10-2.1.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-catalyst_2.10\2.1.1\spark-catalyst_2.10-2.1.1.jar;C:\Users\driftking9987\.m2\repository\org\codehaus\janino\janino\3.0.0\janino-3.0.0.jar;C:\Users\driftking9987\.m2\repository\org\codehaus\janino\commons-compiler\3.0.0\commons-compiler-3.0.0.jar;C:\Users\driftking9987\.m2\repository\org\antlr\antlr4-runtime\4.5.3\antlr4-runtime-4.5.3.jar;C:\Users\driftking9987\.m2\repository\org\apache\parquet\parquet-column\1.8.1\parquet-column-1.8.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\parquet\parquet-common\1.8.1\parquet-common-1.8.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\parquet\parquet-encoding\1.8.1\parquet-encoding-1.8.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\parquet\parquet-hadoop\1.8.1\parquet-hadoop-1.8.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\parquet\parquet-format\2.3.0-incubating\parquet-format-2.3.0-incubating.jar;C:\Users\driftking9987\.m2\repository\org\apache\parquet\parquet-jacks
on\1.8.1\parquet-jackson-1.8.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-sql-kafka-0-10_2.11\2.1.1\spark-sql-kafka-0-10_2.11-2.1.1.jar;C:\Users\driftking9987\.m2\repository\org\apache\spark\spark-tags_2.11\2.1.1\spark-tags_2.11-2.1.1.jar" kafkatry 
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
    17/07/03 01:23:55 INFO SparkContext: Running Spark version 2.1.0 
    17/07/03 01:23:55 WARN SparkContext: Support for Scala 2.10 is deprecated as of Spark 2.1.0 
    17/07/03 01:23:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
    17/07/03 01:23:56 INFO SecurityManager: Changing view acls to: driftking9987 
    17/07/03 01:23:56 INFO SecurityManager: Changing modify acls to: driftking9987 
    17/07/03 01:23:56 INFO SecurityManager: Changing view acls groups to: 
    17/07/03 01:23:56 INFO SecurityManager: Changing modify acls groups to: 
    17/07/03 01:23:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(driftking9987); groups with view permissions: Set(); users with modify permissions: Set(driftking9987); groups with modify permissions: Set() 
    17/07/03 01:23:57 INFO Utils: Successfully started service 'sparkDriver' on port 51353. 
    17/07/03 01:23:57 INFO SparkEnv: Registering MapOutputTracker 
    17/07/03 01:23:57 INFO SparkEnv: Registering BlockManagerMaster 
    17/07/03 01:23:57 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 
    17/07/03 01:23:57 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 
    17/07/03 01:23:58 INFO DiskBlockManager: Created local directory at C:\Users\driftking9987\AppData\Local\Temp\blockmgr-4535fb48-e3c2-4af7-a57a-c5c54c2d9ed1 
    17/07/03 01:23:58 INFO MemoryStore: MemoryStore started with capacity 352.5 MB 
    17/07/03 01:23:58 INFO SparkEnv: Registering OutputCommitCoordinator 
    17/07/03 01:23:58 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
    17/07/03 01:23:58 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.56.1:4040 
    17/07/03 01:23:59 INFO Executor: Starting executor ID driver on host localhost 
    17/07/03 01:23:59 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51374. 
    17/07/03 01:23:59 INFO NettyBlockTransferService: Server created on 192.168.56.1:51374 
    17/07/03 01:23:59 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 
    17/07/03 01:23:59 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.56.1, 51374, None) 
    17/07/03 01:23:59 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.56.1:51374 with 352.5 MB RAM, BlockManagerId(driver, 192.168.56.1, 51374, None) 
    17/07/03 01:23:59 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.56.1, 51374, None) 
    17/07/03 01:23:59 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.56.1, 51374, None) 
    17/07/03 01:24:00 WARN KafkaUtils: overriding enable.auto.commit to false for executor 
    17/07/03 01:24:00 WARN KafkaUtils: overriding auto.offset.reset to none for executor 
    17/07/03 01:24:00 WARN KafkaUtils: overriding executor group.id to spark-executor-0 
    17/07/03 01:24:00 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135 
    17/07/03 01:24:00 INFO SparkContext: Invoking stop() from shutdown hook 
    17/07/03 01:24:00 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040 
    17/07/03 01:24:00 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
    17/07/03 01:24:00 INFO MemoryStore: MemoryStore cleared 
    17/07/03 01:24:00 INFO BlockManager: BlockManager stopped 
    17/07/03 01:24:00 INFO BlockManagerMaster: BlockManagerMaster stopped 
    17/07/03 01:24:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
    17/07/03 01:24:00 INFO SparkContext: Successfully stopped SparkContext 
    17/07/03 01:24:00 INFO ShutdownHookManager: Shutdown hook called 
    17/07/03 01:24:00 INFO ShutdownHookManager: Deleting directory C:\Users\driftking9987\AppData\Local\Temp\spark-cd270e2f-257c-4878-88b4-8f908d65f76a 

Process finished with exit code 0 

Now, if I add the following:

jssc.start();// Start the computation 
jssc.awaitTermination(); 

it gives the following error:

17/07/03 01:26:46 ERROR StreamingContext: Error starting the context, marking it as stopped 
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:233) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) 
    at KafkaStreamingTry.main(KafkaStreamingTry.java:74) 
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:233) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556) 
    at KafkaStreamingTry.main(KafkaStreamingTry.java:74) 
17/07/03 01:26:46 INFO SparkContext: Invoking stop() from shutdown hook 
17/07/03 01:26:46 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040 
17/07/03 01:26:46 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
17/07/03 01:26:46 INFO MemoryStore: MemoryStore cleared 
17/07/03 01:26:46 INFO BlockManager: BlockManager stopped 
17/07/03 01:26:46 INFO BlockManagerMaster: BlockManagerMaster stopped 
17/07/03 01:26:46 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
17/07/03 01:26:46 INFO SparkContext: Successfully stopped SparkContext 
17/07/03 01:26:46 INFO ShutdownHookManager: Shutdown hook called 

That much is expected.

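Can you please tell me how to get it running? My requirement is that whenever I type any data in the producer terminal, the Java application should pick it up and at least print it. I'll try to work out the computation on the JSON I will be receiving myself.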

This is a question I asked earlier; I'm trying to follow the architecture suggested there.

Thanks

Edit:

pom.xml

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>com.cs</groupId> 
    <artifactId>sparktest</artifactId> 
    <version>1.0-SNAPSHOT</version> 
    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 
    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.10</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.10</artifactId> 
      <version>2.1.0</version> 
     </dependency> 


     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.10.6</version> 
     </dependency> 
     <!-- For Kafka integration -->
     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.10 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka-0-10_2.10</artifactId> 
      <version>2.1.1</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 --> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.10</artifactId> 
      <version>0.10.2.1</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_2.10</artifactId> 
      <version>2.1.1</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql-kafka-0-10_2.11</artifactId> 
      <version>2.1.1</version> 
     </dependency> 



    </dependencies> 
</project> 

Answers

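After creating the direct stream with Kafka, create a JavaPairDStream from it. You can then iterate over the JavaPairDStream and print the key and value of each Kafka message.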

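// Transformation: turn each ConsumerRecord into a (key, value) pair
JavaPairDStream<String, String> jPairDStream = stream.mapToPair(
     new PairFunction<ConsumerRecord<String, String>, String, String>() { 
      @Override 
      public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception { 
       return new Tuple2<>(record.key(), record.value()); 
      } 
     }); 

// Output operation: without at least one, jssc.start() fails with
// "No output operations registered, so nothing to execute"
jPairDStream.foreachRDD(jPairRDD -> { 
     // foreach runs on the executors; in local mode they share the driver's console
     jPairRDD.foreach(tuple -> { 
      System.out.println("key=" + tuple._1() + " value=" + tuple._2()); 
     }); 
    }); 

jssc.start();              // start receiving and processing data
jssc.awaitTermination();   // block until the streaming context is stopped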

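The kafka-console-producer.bat command you are using produces messages whose key is null. To produce messages with both a key and a value to the topic test, use the following command, then type the key and value in the console separated by a comma, like key1,value1: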

kafka-console-producer.bat --property parse.key=true --property key.separator=, --broker-list localhost:9092 --topic test 
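To make the key.separator behaviour concrete, here is a small plain-Java model of how a line typed into the console producer is split when parse.key=true and key.separator=, are set. KeyedLineParser is a hypothetical name and this is not Kafka's own code; it assumes the line is split at the first separator and that a line without any separator is rejected. This matters for the JSON payloads mentioned in the question: commas inside the value are kept, because only the first comma splits.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

/**
 * Hypothetical model of how a console producer started with
 * --property parse.key=true --property key.separator=,
 * turns one typed line into a (key, value) pair. Not Kafka's implementation.
 */
class KeyedLineParser {

    /** Splits the line at the FIRST occurrence of the separator. */
    static Map.Entry<String, String> parse(String line, String separator) {
        int idx = line.indexOf(separator);
        if (idx < 0) {
            // Assumption: a line without the separator is treated as an error.
            throw new IllegalArgumentException("No key found on line: " + line);
        }
        String key = line.substring(0, idx);
        String value = line.substring(idx + separator.length());
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> simple = parse("key1,value1", ",");
        System.out.println("key=" + simple.getKey() + " value=" + simple.getValue());

        // Later commas stay inside the value, so a JSON payload survives intact.
        Map.Entry<String, String> json = parse("id7,{\"a\":1,\"b\":2}", ",");
        System.out.println("key=" + json.getKey() + " value=" + json.getValue());
    }
}
```

If the JSON keys themselves could contain the separator, a separator that never appears in keys (for example a tab or `|`) would be the safer choice.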

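Your POM file uses different versions of the Spark artifacts (2.1.0 and 2.1.1) and even different Scala suffixes (_2.10 and _2.11). Make sure all Spark artifacts use the same version and the same Scala suffix. You need the following dependencies to run your program: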

<dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_2.11</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.11</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
</dependencies> 
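The "same version for all artifacts" rule also covers the Scala suffix: the question's pom mixes _2.10 artifacts with spark-sql-kafka-0-10_2.11, and Spark 2.1.0 with 2.1.1. The hypothetical SparkDependencyCheck below is only a sketch of that check over artifactId:version strings copied by hand from the two poms; it does not parse a real pom.xml or use Maven.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical consistency check over "artifactId:version" strings. */
class SparkDependencyCheck {

    /** Scala binary suffix of an artifactId, e.g. "2.10" for "spark-core_2.10". */
    static String scalaSuffix(String artifactId) {
        int idx = artifactId.lastIndexOf('_');
        return idx < 0 ? "" : artifactId.substring(idx + 1);
    }

    /** Returns a description of every mismatch; an empty list means consistent. */
    static List<String> problems(List<String> coordinates) {
        Set<String> suffixes = new TreeSet<>();
        Set<String> versions = new TreeSet<>();
        for (String c : coordinates) {
            String[] parts = c.split(":");
            suffixes.add(scalaSuffix(parts[0]));
            versions.add(parts[1]);
        }
        List<String> result = new ArrayList<>();
        if (suffixes.size() > 1) result.add("mixed Scala suffixes: " + suffixes);
        if (versions.size() > 1) result.add("mixed Spark versions: " + versions);
        return result;
    }

    public static void main(String[] args) {
        // Spark artifacts from the question's pom.xml
        List<String> question = Arrays.asList(
            "spark-streaming_2.10:2.1.0",
            "spark-core_2.10:2.1.0",
            "spark-streaming-kafka-0-10_2.10:2.1.1",
            "spark-sql_2.10:2.1.1",
            "spark-sql-kafka-0-10_2.11:2.1.1");
        // Spark artifacts from the answer's dependency list
        List<String> answer = Arrays.asList(
            "spark-core_2.11:2.1.0",
            "spark-sql_2.11:2.1.0",
            "spark-streaming_2.11:2.1.0",
            "spark-streaming-kafka-0-10_2.11:2.1.0");
        System.out.println("question pom: " + problems(question));
        System.out.println("answer pom: " + problems(answer));
    }
}
```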

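I made the changes above, and now at least it doesn't exit. But I'm getting 'INFO AppInfoParser: Kafka version : 0.10.2.1 INFO AppInfoParser: Kafka commitId : e89bffd6b2eff799'. I think this means Kafka is running, but when I write something in the producer terminal, nothing happens in the program. – driftking9987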

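A streaming application keeps producing log output, so you have to scroll to see the messages. Try putting more messages into kafka-console-producer. I tested your code and it works fine. – abaghel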

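It must be me who is missing something, but I did everything as described above. I added 50 new messages in the producer terminal, and still nothing is printed. – driftking9987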
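One possibility worth checking, not confirmed anywhere in this thread: the broker was set up from kafka_2.10-0.9.0.1, while the classpath shows kafka-clients-0.10.2.1, and the Spark Streaming + Kafka integration guide lists a Kafka broker of version 0.10.0 or higher for spark-streaming-kafka-0-10. The sketch below (VersionCheck is a hypothetical name) only shows why such a mismatch is easy to overlook: compared as plain strings, "0.9.0.1" sorts above "0.10.0", but compared component by component it is older.

```java
/** Hypothetical helper comparing dotted numeric version strings. */
class VersionCheck {

    /** Compares component by component; missing components count as 0. */
    static int compare(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        String broker = "0.9.0.1";  // from kafka_2.10-0.9.0.1 in the question
        String required = "0.10.0"; // minimum broker listed for spark-streaming-kafka-0-10
        // Lexicographic comparison compares '9' with '1' and gets it wrong.
        System.out.println("string compare: broker >= required? " + (broker.compareTo(required) >= 0));
        // Numeric comparison compares 9 with 10 and gets it right.
        System.out.println("numeric compare: broker >= required? " + (compare(broker, required) >= 0));
    }
}
```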

I think the log tells you everything you need :)

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

What is an output operation? For example:

  • foreachRDD
  • print
  • saveAsHadoopFiles
  • and others; see the full list in this documentation link

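You have to add an output operation to your application. For example, store the result of stream.mapToPair in a variable and then call foreachRDD on that variable, or call print() on it to display the values.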
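The idea behind this answer can be shown with a toy model in plain Java. ToyStreamingContext and ToyStream are hypothetical classes, not Spark's API; they only mirror the rule that a transformation (like mapToPair) merely describes work, while start() refuses to run unless at least one output operation (like foreachRDD or print) has been registered. The error text is copied from the question's log.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy model (NOT Spark): lazy transformations plus required output operations. */
class ToyStreamingContext {
    private final List<String> source;                                 // records of one "batch"
    private final List<Runnable> outputOperations = new ArrayList<>(); // registered output ops

    ToyStreamingContext(List<String> source) {
        this.source = source;
    }

    /** Lazy description of a stream of T; nothing is computed until start(). */
    final class ToyStream<T> {
        private final Function<String, T> pipeline;

        ToyStream(Function<String, T> pipeline) {
            this.pipeline = pipeline;
        }

        /** Transformation (like mapToPair): returns a new lazy stream, registers nothing. */
        <R> ToyStream<R> map(Function<T, R> f) {
            return new ToyStream<>(pipeline.andThen(f));
        }

        /** Output operation (like foreachRDD): registers work to run on start(). */
        void foreach(Consumer<T> action) {
            outputOperations.add(() -> source.forEach(r -> action.accept(pipeline.apply(r))));
        }
    }

    ToyStream<String> stream() {
        return new ToyStream<>(Function.<String>identity());
    }

    void start() {
        if (outputOperations.isEmpty()) {
            throw new IllegalArgumentException(
                "requirement failed: No output operations registered, so nothing to execute");
        }
        outputOperations.forEach(Runnable::run);
    }

    public static void main(String[] args) {
        ToyStreamingContext ctx = new ToyStreamingContext(Arrays.asList("key1,value1", "key2,value2"));
        ToyStream<String> keys = ctx.stream().map(line -> line.split(",")[0]);
        try {
            ctx.start(); // only a transformation so far, like the question's code
        } catch (IllegalArgumentException e) {
            System.out.println("start() failed: " + e.getMessage());
        }
        keys.foreach(k -> System.out.println("key=" + k)); // register an output operation
        ctx.start(); // now the pipeline runs for every record
    }
}
```

Unlike a real StreamingContext, this toy can be started twice; that is only done here to show both outcomes in one run.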

Can you tell me what the data type of `stream.mapToPair` should be, and how to perform an output operation on it? – driftking9987

Adding an output operation to your code could help:

stream.print() 

Give it a try, good luck.

Sorry, I already tried that. It doesn't work. – driftking9987
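A hedged guess about why stream.print() did not work (the actual error isn't shown in the comment): print() brings the first records of each batch back to the driver, and the ConsumerRecord objects in this stream do not implement java.io.Serializable, so Spark typically fails with a "not serializable result" error. A common workaround is to map to plain values first, e.g. stream.map(ConsumerRecord::value).print(). The sketch below does not use Spark or Kafka; it only demonstrates the underlying Java serialization behaviour with a hypothetical RecordStandIn class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SerializationCheck {

    /** Hypothetical stand-in for a record type that does NOT implement Serializable. */
    static final class RecordStandIn {
        final String key;
        final String value;

        RecordStandIn(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Returns true if plain Java serialization of obj succeeds. */
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<RecordStandIn> batch = new ArrayList<>(Arrays.asList(
            new RecordStandIn("key1", "value1"),
            new RecordStandIn("key2", "value2")));
        // Like printing the raw record stream: the record objects themselves must be shipped.
        System.out.println("raw records serializable? " + isJavaSerializable(batch));

        // Like mapping to the value first: only Strings are shipped.
        List<String> values = new ArrayList<>();
        for (RecordStandIn r : batch) {
            values.add(r.value);
        }
        System.out.println("values serializable? " + isJavaSerializable(values));
    }
}
```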