2017-04-14 84 views
2

如何從SFTP服務器加載文件到spark RDD。加載這個文件後,我需要對數據執行一些過濾。此外,該文件是csv文件,所以你可以幫我決定是否應該使用數據框或RDD。從SFTP服務器加載文件到spark RDD

回答

1

您可以以下方式使用spark-sftp庫在你的程序:

火花2.x的

Maven的依賴

<dependency> 
    <groupId>com.springml</groupId> 
    <artifactId>spark-sftp_2.11</artifactId> 
    <version>1.1.0</version> 
</dependency> 

SBT依賴

libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.0" 

與火花殼

此包可添加使用--packages命令行選項來使用火花。

$ bin/spark-shell --packages com.springml:spark-sftp_2.11:1.1.0 

Scala的API

// Construct Spark dataframe using file in FTP server 
val df = spark.read. 
      format("com.springml.spark.sftp"). 
      option("host", "SFTP_HOST"). 
      option("username", "SFTP_USER"). 
      option("password", "****"). 
      option("fileType", "csv"). 
      option("inferSchema", "true"). 
      load("/ftp/files/sample.csv") 

// Write dataframe as CSV file to FTP server 
df.write. 
     format("com.springml.spark.sftp"). 
     option("host", "SFTP_HOST"). 
     option("username", "SFTP_USER"). 
     option("password", "****"). 
     option("fileType", "csv"). 
     save("/ftp/files/sample.csv") 

用於火花的1.x(1.5+)

Maven的依賴

:例如,要在啓動火花殼時它包括
<dependency> 
    <groupId>com.springml</groupId> 
    <artifactId>spark-sftp_2.10</artifactId> 
    <version>1.0.2</version> 
</dependency> 

SBT依賴

libraryDependencies += "com.springml" % "spark-sftp_2.10" % "1.0.2" 

與火花殼

此包可添加使用--packages命令行選項來使用火花。例如,在啓動火花外殼時,它包含:

$ bin/spark-shell --packages com.springml:spark-sftp_2.10:1.0.2 

斯卡拉API

import org.apache.spark.sql.SQLContext 

// Construct Spark dataframe using file in FTP server 
val sqlContext = new SQLContext(sc) 
val df = sqlContext.read. 
        format("com.springml.spark.sftp"). 
        option("host", "SFTP_HOST"). 
        option("username", "SFTP_USER"). 
        option("password", "****"). 
        option("fileType", "csv"). 
        option("inferSchema", "true"). 
        load("/ftp/files/sample.csv") 

// Write dataframe as CSV file to FTP server 
df.write(). 
     format("com.springml.spark.sftp"). 
     option("host", "SFTP_HOST"). 
     option("username", "SFTP_USER"). 
     option("password", "****"). 
     option("fileType", "csv"). 
     save("/ftp/files/sample.csv") 

欲瞭解更多有關spark-sftp你可以參觀那裏的github頁springml/spark-sftp

+0

什麼火花spark.read?它是sc嗎?它給我錯誤。任何導入它? – vindev

+0

'spark.read'中的spark是'SparkSession',它是在Spark 2.0中引入的。如果您使用的是Spark版本<2.x,則可以使用'sqlContext',而不是'spark'。 – himanshuIIITian

+0

謝謝:)其複製文件,但在加載語句中給出以下錯誤。 17/04/14 12:51:17 INFO SFTPClient:成功複製文件... 線程「main」中的異常java.lang.NoSuchMethodError:org.apache.spark.sql.DataFrameReader.load(Ljava/lang/String ;)Lorg /阿帕奇/火花/ SQL /數據集; – vindev

0

加載從SFTP是直接使用sftp連接器。

https://github.com/springml/spark-sftp

記住它是單線程的應用程序和數據的土地到HDFS即使你不指定它。它將數據流化爲hdfs,然後在其上創建一個DataFrame

加載時,我們需要指定幾個參數。

通常在沒有指定位置的情況下,當用戶sudo用戶使用hdfs時,它也可能正常工作。它將在/ hdfs中創建臨時文件,並在完成該過程後將其刪除。

val data = sparkSession.read.format("com.springml.spark.sftp"). 
     option("host", "host"). 
     option("username", "user"). 
     option("password", "password"). 
     option("fileType", "json"). 
     option("createDF", "true"). 
     option("hdfsTempLocation","/user/currentuser/"). 
     load("/Home/test_mapping.json"); 

所有可用的選項下面,源代碼

https://github.com/springml/spark-sftp/blob/master/src/main/scala/com/springml/spark/sftp/DefaultSource.scala

override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType) = { 
    val username = parameters.get("username") 
    val password = parameters.get("password") 
    val pemFileLocation = parameters.get("pem") 
    val pemPassphrase = parameters.get("pemPassphrase") 
    val host = parameters.getOrElse("host", sys.error("SFTP Host has to be provided using 'host' option")) 
    val port = parameters.get("port") 
    val path = parameters.getOrElse("path", sys.error("'path' must be specified")) 
    val fileType = parameters.getOrElse("fileType", sys.error("File type has to be provided using 'fileType' option")) 
    val inferSchema = parameters.get("inferSchema") 
    val header = parameters.getOrElse("header", "true") 
    val delimiter = parameters.getOrElse("delimiter", ",") 
    val createDF = parameters.getOrElse("createDF", "true") 
    val copyLatest = parameters.getOrElse("copyLatest", "false") 
    //System.setProperty("java.io.tmpdir","hdfs://devnameservice1/../") 
    val tempFolder = parameters.getOrElse("tempLocation", System.getProperty("java.io.tmpdir")) 
    val hdfsTemp = parameters.getOrElse("hdfsTempLocation", tempFolder) 
    val cryptoKey = parameters.getOrElse("cryptoKey", null) 
    val cryptoAlgorithm = parameters.getOrElse("cryptoAlgorithm", "AES") 

    val supportedFileTypes = List("csv", "json", "avro", "parquet") 
    if (!supportedFileTypes.contains(fileType)) { 
     sys.error("fileType " + fileType + " not supported. Supported file types are " + supportedFileTypes) 
    }