Create a Spark DataFrame based on a condition

I have 2 DataFrames. dataframe1 has 70000 rows, like:
location_id, location, flag
1,Canada,active
2,Paris,active
3,London,active
4,Berlin,active
The second DF, lookup, holds the modified ids for each location (this DataFrame is modified from time to time), like:
id,location
1,Canada
10,Paris
4,Berlin
3,London
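My problem is that I need to take the new id from lookup as the location_id. If location_id differs from id, the old id of the corresponding location should be kept with its flag set to inactive (to maintain historical data), and the new id should be added with its flag set to active for that location. So the output table in Hive should look like this: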
location_id,location,flag
1,Canada,active
2,Paris,inactive
10,Paris,active
3,London,active
4,Berlin,active
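The rule described above can be modelled on plain Scala collections, as a minimal sketch rather than the Spark job itself. The names Location, FlagRule and reconcile are hypothetical, ids are assumed to be integers, and a location that is missing from lookup is assumed to stay unchanged (the question does not cover that case).

```scala
// Plain-Scala model of the flag rule; no Spark required.
// Location, FlagRule and reconcile are hypothetical names used for illustration.
final case class Location(locationId: Int, location: String, flag: String)

object FlagRule {
  // `current` mirrors the rows of dataframe1; `lookup` maps location -> id.
  def reconcile(current: Seq[Location], lookup: Map[String, Int]): Seq[Location] =
    current.flatMap { row =>
      lookup.get(row.location) match {
        // id changed: keep the old id as inactive and add the new id as active
        case Some(newId) if newId != row.locationId =>
          Seq(row.copy(flag = "inactive"), Location(newId, row.location, "active"))
        // id unchanged, or location absent from lookup (assumption): keep the row as it is
        case _ => Seq(row)
      }
    }
}
```

Applied to the four sample rows and the sample lookup, this yields the five rows of the expected output, with Paris appearing once as 2 (inactive) and once as 10 (active).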
I tried joining the two frames first. Then, on the joined DF, I perform an action to save all the records in Hive. I tried the operation as:
val joinedFrame = dataframe1.join(lookup, "location")
val df_temp = joinedFrame.withColumn("flag1", when($"location_id" === $"id", "active").otherwise("inactive"))
var count = 1
df_temp.foreach(x => {
val flag1 = x.getAs[String]("flag1").toString
val flag = x.getAs[String]("flag").toString
val location_id = x.getAs[String]("location_id").toString
val location = x.getAs[String]("location").toString
val id = x.getAs[String]("id").toString
if ((count != 1)&&(flag1 != flag)){
println("------not equal-------",flag1,"-------",flag,"---------",id,"---------",location,"--------",location_id)
val df_main = sc.parallelize(Seq((location_id, location,flag1), (id, location, flag))).toDF("location_id", "location", "flag")
df_main.show
df_main.write.insertInto("location_coords")
}
count += 1
})
It prints the location values that have different ids, but while saving those values as a DataFrame I get an exception:
not equal------inactive------active---10---------Paris---------2
17/09/29 03:43:29 ERROR Executor: Exception in task 0.0 in stage 25.0 (TID 45)
java.lang.NullPointerException
at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:75)
at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:65)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
17/09/29 03:43:29 WARN TaskSetManager: Lost task 0.0 in stage 25.0 (TID 45, localhost, executor driver): java.lang.NullPointerException
at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:75)
at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:65)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
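I don't think you can use 'sc.parallelize' while you are already looping over a DataFrame with 'foreach'. – Shaido
@Shaido What would be a possible alternative then? Thanks. – Swati
Do you only want to save the rows whose id has changed? – Shaido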
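One possible alternative, offered as a sketch under assumptions and not as a confirmed fix: the closure passed to foreach runs on the executors, where sc is not usable, which would be consistent with the NullPointerException in the question, and the captured count variable is not a reliable counter there either. Expressing the rule with DataFrame operations avoids both. The object and function names below are hypothetical; location_id and id are assumed to share a type, and the Hive table location_coords is assumed to already exist with columns in the order location_id, location, flag, since insertInto matches columns by position.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

// LocationFlagsJob and its function names are hypothetical, for illustration only.
object LocationFlagsJob {
  // Rows of dataframe1 whose location_id differs from the id in lookup.
  def changedRows(dataframe1: DataFrame, lookup: DataFrame): DataFrame =
    dataframe1.join(lookup, "location").where(col("location_id") =!= col("id"))

  // Only the changed locations: old id as inactive, new id as active.
  def historyRows(dataframe1: DataFrame, lookup: DataFrame): DataFrame = {
    val changed = changedRows(dataframe1, lookup)
    val retired = changed.select(col("location_id"), col("location"), lit("inactive").as("flag"))
    val added = changed.select(col("id").as("location_id"), col("location"), lit("active").as("flag"))
    retired.union(added)
  }

  // Whole table: rows whose id did not change, plus the history rows.
  def fullTable(dataframe1: DataFrame, lookup: DataFrame): DataFrame = {
    val changedIds = changedRows(dataframe1, lookup).select("location_id")
    val unchanged = dataframe1
      .join(changedIds, Seq("location_id"), "left_anti")
      .select("location_id", "location", "flag")
    unchanged.union(historyRows(dataframe1, lookup))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("location-flags").getOrCreate()
    import spark.implicits._

    // Sample data from the question.
    val dataframe1 = Seq((1, "Canada", "active"), (2, "Paris", "active"),
      (3, "London", "active"), (4, "Berlin", "active")).toDF("location_id", "location", "flag")
    val lookup = Seq((1, "Canada"), (10, "Paris"), (4, "Berlin"), (3, "London")).toDF("id", "location")

    fullTable(dataframe1, lookup).show()

    // With Hive support enabled and the table already created, the write
    // is issued once from the driver instead of once per row:
    // fullTable(dataframe1, lookup).write.insertInto("location_coords")

    spark.stop()
  }
}
```

If only the rows whose id has changed should be saved, historyRows gives just those (2,Paris,inactive and 10,Paris,active for the sample data); fullTable rebuilds the complete output table shown in the question. Row order in the result is not guaranteed.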