
Roundtrip timestamps from Spark Python to Pandas and back

How do I convert timestamp data both ways between Spark Python and Pandas? I read data from a Hive table in Spark, want to do some calculations in Pandas, and write the results back to Hive. Only the last part fails: converting the Pandas timestamps back to Spark DataFrame timestamps.

import datetime 
import pandas as pd 

dates = [ 
    ('today', '2017-03-03 11:30:00') 
    , ('tomorrow', '2017-03-04 08:00:00') 
    , ('next Thursday', '2017-03-09 20:00:00') 
] 
string_date_rdd = sc.parallelize(dates) 
timestamp_date_rdd = string_date_rdd.map(lambda t: (t[0], datetime.datetime.strptime(t[1], "%Y-%m-%d %H:%M:%S")))
timestamp_df = sqlContext.createDataFrame(timestamp_date_rdd, ['Day', 'Date']) 
timestamp_pandas_df = timestamp_df.toPandas() 
roundtrip_df = sqlContext.createDataFrame(timestamp_pandas_df) 
roundtrip_df.printSchema() 
roundtrip_df.show() 

root
 |-- Day: string (nullable = true)
 |-- Date: long (nullable = true)

+-------------+-------------------+
|          Day|               Date|
+-------------+-------------------+
|        today|1488540600000000000|
|     tomorrow|1488614400000000000|
|next Thursday|1489089600000000000|
+-------------+-------------------+

At this point the Date column of the roundtripped Spark DataFrame has type long. In Pyspark a single value can easily be converted back to a datetime object, e.g. datetime.datetime.fromtimestamp(1489089600000000000 / 1000000000), although the time of day is off by a few hours. How do I do this conversion on the Spark DataFrame itself?
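For reference, a minimal sketch of the per-value conversion: the hour shift mentioned above comes from fromtimestamp() interpreting the epoch value in the local timezone, while the stored longs count nanoseconds from the UTC epoch.

import datetime

# Nanosecond epoch value for 'next Thursday', taken from the table above.
ns = 1489089600000000000

# fromtimestamp() applies the local timezone, so the time of day shifts.
print(datetime.datetime.fromtimestamp(ns / 1000000000))

# utcfromtimestamp() recovers the original wall-clock value.
print(datetime.datetime.utcfromtimestamp(ns / 1000000000))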

Python 3.4.5, Spark 1.6.0

Thanks, John

Answers


Converting the datetime64 columns to Python datetime objects worked for me.

from pandas import Series

def convert_to_python_datetime(df):
    df_copy = df.copy()
    for column_name, column in df_copy.iteritems():
        # dtype kind 'M' marks numpy datetime64 columns.
        if column.dtype.kind == 'M':
            # Wrapping in an object-dtype Series stops pandas from
            # coercing the values straight back to datetime64.
            df_copy[column_name] = Series(column.dt.to_pydatetime(), dtype=object)
    return df_copy


tmp = convert_to_python_datetime(timestamp_pandas_df) 
roundtrip_df = sqlContext.createDataFrame(tmp) 
roundtrip_df.printSchema() 
roundtrip_df.show() 

Output:

root
 |-- Day: string (nullable = true)
 |-- Date: timestamp (nullable = true)

+-------------+--------------------+
|          Day|                Date|
+-------------+--------------------+
|        today|2017-03-03 11:30:...|
|     tomorrow|2017-03-04 08:00:...|
|next Thursday|2017-03-09 20:00:...|
+-------------+--------------------+
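The dtype=object wrapper is the important part: assigning the converted values back as a bare array would typically let pandas coerce them straight back to datetime64. A quick sanity check (an illustrative addition, not part of the original answer):

# After conversion, each cell should be a plain datetime.datetime, which
# Spark's schema inference maps to TimestampType rather than LongType.
tmp = convert_to_python_datetime(timestamp_pandas_df)
print(tmp['Date'].dtype)      # object
print(type(tmp['Date'][0]))   # <class 'datetime.datetime'>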

When I try this, the data type of the roundtrip_df.Date column comes back as 'struct' instead of 'timestamp'. –


Which version of pandas are you using? – innohead


0.19.1, according to pd.__version__ –


Here's a solution I found:

from pyspark.sql.types import TimestampType

extra_column_df = roundtrip_df.select(roundtrip_df.Day, roundtrip_df.Date).withColumn('new_date', roundtrip_df.Date / 1000000000)
# Casting the seconds-since-epoch double to TimestampType gives a real timestamp column.
roundtrip_timestamp_df = extra_column_df.select(extra_column_df.Day, extra_column_df.new_date.cast(TimestampType()).alias('Date'))

Output:

root
 |-- Day: string (nullable = true)
 |-- Date: timestamp (nullable = true)

+-------------+--------------------+
|          Day|                Date|
+-------------+--------------------+
|        today|2017-03-03 11:30:...|
|     tomorrow|2017-03-04 08:00:...|
|next Thursday|2017-03-09 20:00:...|
+-------------+--------------------+

As a bonus bug or feature, this also seems to convert all the dates to UTC, including DST awareness.
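For comparison, the same conversion can be written as a single select with pyspark.sql.functions.from_unixtime, available since Spark 1.5 (a sketch, not part of the original answer):

from pyspark.sql import functions as F

# from_unixtime() expects seconds since the epoch, so scale the
# nanosecond longs down first; the formatted string then casts
# cleanly back to a timestamp.
roundtrip_timestamp_df = roundtrip_df.select(
    roundtrip_df.Day,
    F.from_unixtime(roundtrip_df.Date / 1000000000).cast('timestamp').alias('Date'))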
