2015-07-28 38 views
3

使用pyspark從室壁運動消耗數據後,我有這樣的條目一個DSTREAM:將數據寫入鑲

('filename_1', [{'name': 'test'}, {'name': 'more'}, {'name': 'other'}]) 
('filename_2', [{'age': 15}, {'age': 25}]) 

我想現在要做的是元組的第二部分寫由元組的第一部分標識的位置。

在其他地方,我做了,通過使用字典的每個列表轉換成數據幀:

dataframe = sqlContext.createDataFrame(list_of_dicts) 

,寫它的東西,如:

dataframe.write.parquet('filename') 

我現在的問題是如何把每在dstream中放入一個DataFrame中。我的直覺是使用地圖來獲得每一行,並進行轉換。這需要你其實可以不通過一個地圖功能的sqlContext,因爲它失敗,此錯誤:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063 

我不是絕對依賴於實木複合地板,但我需要某種架構(因此繞道到DataFrame)。有沒有辦法做到這一點與火花?

回答

0

您可以在foreach方法內創建SqlContext的新實例。

words.foreachRDD(
    new Function2<JavaRDD<String>, Time, Void>() { 
    @Override 
    public Void call(JavaRDD<String> rdd, Time time) { 
     SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context()); 

要了解更多詳情,你可以通過這個link

+0

據我所知,每個RDD可能仍然由多行組成(至少這就是爲什麼我認爲在鏈接的例子中稍後有地圖),所以我仍然無法將一個完整的RDD放入一個數據框因爲每一行都應該是它自己的數據框。 – DoHe

+0

你不能在地圖內使用sqlContext。它只適用於foreachRdd函數。你有沒有經過這個鏈接? – Kaushal

+0

是的,這正是問題所在。每個RDD可能仍然包含多行,但是由於每行應該成爲一個DataFrame,所以我最終得到了如下結構: 'name for name:' 'filtered = rdd.filter(lambda name_record:filter_by_name (姓名,name_record))' 'DF = sqlc.createDataFrame(記錄)' 'df.write.parquet(OUT_PATH +名,模式= '追加')' 哪個是不是最佳的。 – DoHe