2017-09-10 47 views

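I have two dataframes, and I want to update one based on the other, key-wise. I want to know where they differ so that I can update the dataframe wherever the columns are not equal, and the two become equal.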

    val TMP_SITE = spark.load("jdbc", Map("url" -> "jdbc:oracle:thin:System/[email protected]//localhost:1521/XE", "dbtable" -> "IPTECH.TMP_SITE"))
      .withColumn("SITE", 'SITE.cast(LongType))

    val local_pos = spark.load("jdbc", Map("url" -> url, "dbtable" -> "pos")).select("id", "name")

    TMP_SITE.printSchema()
    local_pos.printSchema()

    val join = TMP_SITE.join(local_pos, 'SITE === 'id, "inner")


root 
|-- SITE: long (nullable = true) 
|-- LIBELLE: string (nullable = false) 

root 
|-- id: long (nullable = false) 
|-- name: string (nullable = true) 

The result of the join is:

+---+----------------------+----+----------------------+
|id |name                  |SITE|LIBELLE               |
+---+----------------------+----+----------------------+
|51 |Ezzahra               |51  |Ezzahra               |
|7  |BENIKHALLED           |7   |BENIKHALLED           |
|15 |Kram                  |15  |Kram                  |
|54 |El Mourouj            |54  |El Mourouj            |
|11 |LE BARDO              |11  |LE BARDO              |
|29 |Mini M Ksar said      |29  |Mini M Ksar said      |
|69 |ZAGHOUAN              |69  |ZAGHOUAN              |
|42 |BEB EL KHADHRA        |42  |BEB EL KHADHRA        |
|73 |Zaouit Kontech        |73  |Zaouit Kontech        |
|87 |Aouina                |87  |Aouina                |
|64 |Sousse I I            |64  |Sousse I I            |
|3  |SAHRA CONFORT : KORBA |3   |SAHRA CONFORT : KORBA |
|34 |SOUKRA SQUARE         |34  |SOUKRA SQUARE         |
|59 |SAHRA CONFORT : ZARZIS|59  |SAHRA CONFORT : ZARZIS|
|8  |Jerba                 |8   |Jerba                 |
|22 |Moknine               |22  |Moknine               |
|28 |RDAYEF                |28  |RDAYEF                |
|85 |MONASTIR ABSORBA      |85  |MONASTIR ABSORBA      |
|16 |BARDO HANAYA          |16  |BARDO HANAYA          |
|35 |Mini M Agba           |35  |Mini M Agba           |
+---+----------------------+----+----------------------+

I did this:

    val temp = join.withColumn("changes", when($"LIBELLE" === $"name", lit("nothing")).otherwise("need an update"))

and I get this:

+---+----------------------+----+----------------------+--------------+
|id |name                  |SITE|LIBELLE               |changes       |
+---+----------------------+----+----------------------+--------------+
|51 |Ezzahra               |51  |Ezzahra               |nothing       |
|7  |BENIKHALLED           |7   |BENIKHALLED           |nothing       |
|15 |Kram                  |15  |Kram                  |nothing       |
|54 |El Mourouj            |54  |El Mourouj            |nothing       |
|11 |LE BARDO              |11  |LE BARDO              |nothing       |
|29 |Mini M Ksar said      |29  |Mini M Ksar said      |nothing       |
|69 |ZAGHOUAN              |69  |ZAGHOUAN              |nothing       |
|42 |BEB EL KHADHRA        |42  |BEB EL KHADHRA        |nothing       |
|73 |Zaouit Kontech        |73  |Zaouit Kontech        |need an update|
|87 |Aouina                |87  |Aouina                |nothing       |
|64 |Sousse I I            |64  |Sousse I I            |nothing       |
|3  |SAHRA CONFORT : KORBA |3   |SAHRA CONFORT : KORBA |nothing       |
|34 |SOUKRA SQUARE         |34  |SOUKRA SQUARE         |nothing       |
|59 |SAHRA CONFORT : ZARZIS|59  |SAHRA CONFORT : ZARZIS|nothing       |
|8  |Jerba                 |8   |Jerba                 |nothing       |
|22 |Moknine               |22  |Moknine               |need an update|
|28 |RDAYEF                |28  |RDAYEF                |nothing       |
|85 |MONASTIR ABSORBA      |85  |MONASTIR ABSORBA      |nothing       |
|16 |BARDO HANAYA          |16  |BARDO HANAYA          |nothing       |
|35 |Mini M Agba           |35  |Mini M Agba           |nothing       |
+---+----------------------+----+----------------------+--------------+

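I don't know why it says those rows need an update, since the values are identical. It should say "nothing" for all of them, because they are equal. Any help would be appreciated.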

Answers


Once you have a dataframe, it is easy to work with its columns and rows.

So, you have your join:

+----+---------------------+----+---------------------+
|SITE|LIBELLE              |id  |name                 |
+----+---------------------+----+---------------------+
|48  |Mini M Boumhel       |48  |Mini M Boumhel       |
|67  |Lac                  |67  |Lac                  |
|992 |test2                |992 |test                 |
|44  |KAIROUAN             |44  |KAIROUAN             |
|61  |Tunis                |61  |Tunis                |
|9001|MONOPRIX             |9001|MONOPRIX             |
|3   |SAHRA CONFORT : KORBA|3   |SAHRA CONFORT : KORBA|
|37  |Mini M Borj Lozir    |37  |Mini M Borj Lozir    |
|83  |Jendouba             |83  |Jendouba             |
|12  |Bigro                |12  |Bigro                |
+----+---------------------+----+---------------------+

You can create another column with the logic you wrote, but by using the when function:

import org.apache.spark.sql.functions._ 
val temp = join.withColumn("changes", when($"LIBELLE" === $"name", lit("nothing")).otherwise("need an update")) 

The temp dataframe would be the following dataframe:

+----+---------------------+----+---------------------+--------------+
|SITE|LIBELLE              |id  |name                 |changes       |
+----+---------------------+----+---------------------+--------------+
|48  |Mini M Boumhel       |48  |Mini M Boumhel       |nothing       |
|67  |Lac                  |67  |Lac                  |nothing       |
|992 |test2                |992 |test                 |need an update|
|44  |KAIROUAN             |44  |KAIROUAN             |nothing       |
|61  |Tunis                |61  |Tunis                |nothing       |
|9001|MONOPRIX             |9001|MONOPRIX             |nothing       |
|3   |SAHRA CONFORT : KORBA|3   |SAHRA CONFORT : KORBA|nothing       |
|37  |Mini M Borj Lozir    |37  |Mini M Borj Lozir    |nothing       |
|83  |Jendouba             |83  |Jendouba             |nothing       |
|12  |Bigro                |12  |Bigro                |nothing       |
+----+---------------------+----+---------------------+--------------+

Now you can use the filter method on the dataframe:

temp.filter($"changes" === "need an update").show(false)

which should give you

+----+-------+---+----+--------------+
|SITE|LIBELLE|id |name|changes       |
+----+-------+---+----+--------------+
|992 |test2  |992|test|need an update|
+----+-------+---+----+--------------+

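You just need to play with the columns using select, groupBy, aggregations, filters and other built-in functions, or use udf functions and so on. You can even convert to an rdd of tuples, as you did in your example.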
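
As a rough illustration of the select, groupBy and filter calls mentioned above, here is a minimal sketch. The sample rows are a hypothetical stand-in for the temp dataframe, the session setup is only there to make the snippet self-contained, and it assumes LIBELLE is the value that should be copied into name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for illustration only; in spark-shell a session already exists.
val session = SparkSession.builder().master("local[1]").appName("changes-summary").getOrCreate()

// Hypothetical stand-in for the temp dataframe shown above.
val temp = session.createDataFrame(Seq(
  (48L, "Mini M Boumhel", 48L, "Mini M Boumhel", "nothing"),
  (992L, "test2", 992L, "test", "need an update"),
  (44L, "KAIROUAN", 44L, "KAIROUAN", "nothing")
)).toDF("SITE", "LIBELLE", "id", "name", "changes")

// groupBy + count: how many rows fall under each label.
val summary = temp.groupBy("changes").count()

// filter + select: the key and the replacement value for rows that differ
// (assumption: LIBELLE is the source of truth for name).
val updates = temp
  .filter(col("changes") === "need an update")
  .select(col("id"), col("LIBELLE").as("name"))

summary.show(false)
updates.show(false)
```

The updates dataframe then holds only the keys and new names that would have to be written back to the pos table.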

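I've updated the question. I tried your suggestion, and I don't know why it behaves like that. Thanks.

Check for trailing spaces. I think there are spaces after the words in the table.

You're right, I just fixed it with the trim function.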
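
For anyone hitting the same thing, the trim fix mentioned in the comments could look roughly like the sketch below. The sample rows are hypothetical (the trailing spaces on ids 73 and 22 are assumed for illustration), and the session setup is only there so the snippet runs on its own. Comparing the lengths of the two columns is one way to make the invisible padding visible before trimming.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, length, lit, trim, when}

// Local session for illustration only; in spark-shell a session already exists.
val session = SparkSession.builder().master("local[1]").appName("trim-compare").getOrCreate()

// Hypothetical sample of the joined data: the names for ids 73 and 22
// carry a trailing space, so they look equal when printed but are not.
val joined = session.createDataFrame(Seq(
  (51L, "Ezzahra", 51L, "Ezzahra"),
  (73L, "Zaouit Kontech ", 73L, "Zaouit Kontech"),
  (22L, "Moknine ", 22L, "Moknine")
)).toDF("id", "name", "SITE", "LIBELLE")

// Diagnostic: length counts trailing spaces, so padded values stand out.
val lengths = joined.select(
  col("id"),
  length(col("name")).as("name_len"),
  length(col("LIBELLE")).as("libelle_len"))

// Fix: trim both sides before comparing.
val fixed = joined.withColumn("changes",
  when(trim(col("LIBELLE")) === trim(col("name")), lit("nothing"))
    .otherwise(lit("need an update")))

lengths.show(false)
fixed.show(false)
```

Trimming inside the comparison leaves the stored values untouched; if the padding should go away for good, the trimmed column would have to be written back as well.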