2017-03-05 75 views
0

我必須在pyspark中實施pandas .apply(function,axis = 1)(以應用行明智函數)。由於我是新手,我不確定它是否可以通過映射函數或使用UDF來實現。我無法在任何地方找到任何類似的實施。按行排列在pyspark中的數據幀上的行明智操作

基本上我想要的是將行傳遞給函數,執行一些操作來創建依賴於當前行和先前行的值的新列,然後返回修改的行以創建新的數據框。 與熊貓使用的函數的一個下面給出:

previous = 1 
def row_operation(row): 
    global previous 
    if pd.isnull(row["PREV_COL_A"])==True or (row["COL_A"]) != (row["PREV_COL_A"]): 
     current = 1 
    elif row["COL_C"] > cutoff: 
     current = previous +1 
    elif row["COL_C"]<=cutoff: 
     current = previous 
    else: 
     current = Nan 
    previous = current 
    return current 

這裏PREV_COL_A無非是爲col_a由1排滯後。

請注意,此功能是最簡單的,但不會返回行,但其他人做。 如果任何人都可以指導我如何在pyspark中實施行操作,這將是一個很大的幫助。 TIA

回答

0

您可以使用rdd.mapPartition。它會給你一個遍歷行的迭代器,併產生你想要返回的結果行。你得到的迭代不會讓你索引向前或向後,只是返回下一行。但是,您可以在處理時保存行,以執行您需要執行的任何操作。例如

def my_cool_function(rows): 
    prev_rows = [] 

    for row in rows: 
     # Do some processing with all the rows, and return a result 
     yield my_new_row 

     if len(prev_rows) >= 2: 
      prev_rows = prev_rows[1:] 

     prev_rows.append(row) 

updated_rdd = rdd.mapPartitions(my_cool_function) 

請注意,我用一個列表來跟蹤例如着想的分區,但蟒蛇名單是真的不具備有效的頭推/ pop方法陣列,所以你可能會想使用一個實際的隊列。