如何從SFTP服務器加載文件到spark RDD。加載這個文件後,我需要對數據執行一些過濾。此外,該文件是csv文件,所以你可以幫我決定是否應該使用數據框或RDD。從SFTP服務器加載文件到spark RDD
2
A
回答
1
您可以以下方式使用spark-sftp
庫在你的程序:
火花2.x的
Maven的依賴
<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_2.11</artifactId>
<version>1.1.0</version>
</dependency>
SBT依賴
libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.0"
與火花殼
此包可添加使用--packages命令行選項來使用火花。
$ bin/spark-shell --packages com.springml:spark-sftp_2.11:1.1.0
Scala的API
// Construct Spark dataframe using file in FTP server
val df = spark.read.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
option("inferSchema", "true").
load("/ftp/files/sample.csv")
// Write dataframe as CSV file to FTP server
df.write.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
save("/ftp/files/sample.csv")
用於火花的1.x(1.5+)
Maven的依賴
:例如,要在啓動火花殼時它包括<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_2.10</artifactId>
<version>1.0.2</version>
</dependency>
SBT依賴
libraryDependencies += "com.springml" % "spark-sftp_2.10" % "1.0.2"
與火花殼
此包可添加使用--packages
命令行選項來使用火花。例如,在啓動火花外殼時,它包含:
$ bin/spark-shell --packages com.springml:spark-sftp_2.10:1.0.2
斯卡拉API
import org.apache.spark.sql.SQLContext
// Construct Spark dataframe using file in FTP server
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
option("inferSchema", "true").
load("/ftp/files/sample.csv")
// Write dataframe as CSV file to FTP server
df.write().
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
save("/ftp/files/sample.csv")
欲瞭解更多有關spark-sftp
你可以參觀那裏的github頁springml/spark-sftp
0
加載從SFTP是直接使用sftp連接器。
https://github.com/springml/spark-sftp
記住它是單線程的應用程序和數據的土地到HDFS即使你不指定它。它將數據流化爲hdfs,然後在其上創建一個DataFrame
加載時,我們需要指定幾個參數。
通常在沒有指定位置的情況下,當用戶sudo用戶使用hdfs時,它也可能正常工作。它將在/ hdfs中創建臨時文件,並在完成該過程後將其刪除。
val data = sparkSession.read.format("com.springml.spark.sftp").
option("host", "host").
option("username", "user").
option("password", "password").
option("fileType", "json").
option("createDF", "true").
option("hdfsTempLocation","/user/currentuser/").
load("/Home/test_mapping.json");
所有可用的選項下面,源代碼
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType) = {
val username = parameters.get("username")
val password = parameters.get("password")
val pemFileLocation = parameters.get("pem")
val pemPassphrase = parameters.get("pemPassphrase")
val host = parameters.getOrElse("host", sys.error("SFTP Host has to be provided using 'host' option"))
val port = parameters.get("port")
val path = parameters.getOrElse("path", sys.error("'path' must be specified"))
val fileType = parameters.getOrElse("fileType", sys.error("File type has to be provided using 'fileType' option"))
val inferSchema = parameters.get("inferSchema")
val header = parameters.getOrElse("header", "true")
val delimiter = parameters.getOrElse("delimiter", ",")
val createDF = parameters.getOrElse("createDF", "true")
val copyLatest = parameters.getOrElse("copyLatest", "false")
//System.setProperty("java.io.tmpdir","hdfs://devnameservice1/../")
val tempFolder = parameters.getOrElse("tempLocation", System.getProperty("java.io.tmpdir"))
val hdfsTemp = parameters.getOrElse("hdfsTempLocation", tempFolder)
val cryptoKey = parameters.getOrElse("cryptoKey", null)
val cryptoAlgorithm = parameters.getOrElse("cryptoAlgorithm", "AES")
val supportedFileTypes = List("csv", "json", "avro", "parquet")
if (!supportedFileTypes.contains(fileType)) {
sys.error("fileType " + fileType + " not supported. Supported file types are " + supportedFileTypes)
}
相關問題
- 1. 從遠程服務器到本地服務器的Sftp文件
- 2. 從服務器加載文件到Dropzonejs
- 3. 將文件上傳到SFTP服務器
- 4. Spark:從REST服務創建RDD
- 5. 使用JSch從SFTP服務器下載文件
- 6. 使用PowerShell從SFTP服務器下載文件
- 7. 使用Bash遞歸地從SFTP服務器下載文件
- 8. 使用SSH.NET從SFTP服務器下載一個特定文件
- 9. 多線程從sftp服務器下載相同的文件
- 10. 作爲Jenkins作業從SFTP服務器下載多個文件
- 11. 從SFTP服務器下載壓縮文件
- 12. 使用SharpSSH從SFTP服務器下載文件?
- 13. 如何使用PHP從SFTP服務器下載文件
- 14. 如何使用curl從sftp服務器下載文件
- 15. 使用chilkat將zip文件上載到SFTP服務器python
- 16. 如何將文件從Sftp服務器下載到本地機器
- 17. SFTP檢索文件從Windows服務器到Linux服務器使用java
- 18. xsl文件從PHP服務器加載
- 19. Raphael從服務器加載SVG文件
- 20. 加載XML文件到服務器
- 21. 加載文件到服務器
- 22. 如何使用服務器端加密向S3寫入spark rdd
- 23. Flume - 將文件從文件服務器加載到HDFS
- 24. 從jsp頁面上傳到SFTP服務器的文件
- 25. 如何使用SFTP將文件從Android發送到服務器?
- 26. 將文件從s3傳輸到SFTP服務器
- 27. 添加列到RDD Spark 1.2.1
- 28. 從服務器下載文件到iPhone
- 29. 從url下載文件到服務器
- 30. Sftp服務器中的子文件夾
什麼火花spark.read?它是sc嗎?它給我錯誤。任何導入它? – vindev
'spark.read'中的spark是'SparkSession',它是在Spark 2.0中引入的。如果您使用的是Spark版本<2.x,則可以使用'sqlContext',而不是'spark'。 – himanshuIIITian
謝謝:)其複製文件,但在加載語句中給出以下錯誤。 17/04/14 12:51:17 INFO SFTPClient:成功複製文件... 線程「main」中的異常java.lang.NoSuchMethodError:org.apache.spark.sql.DataFrameReader.load(Ljava/lang/String ;)Lorg /阿帕奇/火花/ SQL /數據集; – vindev