2015-11-25 102 views
1

Find the minimum and maximum date in an RDD in PySpark

I am using Spark with IPython and have an RDD that contains data in this format when printed:

print(rdd1.collect())

[u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00',....] 

Each element is a datetime stamp stored as a string, and I want to find the minimum and maximum values in this RDD. How can I do that?

Answers

5

You can, for example, use the aggregate function (for an explanation of how it works, see: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?):

from datetime import datetime  

rdd = sc.parallelize([ 
    u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00']) 

def seq_op(acc, x): 
    """ Given a tuple (min-so-far, max-so-far) and a date string 
    return a tuple (min-including-current, max-including-current) 
    """ 
    d = datetime.strptime(x, '%Y-%m-%d %H:%M:%S') 
    return (min(d, acc[0]), max(d, acc[1])) 

def comb_op(acc1, acc2): 
    """ Given a pair of tuples (min-so-far, max-so-far) 
    return a tuple (min-of-mins, max-of-maxs) 
    """ 
    return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1])) 

# (initial-min <- max-date, initial-max <- min-date) 
rdd.aggregate((datetime.max, datetime.min), seq_op, comb_op) 

## (datetime.datetime(2010, 12, 8, 0, 0), datetime.datetime(2012, 5, 13, 0, 0)) 
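
To make the roles of seq_op and comb_op more concrete, here is a plain-Python sketch that mimics what aggregate does, without Spark. The split of the data into two partitions is a hypothetical choice for illustration; on a real cluster Spark decides the partitioning. The names FMT, partitions, zero and partials are introduced here and are not part of the original answer.

```python
from datetime import datetime
from functools import reduce

FMT = '%Y-%m-%d %H:%M:%S'

def seq_op(acc, x):
    # Fold one date string into a (min-so-far, max-so-far) tuple
    d = datetime.strptime(x, FMT)
    return (min(d, acc[0]), max(d, acc[1]))

def comb_op(acc1, acc2):
    # Merge two (min, max) tuples coming from different partitions
    return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))

# Hypothetical partitioning of the sample data
partitions = [
    ['2010-12-08 00:00:00', '2010-12-18 01:20:00'],
    ['2012-05-13 00:00:00'],
]

# Initial value: any real date is smaller than datetime.max
# and larger than datetime.min, so the first element replaces both
zero = (datetime.max, datetime.min)

# Step 1: each partition is folded on its own with seq_op
partials = [reduce(seq_op, part, zero) for part in partitions]

# Step 2: the per-partition results are merged with comb_op
result = reduce(comb_op, partials, zero)
print(result)
```

seq_op consumes raw elements (strings), while comb_op only ever sees accumulators that have already been parsed, which is why the two functions have different bodies.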

Alternatively, using DataFrames:

from pyspark.sql import Row 
# Note: importing min and max here shadows Python's built-ins of the same name
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max 

row = Row("ts") 
df = rdd.map(row).toDF() 

df.withColumn("ts", unix_timestamp("ts")).agg(
    from_unixtime(min("ts")).alias("min_ts"), 
    from_unixtime(max("ts")).alias("max_ts") 
).show() 

## +-------------------+-------------------+ 
## |             min_ts|             max_ts| 
## +-------------------+-------------------+ 
## |2010-12-08 00:00:00|2012-05-13 00:00:00| 
## +-------------------+-------------------+ 
+0

Would it be possible for you to add some comments on how the code works (the two functions in particular), so that I can understand it properly? –

+0

http://stackoverflow.com/a/31082341/1560062 – zero323

2

If the RDD consists of datetime objects, what is wrong with simply using

rdd1.min() 
rdd1.max() 

See the documentation for RDD.min() and RDD.max().

This example works for me:

from datetime import datetime 

rdd = sc.parallelize([u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00']) 
# Parse each string into a datetime so that min()/max() compare dates
rddT = rdd.map(lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S")).cache() 
print(rddT.min()) 
print(rddT.max())
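
As a side note, a small sketch in plain Python (no Spark): because the strings in the question are zero-padded and ordered from year down to second, comparing them as text gives the same order as comparing the parsed datetimes. This assumes every element follows exactly that format; with mixed or unpadded formats it would not hold. The names stamps and FMT are hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# Sample values from the question, deliberately out of order
stamps = ['2010-12-18 01:20:00', '2012-05-13 00:00:00', '2010-12-08 00:00:00']

# Min/max by plain string comparison
str_min, str_max = min(stamps), max(stamps)

# Min/max after parsing into datetime objects
parsed = [datetime.strptime(s, FMT) for s in stamps]
dt_min, dt_max = min(parsed), max(parsed)

# Both approaches pick the same elements for this format
same = (dt_min.strftime(FMT) == str_min) and (dt_max.strftime(FMT) == str_max)
print(same)
```

Parsing first, as in the answer above, remains the safer choice when the format of the input is not guaranteed.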