2017-07-17

Can't print in a Spark Streaming application

My Spark Streaming application is not printing a simple statement to the driver's stdout. Here I want to print a statement just after the transformation that produces dstream_2, but it is only printed for the first batch. I expected it to be printed on every batch execution.

val sparkConf = new SparkConf().setMaster("yarn-cluster") 
           .setAppName("SparkJob") 
           .set("spark.executor.memory","2G") 
           .set("spark.dynamicAllocation.executorIdleTimeout","5") 


val streamingContext = new StreamingContext(sparkConf, Minutes(1)) 

var historyRdd: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD 

var historyRdd_2: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD 


val dstream_1 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams, Set(inputTopic_1)) 
val stream_2 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams, Set(inputTopic_2)) 


val dstream_2 = stream_2.map((r: Tuple2[String, GenericData.Record]) => 
{ 
    //some mapping 
}) 
//Not Working 
print("Printing Test") 
val historyDStream = dstream_1.transform(rdd => rdd.union(historyRdd)) 
// repartition returns a new RDD, so calling it inside foreachRDD and discarding
// the result is a no-op; chain it into the transform instead
val historyDStream_2 = dstream_2.transform(rdd => rdd.repartition(500).union(historyRdd_2)) 
val fullJoinResult = historyDStream.fullOuterJoin(historyDStream_2) 

val filtered = fullJoinResult.filter(r => r._2._1.isEmpty) 


filtered.foreachRDD{rdd => 

    val formatted = rdd.map(r => (r._1 , r._2._2.get)) 

    historyRdd_2.unpersist(false) // unpersist the 'old' history RDD 
    historyRdd_2 = formatted // assign the new history 
    historyRdd_2.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation 
} 


val filteredStream = fullJoinResult.filter(r => r._2._2.isEmpty) 


filteredStream.foreachRDD{rdd => 
    val formatted = rdd.map(r => (r._1 , r._2._1.get)) 
    historyRdd.unpersist(false) // unpersist the 'old' history RDD 
    historyRdd = formatted // assign the new history 
    historyRdd.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation 
} 
streamingContext.start() 
streamingContext.awaitTermination() 

} }

Answer
print("Printing Test")

at that location in the program will only get printed once, when the program is first evaluated. To get some console output on every batch interval, we need to place the I/O operation inside the scope of an output operation:

This will print once per batch:

dstream_2.foreachRDD{ _ => print("Printing Test") } 
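As a fuller sketch in the context of the question's pipeline (hedged: this assumes the asker's `dstream_2` and a running `StreamingContext`; the batch-count message is illustrative, not part of the original code):

```scala
// Sketch only — requires the question's StreamingContext and dstream_2.
// The bare print runs exactly once on the driver, at graph-construction time;
// the body of foreachRDD runs on the driver once per batch interval.
print("Printing Test")   // once, when the streaming graph is declared

dstream_2.foreachRDD { rdd =>
  // executed by the streaming scheduler for every batch
  println(s"Printing Test - batch of ${rdd.count()} records")
}
```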
Comments:

Is this the same for logging (SLF4J)? – JSR29

Could you also give the reason for this print behavior? – JSR29

It should be the same for log statements. Regarding the behavior: Spark Streaming operates on DStreams. Anything outside of DStream operations is evaluated like any normal code in the program, only once. The unintuitive part is that operations on DStreams are only declared in the program; the actual execution happens in the Spark Streaming scheduler, once per batch. – maasg
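The declared-once vs. executed-per-batch distinction can be illustrated without Spark at all. The following is a minimal plain-Scala sketch; all names here (`BatchSchedulerSketch`, `foreachBatch`, `runBatches`) are illustrative stand-ins for the streaming scheduler, not Spark API:

```scala
// Sketch: why a top-level print runs once while a registered output
// operation runs every batch. DStream operations are only *declared*
// in the program; a scheduler invokes them per batch.
object BatchSchedulerSketch {
  // registered per-batch actions, analogous to output operations on a DStream
  private var outputOps: List[Int => Unit] = Nil

  def foreachBatch(op: Int => Unit): Unit = outputOps ::= op

  // the "scheduler": runs every registered output operation for each batch
  def runBatches(n: Int): Unit =
    for (batch <- 1 to n; op <- outputOps) op(batch)

  def main(args: Array[String]): Unit = {
    println("Printing Test")                  // driver-side: printed exactly once
    foreachBatch(b => println(s"batch $b"))   // printed once per batch
    runBatches(3)
  }
}
```

Running `main` prints "Printing Test" a single time, followed by one "batch N" line per simulated batch, mirroring the behavior described above.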