我有一個系統,當找到某種類型的文件時,我下載,編碼並將它們上載到一個單獨的線程中。多線程從sftp服務器下載相同的文件
while(true) {
for(SftpClient c : clients) {
try {
filenames = c.list("*.wav", "_rdy_");
} catch (SftpException e) {
e.printStackTrace();
}
if(filenames.size() > 0) {
//AudioThread run() method handles the download, encode, and upload
AudioThread at = new AudioThread(filenames);
at.setNode(c.getNode());
Thread t = new Thread(at);
t.start();
}
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
從AudioThread
public void run() {
System.out.println("Running...");
this.buildAsteriskMapping();
this.connectToSFTP();
ac = new AudioConvert();
this.connectToS3();
String downloadDir = "_rough/" + getNode() + "/" + Time.getYYYYMMDDDate() + "/";
String encodeDir = "_completed" + getNode() + "/" + Time.getYYYYMMDDDate() + "/";
String uploadDir = getNode() + "/" + Time.getYYYYMMDDDate() + "/";
System.out.println("Downloading...");
try {
sftp.get(filenames, downloadDir);
} catch (SftpException e) {
//download failed
System.out.println("DL Failed...");
e.printStackTrace();
}
System.out.println("Encoding...");
try {
ac.encodeWavToMP3(filenames, downloadDir, encodeDir);
} catch (IllegalArgumentException | EncoderException e) {
System.out.println("En Failed...");
e.printStackTrace();
}
System.out.println("Uploading...");
try {
s3.upload(filenames, encodeDir, uploadDir);
} catch (AmazonClientException e) {
System.out.println("Up Failed...");
e.printStackTrace();
}
}
下載方法run方法:
public void get(ArrayList<String> src, String dest) throws SftpException {
for(String file : src) {
System.out.println(dest + file);
channel.get(file, dest + file);
}
}
的編碼方法:
public void encodeWavToMP3(ArrayList<String> filenames, String downloadDir, String encodeDir) throws IllegalArgumentException, EncoderException {
for(String f : filenames) {
File wav = new File(downloadDir + f);
File mp3 = new File(encodeDir + wav.getName().replace(".wav", ".mp3"));
encoder.encode(wav, mp3, attrs);
}
}
上傳方法:
public void upload(ArrayList<String> filenames, String encodeDir, String uploadDir) throws AmazonClientException, AmazonServiceException {
for(String f : filenames) {
s3.putObject(new PutObjectRequest(bucketName, uploadDir, new File(encodeDir + f)));
}
}
問題是我一直在爲每個線程下載相同的文件(或大約相同的文件)。我想爲包含正在下載的文件的每個客戶端添加一個變量,但我不知道如何從該變量中刪除列表/文件名。什麼是解決方案?我的老闆也只想讓x線程運行。
是,+1。這個問題接近生產者/消費者原則。聲明一個隊列(http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html),一個線程添加文件上傳到隊列,以及由ExecutorService管理的多個線程,請參閱http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29消耗隊列的文件路徑 – Aubin