2017-10-08 80 views
0

我基本上想要使用來自Kafka的數據並將其寫入HDFS。但發生的情況是,它不是在hdfs中編寫任何文件。它會創建空文件。來自Kafka的Spark流式傳輸和Avro格式的HDFS寫入

而且請指導我,如果我想寫在HDF格式的HDFS我如何修改代碼。

爲了簡單起見,我寫了本地C盤。

import org.apache.spark.SparkConf 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.SparkContext 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.KafkaUtils 
import 
org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.kafka.common.serialization.StringDeserializer 

object KafkaStreaming extends App{ 
val conf = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("kafka-streaming") 
val conext = new SparkContext(conf) 
val ssc = new StreamingContext(conext, org.apache.spark.streaming.Milliseconds(1)) 
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092", 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[StringDeserializer], 
    "group.id" -> "group", 
    "auto.offset.reset" -> "latest", 
    "enable.auto.commit" -> (true: java.lang.Boolean)) 
val topics = Array("topic") 
val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams)) 
val lines = stream.map(_.value) 
stream.foreachRDD(rdd => { 
    rdd.coalesce(1).saveAsTextFile("C:/data/spark/") 
}) 
ssc.start() 
ssc.awaitTermination()} 

而且下面是build.sbt

name := "spark-streaming" 
version := "1.0" 
scalaVersion := "2.11.8" 
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0" 
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0" 
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0- 
10_2.11" % "2.2.0" 
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.1" 
+0

它所需要的是一個過濾器,以檢查是否該批次是空.. stream.map(_。值).foreachRDD(RDD => { rdd.foreach(的println ) 如果){ rdd.saveAsTextFile( 「C:/數據/火花/」) }(rdd.isEmpty(!) }) 但我仍然面對的問題是,新一批覆蓋舊數據。我希望所有的數據被追加到文件中.. –

回答

0

下面點上運行您的卡夫卡消費者應用程序之前必須檢查:在卡夫卡或

  • 檢查數據可用不是

  • 更改auto.offset.resetearliest 這裏最早的意思是kafka自動將偏移重置爲最早的偏移量。

  • 啓動Kafka控制檯生產者應用程序並開始鍵入一些消息。然後啓動卡夫卡消費者代碼,再次在卡夫卡控制檯製作者上輸入一些消息,然後檢查消息是否打印到消費者控制檯。

您可以使用下面的代碼

spark.write.avro("<path>") 

行寫Avro的格式輸出我希望這將有助於你

+0

嗨謝謝你是卡夫卡正在運行,消息在卡夫卡。感謝您爲avro提供的輸入 –

1

HDFS中不寫任何文件。它會創建空文件。

請檢查這裏怎麼調試

Unable to see messages from Kafka Stream in Spark

請指導我,如果我想在Avro的格式寫在HDFS

https://github.com/sryza/simplesparkavroapp

package com.cloudera.sparkavro 

import org.apache.avro.mapred.AvroKey 
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat} 
import org.apache.hadoop.fs.Path 
import org.apache.hadoop.io.NullWritable 
import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.SparkContext._ 

object SparkSpecificAvroWriter { 
    def main(args: Array[String]) { 
    val outPath = args(0) 

    val sparkConf = new SparkConf().setAppName("Spark Avro") 
    MyKryoRegistrator.register(sparkConf) 
    val sc = new SparkContext(sparkConf) 

    val user1 = new User("Alyssa", 256, null) 
    val user2 = new User("Ben", 7, "red") 

    val records = sc.parallelize(Array(user1, user2)) 
    val withValues = records.map((x) => (new AvroKey(x), NullWritable.get)) 

    val conf = new Job() 
    FileOutputFormat.setOutputPath(conf, new Path(outPath)) 
    val schema = User.SCHEMA$ 
    AvroJob.setOutputKeySchema(conf, schema) 
    conf.setOutputFormatClass(classOf[AvroKeyOutputFormat[User]]) 
    withValues.saveAsNewAPIHadoopDataset(conf.getConfiguration) 
    } 
} 
+0

Kafka中存在消息,問題是當新批處理覆蓋現有數據時。現在我添加了空批次的過濾器。所以現在我的零件文件不再是空的了。但是,當它回顧一個新的批處理時,它會覆蓋舊的內容。你能幫我如何追加文件。以供參考我的代碼在git中心:https://github.com/Viyaan/spark-kafka-hdfs/blob/master/src/main/scala/com/spark/streaming/KafkaStreaming.scala –

+0

嘗試使用kafdrop檢查卡夫卡消息。這可以幫助你從Kafka的最後調試它。 –

1

查看您的代碼,您可以簡單地將當前時間戳添加到您正在編寫的文件中。

這應該解決您的問題。 :)

==========

如果你想所有的文件追加到一個文件,那麼你可以使用dataframes如下:

我不會推薦使用追加因爲這個文件系統的設計方式在HDFS中。但這裏是你可以嘗試的。

  1. 從您的RDD
  2. 創建一個數據框使用數據框的節省模式(「追加」),然後寫入文件。

e.g:

VAL數據幀= youRdd.toDF(); dataframe.write()。mode(SaveMode.Append).format(FILE_FORMAT).. save(path);

看看是否有幫助

+0

是的,但會創建很多文件,我只想要一個文件。謝謝你的建議你 –

+0

已經更新了上面的答案。看看是否有助於你的用例 –