Trouble with Spark DataFrame groupBy

I'm using PySpark and running the following expression on a DataFrame:

md = data.filter(data['cluster_id'].like('cluster30')) \
    .select(
        udf_make_date(
            fn.year(data['request_timestamp']),
            fn.month(data['request_timestamp']),
            fn.dayofmonth(data['request_timestamp'])
        ),
        who_assigned,
        fn.hour(data['request_timestamp']).alias('request_hour'),
        fn.date_format(
            data['request_timestamp'],
            'F'
        ).alias('request_day_of_week'),
        fn.lit(data.count()).alias('num_requests'),
        fn.countDistinct(data['user_id']).alias('num_users'),
        fn.avg(data['microseconds']).alias('avg_response_time_microseconds')) \
    .groupBy(
        udf_make_date(
            fn.year(data['request_timestamp']),
            fn.month(data['request_timestamp']),
            fn.dayofmonth(data['request_timestamp'])
        ),
        who_assigned,
        fn.hour(data['request_timestamp']),
        fn.date_format(
            data['request_timestamp'],
            'F'
        )
    )

and I'm getting the following error:

pyspark.sql.utils.AnalysisException: "expression '`request_timestamp`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;". 
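
(For reference, the same class of AnalysisException can be reproduced with a much smaller, self-contained pipeline; the DataFrame and column names below are hypothetical, not part of the original job:)

from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [('2016-08-30 12:01:00', 'u1')],
    ['request_timestamp', 'user_id'],
)

# Mixing a plain expression with an aggregate in select(), without a
# preceding groupBy(), fails analysis with the same kind of error:
df.select(
    fn.hour(df['request_timestamp']),    # plain (non-aggregate) expression
    fn.countDistinct(df['user_id']),     # aggregate expression
)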

As far as I can tell, I'm including everything I need in the groupBy. I wrote this to mirror the structure of my SQL query, which looks roughly like this:

SELECT
    MAKE_DATE(YEAR(request_timestamp), MONTH(request_timestamp), DAYOFMONTH(request_timestamp)),
    CASE
        -- lots of case logic here...
    END,
    HOUR(request_timestamp) AS request_hour,
    DATE_FORMAT(request_timestamp, 'F') AS request_day_of_week,
    COUNT(*) AS num_requests,
    COUNT(DISTINCT user_id) AS num_users,
    AVG(microseconds) AS avg_response_time_microseconds
FROM
    (SELECT *
     FROM {table}
     WHERE cluster_id LIKE 'cluster30')
GROUP BY
    MAKE_DATE(YEAR(request_timestamp), MONTH(request_timestamp), DAYOFMONTH(request_timestamp)),
    CASE
        -- lots of case logic here...
    END,
    HOUR(request_timestamp),
    DATE_FORMAT(request_timestamp, 'F')
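
(As a point of comparison, a query in this shape can also be run as-is through spark.sql rather than translated into DataFrame calls. A minimal sketch, assuming a SparkSession named spark and a hypothetical view name; the MAKE_DATE UDF and the CASE expression are left out because they aren't defined in this snippet:)

data.createOrReplaceTempView('requests')   # hypothetical view name

md = spark.sql("""
    SELECT
        HOUR(request_timestamp) AS request_hour,
        DATE_FORMAT(request_timestamp, 'F') AS request_day_of_week,
        COUNT(*) AS num_requests,
        COUNT(DISTINCT user_id) AS num_users,
        AVG(microseconds) AS avg_response_time_microseconds
    FROM requests
    WHERE cluster_id LIKE 'cluster30'
    GROUP BY
        HOUR(request_timestamp),
        DATE_FORMAT(request_timestamp, 'F')
""")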

Answer

In Spark, groupBy comes before the aggregation, and every expression passed to groupBy is automatically carried into the resulting DataFrame, so it does not need to be repeated in a select. Note also that fn.lit(data.count()) would embed a single constant (the row count of the entire DataFrame) rather than a per-group count; fn.count('*') inside agg counts the rows of each group. The equivalent of your query in the DataFrame API looks something like this:

data \
    .filter(data['cluster_id'].like('cluster30')) \
    .groupBy(
        # each grouping expression is carried into the output DataFrame,
        # so aliasing it here is enough; no separate select() is needed
        udf_make_date(
            fn.year(data['request_timestamp']),
            fn.month(data['request_timestamp']),
            fn.dayofmonth(data['request_timestamp'])
        ).alias('request_date'),
        who_assigned,
        fn.hour(data['request_timestamp']).alias('request_hour'),
        fn.date_format(
            data['request_timestamp'],
            'F'
        ).alias('request_day_of_week')
    ) \
    .agg(
        # aggregates are computed once per group
        fn.countDistinct(data['user_id']).alias('num_users'),
        fn.count('*').alias('num_requests'),
        fn.avg(data['microseconds']).alias('avg_response_time_microseconds')
    )
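
Below is a self-contained sketch of the same groupBy-then-agg shape on toy data, runnable as-is. Since udf_make_date and who_assigned from the question aren't defined here, fn.to_date stands in for the UDF and a literal stands in for the CASE logic:

from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame(
    [
        ('cluster30', '2016-08-30 12:01:00', 'u1', 1500),
        ('cluster30', '2016-08-30 12:59:00', 'u2', 2500),
        ('cluster31', '2016-08-30 13:10:00', 'u1', 9000),
    ],
    ['cluster_id', 'request_timestamp', 'user_id', 'microseconds'],
)

md = data \
    .filter(data['cluster_id'].like('cluster30')) \
    .groupBy(
        fn.to_date(data['request_timestamp']).alias('request_date'),  # stand-in for udf_make_date
        fn.lit('someone').alias('who_assigned'),                      # stand-in for the CASE logic
        fn.hour(data['request_timestamp']).alias('request_hour'),
        fn.date_format(data['request_timestamp'], 'F').alias('request_day_of_week'),
    ) \
    .agg(
        fn.countDistinct(data['user_id']).alias('num_users'),
        fn.count('*').alias('num_requests'),
        fn.avg(data['microseconds']).alias('avg_response_time_microseconds'),
    )

md.show()

One unrelated detail worth double-checking: date_format follows Java's SimpleDateFormat patterns, where 'F' means "day of week in month" (e.g. the 2nd Tuesday), not the day of the week; if a day-of-week name is intended, 'E' is likely the pattern you want.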