PySpark有什麼具體的方式來綁定兩個數據框,因爲我們在c中綁定了嗎?PySpark列明智綁定
實施例:
- 數據幀1具有10列
- 數據幀2具有1列
我需要cbind兩個數據幀,併爲在PySpark一個數據幀。
PySpark有什麼具體的方式來綁定兩個數據框,因爲我們在c中綁定了嗎?PySpark列明智綁定
實施例:
我需要cbind兩個數據幀,併爲在PySpark一個數據幀。
首先,讓我們創建dataframes:
df1 = spark.createDataFrame(sc.parallelize([10*[c] for c in range(10)]), ["c"+ str(i) for i in range(10)])
df2 = spark.createDataFrame(sc.parallelize([[c] for c in range(10, 20, 1)]), ["c10"])
+---+---+---+---+---+---+---+---+---+---+
| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|
+---+---+---+---+---+---+---+---+---+---+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2|
| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3|
| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4|
| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5|
| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6|
| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7|
| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8|
| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9|
+---+---+---+---+---+---+---+---+---+---+
+---+
|c10|
+---+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
然後我們想唯一標識行,有一個RDD
功能,可以做到這zipWithIndex
from pyspark.sql.types import LongType
from pyspark.sql import Row
def zipindexdf(df):
schema_new = df.schema.add("index", LongType(), False)
return df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
df1_index = zipindexdf(df1)
df1_index.show()
df2_index = zipindexdf(df2)
df2_index.show()
+---+---+---+---+---+---+---+---+---+---+-----+
| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|index|
+---+---+---+---+---+---+---+---+---+---+-----+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2|
| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3|
| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4|
| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5|
| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6|
| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7|
| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8|
| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9|
+---+---+---+---+---+---+---+---+---+---+-----+
+---+-----+
|c10|index|
+---+-----+
| 10| 0|
| 11| 1|
| 12| 2|
| 13| 3|
| 14| 4|
| 15| 5|
| 16| 6|
| 17| 7|
| 18| 8|
| 19| 9|
+---+-----+
最後,我們可以加入他們的行列:
df = df1_index.join(df2_index, "index", "inner")
+-----+---+---+---+---+---+---+---+---+---+---+---+
|index| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|
+-----+---+---+---+---+---+---+---+---+---+---+---+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 10|
| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 17|
| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 16|
| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 19|
| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 15|
| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 11|
| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 13|
| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 18|
| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 12|
| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 14|
+-----+---+---+---+---+---+---+---+---+---+---+---+
要獲取ID爲單調增加的列,請在每個DataFrame上使用以下代碼,其中colName
是要按每個DataFrame排序的列名。
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df = df\
.withColumn('int', F.lit(1))\
.withColumn('consec_id', F.sum('int').over(window))\
.drop('int')\
要檢查一切都排隊正確,使用下面的代碼來看看尾或數據幀的最後rownums
。
rownums = 10
df.where(F.col('consec_id')>df.count()-rownums).show()
使用下列代碼行把目光從start_row
到數據幀的end_row
。
start_row = 20
end_row = 30
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
的作品的另一種方法是RDD方法zipWithIndex()
。簡單地修改現有的數據幀具有連續的ID使用這種RDD方法的一列,I:
zipWithIndex()
方法,zipWithIndex()
創建的整數的ID列。我也嘗試過修改原始DataFrame的方法,該索引列包含類似於@MaFF所做的zipWithIndex()
的輸出,但結果更慢。窗函數比這兩者中的任何一個都快一個數量級。大部分時間增加似乎是將DataFrame轉換爲RDD並再次返回。
請讓我知道是否有更快的方式將zipWithIndex()
RDD方法的輸出添加爲原始DataFrame中的列。
對42,000行90列DataFrame進行測試得出以下結果。
import time
def test_zip(df):
startTime = time.time()
df_1 = df \
.rdd.zipWithIndex().toDF() \
.rdd.map(lambda row: (row._1) + (row._2,)) \
.toDF(df_all_indexed.columns +['consec_id'])
start_row = 20000
end_row = 20010
df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip(df) for _ in range(5)]
['590.813秒, '390.574秒, '360.074秒, '350.436秒, '350.636秒 ']
import time
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
def test_win(df):
startTime = time.time()
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df_2 = df \
.withColumn('int', F.lit(1)) \
.withColumn('IDcol', F.sum('int').over(window)) \
.drop('int')
start_row = 20000
end_row = 20010
df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_win(df) for _ in range(5)]
[' 4.19秒, '4.508秒', '4.099秒', '4.012秒', '4.045秒']
import time
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as T
def test_zip2(df):
startTime = time.time()
schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)])
df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
start_row = 20000
end_row = 20010
df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip2(testdf) for _ in range(5)]
['820.795秒, '610.689秒, '580.181秒, '580.01秒, '570.765秒]
非常感謝。它的做工精細:) –
這不適用於可能存儲在不同分區中的兩個單獨的大型DataFrame,並且每個DataFrame在不同行上的分區之間分隔開來。從[文檔](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id)***「當前實現將分區ID在高31位,並在每個分區內的記錄號在低33位「*** – Clay
你是對的,我不能相信我寫了... ...'MonotonicallyIncreasingID'的計數有一個不同的每個任務的起源 – MaFF