2017-04-06 67 views
1

我喜歡在spark中將數據框寫入dynamodb。 因此我使用rdd.saveAsHadoopDataset(JobConf)。但rdd類型不匹配。它需要hadoopRDD類型的rdd。因此我喜歡將數據幀轉換爲rdd.I使用了df.rdd,它給出了rdd而不是hadoopRDD。我正在使用spark-scala API。如果有更好的方法將Dataframe寫入Dyanmodb的火花,這將有所幫助。如何將Spark中的DataFrame轉換爲HadoopRDD

回答

1

你不需要轉換你的RDD。

因爲Hadoop API是圍繞鍵值對構建的,所以Spark圍繞RDD自動包裝PairRDDFunctions(它增加了額外的功能),其中數據存儲在Tuple2對象中。所以你只需要把你的數據放入RDD[(T,V)],那麼你將有saveAsHadoopDataset方法可用。

下面是一個例子:

import org.apache.hadoop.mapred.JobConf 
val tupleRDD : RDD[(Int, Int)] = sc.parallelize(Array((1,2), (3,4), (5,6))) 
val jobConf = new JobConf() 

設置任何需要的設定。

tupleRDD.saveAsHadoopDataset(jobConf) 
+0

謝謝你的答案。你有什麼Spark-Scala的例子上面。其實我是新手火花,所以它會有所幫助。提前感謝。 – Yogesh

+0

增加了一個什麼樣的RDD可以工作的例子。 – jamborta

0

如果有人正在尋找從spark-scala到dyanmodb的數據框。然後下面可能會有所幫助。

import com.amazonaws.services.dynamodbv2.document.Item 
import com.amazonaws.services.dynamodbv2.document.DynamoDB 

var json_arr=df.toJSON.collect() //Convert dataframe to json array 
val table = dynamoDB.getTable("table_name") //dynamoDB is connection to dynamodb 
for (element <- json_arr) { 
     val item = Item.fromJSON(element) 
     table.putItem(item) 
    } 
+0

dynamoDB是如何實例化的? –

相關問題