2017-06-07 24 views
0

我從磁盤星火重複數據刪除技術,以獲得更大的RDD

df_ = sqlContext.read.json("/Users/spark_stats/test.json") 

它包含50萬行加載的數據幀。
我的腳本正常工作在這個大小,但我想測試它,例如5M行,有沒有辦法重複DF 9次? (沒關係,讓我有在DF式兩份)

我已經使用了工會,但它實在是太慢了(因爲我認爲它使從磁盤每次讀取)

df = df_ 
for i in range(9): 
    df = df.union(df_) 

你有一個清潔的方式來做到這一點的想法?

謝謝

+0

使用.cache()一次從數據源讀取。 – StackPointer

+0

謝謝,謝謝,工作得很好 –

回答

0

您可以使用爆炸。它應該只從原始磁盤讀取一次:

from pyspark.sql.types import * 
from pyspark.sql.functions import * 

schema = StructType([StructField("f1", StringType()), StructField("f2", StringType())]) 

data = [("a", "b"), ("c", "d")] 
rdd = sc.parallelize(data) 
df = sqlContext.createDataFrame(rdd, schema) 

# Create an array with as many values as times you want to duplicate the rows 
dups_array = [lit(i) for i in xrange(9)] 
duplicated = df.withColumn("duplicate", array(*dups_array)) \ 
       .withColumn("duplicate", explode("duplicate")) \ 
       .drop("duplicate")