2017-02-24 33 views
1

我使用的火花流和正在創建從卡夫卡消息此數據框:創建數據框出兩個不同陣列的火花

|customer|initialLoadComplete|initialLoadRunning|  messageContent|  tableName| 
+--------+-------------------+------------------+--------------------+-----------------+ 
| A|    false|    true|TEFault_IdReason...|Timed_Event_Fault| 
| A|    false|    true|TEFault_IdReason...|Timed_Event_Fault| 
+--------+-------------------+------------------+--------------------+-----------------+ 

現在我想提取出在messageContent,在messageContent基本上就像一個CSV,其中包括原始數據,第一行是列。 我可以通過以下方式從messageContent字段中提取出標題。

val Array1 = ssc.sparkContext.parallelize(rowD.getString(2).split("\u0002")(0)) 

所以數組1看起來像這樣:

Array1: col1^Acol2^Acol3 

ARRAY2基本上是原始數據,通過^ A和記錄由^ B分隔分隔各列的值。

^A是一個列分隔符。^B是記錄分隔符

所以這是數組2可能是什麼樣子:

Array2 = value1^Avalue2^Avalue3^Bvalue4^Avalue5^Avalue6^Bvalue7^Avalue8^Avalue9 

基本上我想創建一個數據框出這個,所以它看起來是這樣的:

col1 | col2 | col3 
------------------------- 
value1 | value2 | value3 
value4 | value5 | value6 
value7 | value8 | value9 

^B是記錄分隔符。

當我們從一個HDFS文件中讀取數據,我們創建通過這個命令一個數據幀:

val df = csc.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", "\u0001").load(hdfsFile) 

但是這一次我創建從存儲兩個數組一個數據幀。 Array1是array2中值的標題,而array2是^ B分開的記錄。

什麼能在這種方法中創建一個數據幀像我一樣創建從文件中數據幀的等價物。

+0

因此Array1和Array2只有一個類型爲String的元素,其分隔符爲^ A和^ B? –

+0

它是一個並行的收集,更新我的問題 – Ahmed

+0

我只需要知道在數組1和數組2的每個項目看起來像..所以我們可以提供解決方案,而無需任何假設.. –

回答

1

我推斷從你的問題如下。

數組1是隻有一個條目col1^Acol2^Acol3

的RDD ARRAY2是每個條目看起來像這樣一個RDD。 value1^Avalue2^Avalue3^Bvalue4^Avalue5^Avalue6^Bvalue7^Avalue8^Avalue9

有了這些假設下面應該工作。

val array1 = sc.parallelize(Seq("col1\u0002col2\u0002col3")) 
val array2 = sc.parallelize(Seq("value1\u0001value2\u0001value3\u0002value4\u0001value5\u0001value6\u0002value7\u0001value8\u0001value9")) 
val data = array2.flatMap(x => x.split("\u0002")).map(x => x.split('\u0001')).collect() 

val result = array2 
       .flatMap(x => x.split("\u0002")) 
       .map(x => x.split('\u0001')) 
       .map({ case Array(x,y,z) => (x,y,z)}) 
       .toDF(array1.flatMap(x => x.split('\u0002')).collect(): _*) 

result.show() 
+------+------+------+ 
| col1| col2| col3| 
+------+------+------+ 
|value1|value2|value3| 
|value4|value5|value6| 
|value7|value8|value9| 
+------+------+------+ 
+1

嗨,是的,我能夠自己實現一個類似的解決方案,並得到它的工作。謝謝 – Ahmed

+0

您的歡迎! –

相關問題