I have a basic Spark Streaming word count and it just doesn't work:
import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Spark context plus a streaming context with 5-second batches
sc = SparkContext(appName='streaming', master="local[*]")
scc = StreamingContext(sc, batchDuration=5)

# Read lines from a socket and do a standard word count
lines = scc.socketTextStream("localhost", 9998)
words = lines.flatMap(lambda line: line.split())
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
counts.pprint()

print 'Listening'
scc.start()
scc.awaitTermination()
I run nc -lk 9998 in another terminal and paste in some text. It prints the typical logs (no exceptions), but it ends up queuing the jobs for some weird time (45 years?) and just keeps printing this...
15/06/19 18:53:30 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:874
15/06/19 18:53:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (PythonRDD[7] at RDD at PythonRDD.scala:43)
15/06/19 18:53:30 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/06/19 18:53:35 INFO JobScheduler: Added jobs for time 1434754415000 ms
15/06/19 18:53:40 INFO JobScheduler: Added jobs for time 1434754420000 ms
15/06/19 18:53:45 INFO JobScheduler: Added jobs for time 1434754425000 ms
...
...
What am I doing wrong?
Thanks. I thought 'local[*]' would automatically allocate executors based on the cores available locally? (That said, it solved my problem, but I'm curious why '*' doesn't work.) –
It's not clear from the docs, but I think 'local[*]' behaves like 'local' and creates only one thread, which is taken by the receiver, leaving no threads for the executors. –
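Based on the comments above, here is a minimal sketch of the likely fix, assuming the issue is that the socket receiver consumed the only available thread: give the local master an explicit thread count of at least 2, so one thread can run the receiver and the rest can process batches. The host, port, and app name simply mirror the question.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Assumption: request at least 2 local threads -- one for the socket
# receiver, one (or more) for actually processing the batches.
sc = SparkContext(appName='streaming', master="local[2]")
scc = StreamingContext(sc, batchDuration=5)

# Same word count pipeline as in the question
lines = scc.socketTextStream("localhost", 9998)
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda x, y: x + y))
counts.pprint()

scc.start()
scc.awaitTermination()

With this master setting, the batches pasted into nc -lk 9998 should actually be processed and printed by pprint() instead of piling up in the job queue.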