2016-07-25 30 views
0

我的pyspark過程的輸出部分大小不均勻,但可預測具有n ** 2模式(0,1,2,4,8,16等)。這是我的過程:pyspark不均勻地分配負載,增加雙倍尺寸的零件

我從谷歌的BigQuery加載數據是這樣的:

dConf = { 
    "mapred.bq.project.id": project_id, 
    "mapred.bq.gcs.bucket": bucket, 
    "mapred.bq.input.project.id": project_id, 
    "mapred.bq.input.dataset.id":dataset_id, 
    "mapred.bq.input.table.id": table_id 
} 

rdd_dataset_raw = sc.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat", 
    "org.apache.hadoop.io.LongWritable", 
    "com.google.gson.JsonObject", 
    conf=dConf 
) 

,其輸出看起來像這樣(rdd_dataset_raw.take(2)):

[(0, u'{"group_id":"1","pertubations":"Current Affairs,Sport,Technology"}'), 
(67, u'{"group_id":"2","pertubations":"Current Affairs,Sport,Celeb Gossip"}')] 

一些瑣碎的處理,以重新劃分:

rdd_dataset = (
    rdd_dataset_raw 
    .repartition(nr_partitions) 
    .map(lambda t, json=json: json.loads(t[1])) 
) 

看起來像這樣:

[{u'group_id': u'1', u'pertubations': u'Current Affairs,Sport,Technology'}, 
{u'group_id': u'2', u'pertubations': u'Current Affairs,Sport,Celeb Gossip'}] 

當我保存RDD到谷歌存儲有:

rdd_dataset_raw.saveAsTextFile("gs://bucket/directory") 

這將創建nr_partitions部分文件。

但是,這些零件文件大小不均。它們在n**2中增加,其中n是零件文件號。換句話說,

part-00000包含0線
part-00001含有1線
part-00002包含2行
part-00003包含4行
part-00004包含8行

大多數這些也幾乎完成立即,後面的部分用完了內存。

這是怎麼回事!?如何使分區承擔均勻負載?

回答

0

這是與partitionBy更換repartition簡單:

rdd_dataset = (
    rdd_dataset_raw 
    .partitionBy(nr_partitions) 
    .map(lambda t, json=json: json.loads(t[1])) 
) 

注意這需要儘可能早地完成。傳遞一個未分區的rdd,然後再分區。

Docs