
I am trying to set up Spark Streaming code that reads lines from a Kafka server but processes them using rules stored in a local file. I created a StreamingContext for the streaming data and a SparkContext for everything else, such as string operations and reading the local file. Can a SparkContext and a StreamingContext coexist in the same program?

// First context: created implicitly by the StreamingContext
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ReadLine")
val ssc = new StreamingContext(sparkConf, Seconds(15))
ssc.checkpoint("checkpoint")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val sentence = lines.toString

// Second context: creating this triggers the "Only one SparkContext" error
val conf = new SparkConf().setAppName("Bi Gram").setMaster("local[2]")
val sc = new SparkContext(conf)
val stringRDD = sc.parallelize(Array(sentence))

But this throws the following error:

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: 
org.apache.spark.SparkContext.<init>(SparkContext.scala:82) 
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874) 
org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81) 

Answer


An application can have only one SparkContext, and a StreamingContext is created on top of a SparkContext. You only need to create the StreamingContext ssc from the existing SparkContext:

val sc = new SparkContext(conf) 
val ssc = new StreamingContext(sc, Seconds(15)) 
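
Applied to the original program, a minimal sketch might look like the following. It assumes zkQuorum, group, topics and numThreads are defined as in the question, and the rules file path is hypothetical:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// One SparkConf and one SparkContext for the whole application
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ReadLine")
val sc = new SparkContext(sparkConf)

// Build the StreamingContext on top of the existing SparkContext
val ssc = new StreamingContext(sc, Seconds(15))
ssc.checkpoint("checkpoint")

// Streaming work uses ssc
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

// Batch work, such as reading the local rules file, reuses the same sc
val rules = sc.textFile("rules.txt") // hypothetical path to the rules file

lines.print()
ssc.start()
ssc.awaitTermination()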

If you instead use the following constructor:

StreamingContext(conf: SparkConf, batchDuration: Duration) 

it internally creates another SparkContext:

this(StreamingContext.createNewSparkContext(conf), null, batchDuration) 

The SparkContext can be obtained from the StreamingContext via:

ssc.sparkContext
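
So even if the StreamingContext was built from a SparkConf, the implicitly created SparkContext can still be reused for RDD work instead of creating a second one. A brief sketch, with placeholder data:

// Reuse the SparkContext that the StreamingContext created internally
val sc = ssc.sparkContext
val stringRDD = sc.parallelize(Array("some sentence")) // placeholder data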