1

我已經設置了一個主集羣和2個從屬的Spark集羣(我正在使用Spark Standalone)。羣集與一些示例運行良好,但不是我的應用程序。我的應用程序工作流程是,它將讀取csv - >將csv中的每一行以及標題 - >轉換爲JSON - >保存到S3。這裏是我的代碼:PySpark - Spark集羣EC2 - 無法保存到S3

def upload_func(row): 
    f = row.toJSON() 
    f.saveAsTextFile("s3n://spark_data/"+ row.name +".json") 
    print(f) 
    print(row.name) 

if __name__ == "__main__": 
    spark = SparkSession \ 
     .builder \ 
     .appName("Python Spark SQL data source example") \ 
     .getOrCreate() 
    df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED") 
    df.rdd.map(upload_func) 

我還出口AWS_Key_IDAWS_Secret_Key到EC2環境。但是,通過上面的代碼,我的應用程序不起作用。下面是問題:

  1. 的JSON文件都不會保存在S3中,我曾嘗試運行應用程序幾次,也重新加載頁面S3,但沒有數據。該應用程序在日誌中沒有任何錯誤地完成。此外,日誌中不打印print(f)print(row.name)。爲了在S3上獲得JSON保存,我需要修復哪些內容,並且爲了調試目的,我是否有打印日誌的權限?

  2. 目前我需要將csv文件放在worker節點中,以便應用程序可以讀取csv文件。我怎麼能把文件放在另一個地方,比如說主節點和應用程序運行時,它會將csv文件拆分爲所有工作節點,以便他們可以將上傳並行化爲分佈式系統?

幫助真的很感激。感謝您的幫助提前。

修訂

把記錄器來調試後,我已經確定了問題的地圖功能upload_func()不會被調用或應用此功能中不能得到(記錄器之前和之後的函數調用印刷消息) 。如果你知道原因,請幫忙嗎?

回答

0

你需要強制地圖進行評估;火花只會按需執行工作。

df.rdd.map(upload_func).count()應該這樣做