2016-06-11 65 views
3

我試圖使用Spark將文本文件導出到Postgres數據庫。我正在使用下面的一段代碼來導出單個文本文件。我在同一個文件夾中有近200個文本文件,每個文本文件具有相同的結構。不幸的是,一年的價值不是我的輸入文件的一部分,因此我很難編碼它。使用Spark將文本文件導出到PostgreSQL - 自動化

我希望一次上傳所有這些文件,但不知道該怎麼做,有人有什麼建議嗎?

from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc) 

lines = sc.textFile("/aaaa/bbbb/DataFile/t-1870.txt") 
splits = lines.map(lambda l: l.split(",")) 
raw_data = splits.map(lambda b: Row(name=b[0], gender=b[1],count=int(b[2]),year=int(1870))) 

schemaBabies = sqlContext.createDataFrame(raw_data) 
schemaBabies.registerTempTable("raw_data") 

df = sqlContext.sql("select * from raw_data") 

pgurl="jdbc:postgresql://localhost:5432/sparkling?user=XXXX&password=XXXX" 
properties={"user":"XXXX","password":"XXXX","driver":"org.postgresql.Driver","mode":"append"} 

df.write.jdbc(url = pgurl ,table = "EDW.raw_data",properties=properties) 

回答

2

讓我們假設你的數據是這樣的:

import csv 
import tempfile 
import os 

out = tempfile.mkdtemp() 
data = [ 
    ("1870", [("Jane Doe", "F", 3)]), 
    ("1890", [("John Doe", "M", 1)]), 
] 

for year, rows in data: 
    with open(os.path.join(out, "t-{0}.txt".format(year)), "w") as fw: 
     csv.writer(fw).writerows(rows) 

開始PySpark會議或提交腳本傳遞正確spark-csv--packages參數,負載數據與指定模式:

from pyspark.sql.types import * 

schema = StructType([ 
    StructField("name", StringType(), True), 
    StructField("gender", StringType(), True), 
    StructField("count", LongType(), True) 
]) 

df = (sqlContext.read.format("com.databricks.spark.csv") 
    .schema(schema) 
    .load(out)) 

提取物年從文件名中寫入:

from pyspark.sql.functions import input_file_name, regexp_extract 

df_with_year = (df.withColumn(
    "year", 
    regexp_extract(input_file_name(), "[1-2][0-9]{3}", 0).cast("int"))) 

df_with_year.show() 
## +--------+------+-----+----+ 
## | name|gender|count|year| 
## +--------+------+-----+----+ 
## |John Doe|  M| 1|1890| 
## |Jane Doe|  F| 3|1870| 
## +--------+------+-----+----+ 

df_with_year.write.jdbc(...) 

重要:在Spark < 2.0中,此方法依賴於不在Python和JVM之間傳遞數據。它將無法​​與Python UDF或DataFrame.rdd.map工作

+1

我確實根據您的輸入對我的代碼進行了一些更改,我可以將所有200多個文本文件加載到數據庫中。真的很感激你的幫助。 – ytasfeb15

相關問題