2015-06-19 186 views
0

我有一個基本的火花流字數和它只是不工作。火花流不工作

import sys 
from pyspark import SparkConf, SparkContext 
from pyspark.streaming import StreamingContext 

sc = SparkContext(appName='streaming', master="local[*]") 
scc = StreamingContext(sc, batchDuration=5) 

lines = scc.socketTextStream("localhost", 9998) 
words = lines.flatMap(lambda line: line.split()) 
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y) 

counts.pprint() 

print 'Listening' 
scc.start() 
scc.awaitTermination() 

我在另一個終端上運行nc -lk 9998,我粘貼了一些文本。它打印出典型的日誌(沒有例外),但它結束了排隊作業一些奇怪的時間(45歲),並不斷打印此...

15/06/19 18:53:30 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:874 
15/06/19 18:53:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (PythonRDD[7] at RDD at PythonRDD.scala:43) 
15/06/19 18:53:30 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 
15/06/19 18:53:35 INFO JobScheduler: Added jobs for time 1434754415000 ms 
15/06/19 18:53:40 INFO JobScheduler: Added jobs for time 1434754420000 ms 
15/06/19 18:53:45 INFO JobScheduler: Added jobs for time 1434754425000 ms 
... 
... 

我在做什麼錯?

回答

1

Spark Streaming需要多個執行程序才能工作。嘗試使用本地[4]作爲主人。

+1

謝謝。我認爲'本地[*]'會自動分配基於本地可用內核的執行程序? (說過,它解決了我的問題,但很好奇爲什麼'*'不起作用) –

+0

從文檔中不清楚,但我認爲'local [*]'與'local'類似,只創建一個線程對於接收者而言並且執行者沒有線程。 –