我嘗試使用spark的sc.textfile('/home/sathya/location/*.txt')
訪問示例位置中的最新添加文件但是,我需要取最新添加的文件而不是獲取所有文件目錄下。使用spark從文件夾訪問最新更改的文件
感謝, Sathiyarajan中號
我嘗試使用spark的sc.textfile('/home/sathya/location/*.txt')
訪問示例位置中的最新添加文件但是,我需要取最新添加的文件而不是獲取所有文件目錄下。使用spark從文件夾訪問最新更改的文件
感謝, Sathiyarajan中號
你可以從目錄中的最新修改的文件,並把它傳遞給sc.textFile()
火花閱讀。
這裏是你如何能得到最新的修改後的文件
val directory = new File("/home/sathya/location/")
val allFiles = directory.listFiles
.filter(_.isFile)
.sortBy(-_.lastModified())
.toList
val latestFile = allFiles(0)
這裏latestFile
是最新修改的文件,現在你可以閱讀最新的文件到火花作爲
sc.textFile(latestFile)
希望這有助於!
對於您的問題沒有現成的解決方案,首先找到最新的文件,然後加載它。
Java示例:
/**
* Function to get latest file in directory
*/
public static String latestFileInDir(String dir) throws IOException, InterruptedException {
//Replace hadoop home
String command = "<HADOOP_HOME>/bin/hadoop fs -ls -R " + dir + " | awk -F\" \" '{print $6\" \"$7\" \"$8}' | sort -nr | head -1";
ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", command);
String op = null;
Process process = pb.start();
int errCode = process.waitFor();
if (errCode == 0) {
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
op = br.readLine();
}
return op;
}
獲取最新的目錄和負載
rdd= sc.textfile(latestFileInDir("/home/sathya/location/"));
如果這個回答你的問題,你能接受作爲回答,並關閉話題。這對其他人也有幫助 –
其工作,謝謝 – sathiyarajan
感謝您接受作爲答案:) –