2017-06-05

Answer

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName('MyApp').getOrCreate()

# Attach a positional index to each value: rows become (value, index)
rdd = spark.sparkContext.parallelize([2, 43, 1, 25, 3, 13]).zipWithIndex()
df = spark.createDataFrame(rdd, schema=['int_val', 'idx']).cache()

# Shifted copy: row with idx_r == i carries the value at index i - 1
df_r = df.select(df['int_val'].alias('int_val_r'), (df['idx'] + 1).alias('idx_r'))

# Left join pairs each row with its predecessor (none exists for idx 0)
df = df.join(df_r, on=[df['idx'] == df_r['idx_r']], how='left_outer').orderBy(df['idx'])

# At odd indices, multiply the value by its predecessor; otherwise a real
# null (f.lit(None), not the string 'null', so res stays numeric)
df = df.withColumn('res', f.when(df['idx'] % 2 == 1, df['int_val'] * df['int_val_r']).otherwise(f.lit(None)))
df.show()

Output:

+-------+---+---------+-----+----+            
|int_val|idx|int_val_r|idx_r| res| 
+-------+---+---------+-----+----+ 
|      2|  0|     null| null|null|
|     43|  1|        2|    1|  86|
|      1|  2|       43|    2|null|
|     25|  3|        1|    3|  25|
|      3|  4|       25|    4|null|
|     13|  5|        3|    5|  39|
+-------+---+---------+-----+----+
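The join above is just a way to give each row access to the previous element. The same pairing logic can be sketched in plain Python to sanity-check the expected `res` column (a minimal sketch, assuming the goal is to multiply each odd-indexed value by its predecessor and leave the rest null):

```python
vals = [2, 43, 1, 25, 3, 13]

# res[i] = vals[i] * vals[i - 1] at odd i, None (null) elsewhere
res = [vals[i] * vals[i - 1] if i % 2 == 1 else None
       for i in range(len(vals))]

print(res)  # [None, 86, None, 25, None, 39]
```

This matches the `res` column of the Spark output row by row.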