
Spark: equivalent of zipWithIndex on a DataFrame

Suppose I have the following DataFrame:

dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)] 
df = sc.parallelize(dummy_data).toDF(['letter','number']) 

and I want to create the following DataFrame, in which each distinct number is mapped to a unique index:

[('a',0),('b',2),('c',1),('d',3),('e',0)] 

What I do is convert it to an RDD, apply the zipWithIndex function, and then join the result back:

convertDF = (df.select('number') 
       .distinct() 
       .rdd 
       .zipWithIndex() 
       .map(lambda x:(x[0].number,x[1])) 
       .toDF(['old','new'])) 


finalDF = (df 
      .join(convertDF,df.number == convertDF.old) 
      .select(df.letter,convertDF.new)) 

Is there a function similar to zipWithIndex for DataFrames? Is there another, more efficient way to accomplish this task?


http://stackoverflow.com/q/32760888/1560062 – zero323

Answer


Please see https://issues.apache.org/jira/browse/SPARK-23074, which tracks direct support for this functionality on DataFrames. Upvote the JIRA if you are interested in seeing this land in Spark at some point.

In the meantime, here is a workaround in PySpark:

from pyspark.sql.types import StructType, StructField, LongType

def dfZipWithIndex(df, offset=1, colName="rowId"):
    '''
     Enumerates dataframe rows in native order, like rdd.zipWithIndex(),
     but on a dataframe, and preserves the schema

     :param df: source dataframe
     :param offset: adjustment to zipWithIndex()'s index
     :param colName: name of the index column
    '''

    new_schema = StructType(
        [StructField(colName, LongType(), True)]  # new field added in front
        + df.schema.fields                        # previous schema
       )

    zipped_rdd = df.rdd.zipWithIndex()

    # tuple unpacking in lambda arguments is Python 2 only; unpack explicitly
    new_rdd = zipped_rdd.map(lambda pair: [pair[1] + offset] + list(pair[0]))

    return spark.createDataFrame(new_rdd, new_schema)
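For example, assuming the df from the question and an active spark session, the helper can be applied directly:

# prepend a 0-based row index column named "idx" (both values are just examples)
df_indexed = dfZipWithIndex(df, offset=0, colName="idx")
df_indexed.show()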

This is also available in the abalon package.
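For the original task specifically, a rough DataFrame-only sketch (my assumption, not the method from the answer above) is dense_rank over a window. Note that the index assignment follows sort order (here 1 maps to 0, 3 to 1, 8 to 2, 25 to 3), which differs from zipWithIndex's partition order, and an unpartitioned window moves all rows to a single partition:

from pyspark.sql import Window
from pyspark.sql.functions import dense_rank

# map each distinct number to a dense 0-based index in one pass,
# avoiding the distinct/zipWithIndex/join round-trip
w = Window.orderBy('number')
finalDF = (df
           .withColumn('new', dense_rank().over(w) - 1)
           .select('letter', 'new'))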