UTF-8 encoding error while connecting a Flume Twitter stream to Spark in Python

I am having trouble passing the Twitter data collected by a Flume agent to Spark Streaming. Standalone, I can download tweets with Flume just fine, but I get the error below. I think this is a problem with the default UTF-8 decoding in FlumeUtils.createStream(). How can I change it, and what exactly should I change?
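For context, this failure is reproducible in plain Python 2.7: a byte such as 0xe4 is valid Latin-1 ('ä'), but it is not valid UTF-8 when followed by an ASCII byte. An illustrative snippet (not from the original post):

# 0xe4 decodes fine as Latin-1, but in UTF-8 it must start a
# multi-byte sequence, so a following ASCII byte triggers
# "invalid continuation byte" -- the same error as below.
raw = 'caf\xe4 tweet'
print repr(raw.decode('latin-1'))  # u'caf\xe4 tweet'
raw.decode('utf-8')                # raises UnicodeDecodeError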

Error in the pyspark terminal:

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main 
    process() 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/flume.py", line 107, in func 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/flume.py", line 36, in utf8_decoder 
    return s.decode('utf-8') 
    File "/usr/lib/python2.7/encodings/utf_8.py", line 16, in decode 
    return codecs.utf_8_decode(input, errors, True) 
UnicodeDecodeError: 'utf8' codec can't decode byte 0xe4 in position 17: invalid continuation byte 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
17/01/01 15:36:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 

PySpark code:

from pyspark.sql import SparkSession 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.flume import FlumeUtils 

ss = SparkSession.builder \ 
    .master("local[2]") \ 
    .appName("Stream_Analysis")\ 
    .config("spark.sql.crossJoin.enabled", "true") \ 
    .getOrCreate() 

sc = ss.sparkContext 

strm = StreamingContext(sc, 5) 

flume = FlumeUtils.createStream(strm, "localhost", 9999) 
flume.pprint() 
strm.start() 
strm.awaitTermination() 

Command to start PySpark:

spark-submit --jars ~/project/spark-streaming-flume-assembly_2.11-2.0.2.jar ~/project/news_stream_flume/news_stream_analysis.py localhost 9999 

Flume conf:

# Name the components on this agent 
FlumeAgent.sources = Twitter 
FlumeAgent.sinks = spark 
FlumeAgent.channels = MemChannel 

# Twitter source 
FlumeAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource 
FlumeAgent.sources.Twitter.consumerKey = x 
FlumeAgent.sources.Twitter.consumerSecret = y 
FlumeAgent.sources.Twitter.accessToken = z 
FlumeAgent.sources.Twitter.accessTokenSecret = xx 
FlumeAgent.sources.Twitter.keywords = flume, spark 

FlumeAgent.sinks.spark.type = avro 
FlumeAgent.sinks.spark.hostname = localhost 
FlumeAgent.sinks.spark.port = 9999 
FlumeAgent.sinks.spark.batch-size = 1 

# Use a channel which buffers events in memory 
FlumeAgent.channels.MemChannel.type = memory 
FlumeAgent.channels.MemChannel.capacity = 10000 
FlumeAgent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
FlumeAgent.sources.Twitter.channels = MemChannel 
FlumeAgent.sinks.spark.channel = MemChannel 

Command to run the Flume agent:

flume-ng agent --name FlumeAgent --conf-file /home/hduser/project/flume_config_2src_spark_avro -f /usr/lib/flume-ng/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console 

Answer


FlumeUtils.createStream takes a bodyDecoder argument, a function used to decode the event body. The default implementation just checks for None and otherwise decodes as UTF-8:

def utf8_decoder(s): 
    """ Decode the unicode as UTF-8 """ 
    if s is None: 
        return None 
    return s.decode('utf-8') 
  • In Python 2.x, you should be able to replace it with your own function that uses the required encoding, or even skip decoding entirely with the identity function (lambda x: x); see the sketch after this list.

  • In Python 3.x, it may take some additional steps (mapping with _.getBytes on the JVM side) to bypass the String -> unicode conversion.
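A minimal sketch of such a replacement decoder for Python 2.x (the lenient_decoder name and the 'replace' error handling are illustrative assumptions, not part of the original answer), plugged into the createStream call from the question:

from pyspark.streaming.flume import FlumeUtils

def lenient_decoder(s):
    # Same shape as the default utf8_decoder, but substitutes U+FFFD
    # for undecodable bytes instead of raising UnicodeDecodeError.
    if s is None:
        return None
    return s.decode('utf-8', 'replace')

# strm is the StreamingContext from the question; bodyDecoder=lambda x: x
# would skip decoding entirely and keep the raw bytes.
flume = FlumeUtils.createStream(strm, "localhost", 9999,
                                bodyDecoder=lenient_decoder)

With 'replace', malformed bytes become the replacement character instead of failing the task; with the identity function the DStream carries raw str values that can be decoded later with whichever codec the source actually uses.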