2012-10-12 105 views
0

我有一個系統,當找到某種類型的文件時,我下載,編碼並將它們上載到一個單獨的線程中。多線程從sftp服務器下載相同的文件

while(true) { 
    for(SftpClient c : clients) { 
     try { 
      filenames = c.list("*.wav", "_rdy_"); 
     } catch (SftpException e) { 
      e.printStackTrace(); 
     } 
     if(filenames.size() > 0) { 
      //AudioThread run() method handles the download, encode, and upload 
      AudioThread at = new AudioThread(filenames); 
      at.setNode(c.getNode()); 
      Thread t = new Thread(at); 
      t.start(); 
     } 
    } 
    try { 
     Thread.sleep(3000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

從AudioThread

public void run() { 
    System.out.println("Running..."); 
    this.buildAsteriskMapping(); 
    this.connectToSFTP(); 
    ac = new AudioConvert(); 
    this.connectToS3(); 

    String downloadDir = "_rough/" + getNode() + "/" + Time.getYYYYMMDDDate() + "/"; 
    String encodeDir = "_completed" + getNode() + "/" + Time.getYYYYMMDDDate() + "/"; 
    String uploadDir = getNode() + "/" + Time.getYYYYMMDDDate() + "/"; 

    System.out.println("Downloading..."); 
    try { 
     sftp.get(filenames, downloadDir); 
    } catch (SftpException e) { 
     //download failed 
     System.out.println("DL Failed..."); 
     e.printStackTrace(); 
    } 

    System.out.println("Encoding..."); 
    try { 
     ac.encodeWavToMP3(filenames, downloadDir, encodeDir); 
    } catch (IllegalArgumentException | EncoderException e) { 
     System.out.println("En Failed..."); 
     e.printStackTrace(); 
    } 

    System.out.println("Uploading..."); 
    try { 
     s3.upload(filenames, encodeDir, uploadDir); 
    } catch (AmazonClientException e) { 
     System.out.println("Up Failed..."); 
     e.printStackTrace(); 
    } 

} 

下載方法run方法:

public void get(ArrayList<String> src, String dest) throws SftpException { 
    for(String file : src) { 
     System.out.println(dest + file); 
     channel.get(file, dest + file); 
    } 
} 

的編碼方法:

public void encodeWavToMP3(ArrayList<String> filenames, String downloadDir, String encodeDir) throws IllegalArgumentException, EncoderException { 
    for(String f : filenames) { 
     File wav = new File(downloadDir + f); 
     File mp3 = new File(encodeDir + wav.getName().replace(".wav", ".mp3")); 
     encoder.encode(wav, mp3, attrs); 
    } 
} 

上傳方法:

public void upload(ArrayList<String> filenames, String encodeDir, String uploadDir) throws AmazonClientException, AmazonServiceException { 
    for(String f : filenames) { 
     s3.putObject(new PutObjectRequest(bucketName, uploadDir, new File(encodeDir + f))); 
    } 
} 

問題是我一直在爲每個線程下載相同的文件(或大約相同的文件)。我想爲包含正在下載的文件的每個客戶端添加一個變量,但我不知道如何從該變量中刪除列表/文件名。什麼是解決方案?我的老闆也只想讓x線程運行。

回答

4

這是一種很難看到的問題,因爲實際執行下載缺少代碼:P

不過,我會用某種ExecutorService代替。

基本上,我會將每個下載請求添加到服務中(包裝在「DownloadTask」中,參考要下載的文件以及可能需要獲取文件的任何其他相關信息),並讓服務保持小心剩下的。

可以對下載任務進行編碼,以考慮您認爲合適的現有文件。

根據您的要求,這可能是單線程或多線程服務。它也可以讓你把上傳任務也放在裏面。

退房的Executors線索更多信息

總的想法是用一種生產者/消費者模式。你將有(至少)一個線程來查找所有要下載的文件,併爲每個文件添加它到執行程序服務。文件下載後,我會排隊並將請求上傳到同一服務中。

這樣,你避免所有的混亂與同步和線程管理:d

你可以用同樣的想法與掃描任務,爲每一個客戶,你可以一個任務到一個單獨的服務

+1

是,+1。這個問題接近生產者/消費者原則。聲明一個隊列(http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html),一個線程添加文件上傳到隊列,以及由ExecutorService管理的多個線程,請參閱http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29消耗隊列的文件路徑 – Aubin

1

代碼中存在一個問題,您在while循環中實例化AudioThread。

請注意,創建線程並執行t.start()後,所有下載,編碼和上傳都將異步發生。因此,在啓動線程後,循環會繼續執行另一個對c.list(...)的調用,而您創建的第一個線程仍在處理第一組文件。由於您在調用中指定了文件模式,並且沒有代碼標記哪些文件當前正在處理,因此很可能在後續的c.list()調用中返回相同的一組文件。

我的建議:

  • 使用Executors.newFixedThreadPool(INT來確定nthreads)在以前的文章中提到。並將線程數指定爲機器中處理器的數量。在你的while循環之前執行此操作。
  • 對於你從FTP s.list()檢索到的每個文件名,創建一個Callable類並調用ExecutorService.invokeAll(收藏<可贖回<牛逼> >任務)。您將創建的Callable中的代碼是您的AudioThread代碼。修改AudioThread代碼,以便只處理一個文件(如果可能的話),這樣您就可以爲每個文件並行地進行下載,上傳和編碼。
  • 添加標記哪些文件已被處理的代碼。我會建議添加一個代碼,將您已處理的文件重命名爲不同的名稱,以避免在下一個c.list()調用中返回。
  • 在您的while循環塊後呼叫ExecutorService.shutdown(...)