1

我正在過濾來自輸入parquet文件的下列邏輯的整數列,並試圖修改此邏輯以添加其他驗證以查看是否有任何一個輸入列有計數等於輸入parquet文件rdd計數。我想過濾掉這樣的列。篩選列的計數等於輸入文件rdd Spark

更新

列名稱的輸入文件的數量不會是一成不變的,它會改變我們每次得到文件的時間。 其目標是也過濾出其計數等於輸入文件rdd計數的列。以下邏輯已經實現了過濾整數列。

e.g input parquet file count = 100 
    count of values in column A in the input file = 100 

過濾掉任何這樣的列。

當前邏輯

//Get array of structfields 

val columns = df.schema.fields.filter(x => 
       x.dataType.typeName.contains("integer")) 

    //Get the column names 
    val z = df.select(columns.map(x => col(x.name)): _*) 

    //Get array of string 
    val m = z.columns 

新邏輯是這樣

val cnt = spark.read.parquet("inputfile").count() 

    val d = z.column.where column count is not equals cnt 

我不想明確地傳遞列名到新的條件下,由於具有數等於列輸入文件將改變(val d = ..以上) 我們如何爲此編寫邏輯?

+0

所有你過濾,將有相同數量,即行數的整列將是相同的。因此,如果計數與輸入地板文件行數相匹配,那麼數據框中不會有任何行?那是你要的嗎? –

+0

謝謝Ramesh看看這個。所有Integer列將不會有相同的計數,如果您採用approx_count_distinct,則每列的計數將根據其中的不同值而有所不同,現在列的計數或多或少與輸入parquet文件計數相似(在我的情況下爲100)應該被過濾掉。 – sabby

+0

你可以使用select和where函數我猜:) –

回答

2

根據我對你的問題的理解,您在列與integer爲的dataType試圖filter,其distinct count是不是在另一個輸入parquet文件等於rowscount。如果我的理解是正確的,你可以在你現有的過濾器添加列數過濾器

val cnt = spark.read.parquet("inputfile").count() 
val columns = df.schema.fields.filter(x => 
    x.dataType.typeName.contains("string") && df.select(x.name).distinct().count() != cnt) 

代碼的其餘部分應遵循它。

我希望答案是有幫助的。

+0

謝謝Ramesh!棒極了! :) – sabby

+0

@sabby我的榮幸:) –

0

Jeanr和Ramesh提出正確的做法,這裏是我做過什麼,以獲得所需的輸出,它的工作:)

cnt = (inputfiledf.count()) 

val r = df.select(df.col("*")).where(df.col("MY_COLUMN_NAME").<(cnt))