
I have compounded return data in Spark in this format:

Date       | Return
01/01/2015 |  0.0
02/02/2015 | -0.02
03/02/2015 |  0.05
04/02/2015 |  0.07

I want to process this and add to the DataFrame a column holding the compounded return, computed as follows:

  • 1 for the first row.

  • Compounded(i) = (1 + Return(i)) * Compounded(i-1) for every following row.

So my DF would finally become:

Date       | Return | Compounded
01/01/2015 |  0.0   | 1.0
02/02/2015 | -0.02  | 1.0*(1-0.02) = 0.98
03/02/2015 |  0.05  | 0.98*(1+0.05) = 1.029
04/02/2015 |  0.07  | 1.029*(1+0.07) = 1.10103
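
As a sanity check, the recurrence can be traced outside Spark in a few lines of plain Scala (a minimal sketch; the numbers are the ones from the tables above):

// Plain-Scala sketch of the recurrence, independent of Spark.
// The first row's value is pinned to 1.0 (the scanLeft seed); every
// later row multiplies the previous value by (1 + Return).
val returns = Seq(0.0, -0.02, 0.05, 0.07)
val compounded = returns.tail.scanLeft(1.0)((acc, r) => acc * (1 + r))
println(compounded.mkString(", ")) // 1.0, 0.98, 1.029, 1.10103 (up to rounding)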

An answer would be greatly appreciated.


Answers


You can also create a custom aggregation function and use it inside a window function.

Something like this (written freehand, so there may be a few mistakes):

package com.myuadfs

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyUDAF() extends UserDefinedAggregateFunction {

  // Input: the per-row Return value.
  def inputSchema: StructType = StructType(Array(StructField("Return", DoubleType)))

  // Buffer: the running compounded product.
  def bufferSchema: StructType = StructType(Array(StructField("compounded", DoubleType)))

  def dataType: DataType = DoubleType

  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = 1.0 // set compounded to 1
  }

  def update(buffer: MutableAggregationBuffer, input: Row) = {
    buffer(0) = buffer.getDouble(0) * (input.getDouble(0) + 1)
  }

  // This merges two aggregation buffers. Because compounding is a running
  // product, the correct combination of two partial results is a
  // multiplication. Inside a window function the whole frame is accumulated
  // in a single buffer, so merge should not be called at all here.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
  }

  def evaluate(buffer: Row) = {
    buffer.getDouble(0)
  }
}

Now you can use it inside a window function. Something like this:

import org.apache.spark.sql.expressions.Window

val myUDAF = new MyUDAF()
val windowSpec = Window.orderBy("Date")
val newDF = df.withColumn("Compounded", myUDAF(df("Return")).over(windowSpec))

Note that this entire calculation has to fit in a single partition, so if your data is too large you will run into this limit. That said, this kind of operation is nominally performed after partitioning by some key (e.g., by adding a partitionBy to the window spec), and then every element belongs to some key, so each window only spans one key's rows.
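
For example (a sketch only; the symbol column here is hypothetical, standing in for whatever key the data would be partitioned by):

// Hypothetical variant: if the rows carried a key such as "symbol" and
// compounding should restart per key, partitionBy bounds each window to
// one key's rows, so only a single key has to fit in a partition.
val perKeyWindow = Window.partitionBy("symbol").orderBy("Date")
val perKeyDF = df.withColumn("Compounded", myUDAF(df("Return")).over(perKeyWindow))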


Thank you very much. This code helped me a lot! –


First, we define a function f(line) (please suggest a better name!) to process each line.

# Module-level globals keep state between calls. Note the caveat in the
# comments below: with more than one partition each executor gets its own
# copy of these, so this only behaves as intended on a single partition.
firstLine = False
last_compounded = 1

def f(line):
    global firstLine
    global last_compounded
    if line[0] == 'Date':                # header row: label the new column
        firstLine = True
        return (line[0], line[1], 'Compounded')
    if firstLine:                        # first data row: Compounded is 1
        last_compounded = 1
        firstLine = False
    else:                                # later rows: apply the recurrence
        last_compounded = (1 + float(line[1])) * last_compounded
    return (line[0], line[1], last_compounded)

Using two global variables (can this be improved?), we keep the Compounded(i-1) value and whether we are processing the first line.

With the data in some_file, one solution could be:

rdd = sc.textFile('some_file').map(lambda l: l.split()) 
r1 = rdd.map(lambda l: f(l)) 

rdd.collect()
[[u'Date', u'Return'], [u'01/01/2015', u'0.0'], [u'02/02/2015', u'-0.02'], [u'03/02/2015', u'0.05'], [u'04/02/2015', u'0.07']]
r1.collect()
[(u'Date', u'Return', 'Compounded'), (u'01/01/2015', u'0.0', 1.0), (u'02/02/2015', u'-0.02', 0.98), (u'03/02/2015', u'0.05', 1.05), (u'04/02/2015', u'0.07', 1.1235000000000002)]


Are you sure this works when you have multiple (local) executors? The globals will be different in each executor... –


@AssafMendelson, good point! Back to the drawing board! –


I tried implementing it, but it failed once I had many days of data. Maybe for the reason Assaf suggested. –