我已經設置了一個主集羣和2個從屬的Spark集羣(我正在使用Spark Standalone)。羣集與一些示例運行良好,但不是我的應用程序。我的應用程序工作流程是,它將讀取csv - >將csv中的每一行以及標題 - >轉換爲JSON - >保存到S3。這裏是我的代碼:PySpark - Spark集羣EC2 - 無法保存到S3
def upload_func(row):
f = row.toJSON()
f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
print(f)
print(row.name)
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.getOrCreate()
df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
df.rdd.map(upload_func)
我還出口AWS_Key_ID
和AWS_Secret_Key
到EC2環境。但是,通過上面的代碼,我的應用程序不起作用。下面是問題:
的JSON文件都不會保存在S3中,我曾嘗試運行應用程序幾次,也重新加載頁面S3,但沒有數據。該應用程序在日誌中沒有任何錯誤地完成。此外,日誌中不打印
print(f)
和print(row.name)
。爲了在S3上獲得JSON保存,我需要修復哪些內容,並且爲了調試目的,我是否有打印日誌的權限?目前我需要將csv文件放在worker節點中,以便應用程序可以讀取csv文件。我怎麼能把文件放在另一個地方,比如說主節點和應用程序運行時,它會將csv文件拆分爲所有工作節點,以便他們可以將上傳並行化爲分佈式系統?
幫助真的很感激。感謝您的幫助提前。
修訂
把記錄器來調試後,我已經確定了問題的地圖功能upload_func()
不會被調用或應用此功能中不能得到(記錄器之前和之後的函數調用印刷消息) 。如果你知道原因,請幫忙嗎?