2016-02-26

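Producer-consumer model not working in this scenario

Here is the scenario. I have to poll an FTP server at fixed intervals and fetch CSV files. These CSV files must then be parsed and passed as input to some business logic. This is how I have done it.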

FTPClientPolling (Producer)

public class FTPClientPolling { 

    private static FTPClientPolling instance = null; 

    private FTPClientPolling() { 
    } 

    public synchronized static FTPClientPolling getInstance() { 
     if (instance == null) { 
      logger.info("Object created for Client Polling"); 
      instance = new FTPClientPolling(); 
      initializeFTPClient(); 
     } 
     return instance; 
    } 

    public static void initializeFTPClient() { 
     // initialize the values from properties file 
    } 

    public void startPolling() { 

     FTPClient ftpClient = null; 
     try { 
      //connecting to ftp server 

      //iterating the files in it 
      FTPFile[] filesList = ftpClient.listFiles(); 
      for (FTPFile tmpFile : filesList) { 
       //.. 
       File tempFile = File.createTempFile(tmpFile.getName(), null); 
       FileOutputStream fileOut = new FileOutputStream(tempFile); 
       ftpClient.retrieveFile(tmpFile.getName(), fileOut); 

       //adding the file to the Queue of the file processor 
       FileProcessor.getInstance().getFilesToBeProcessedQueue().add(tempFile); 
      } 

      if (ftpClient.isConnected()) 
       ftpClient.disconnect(); 
     } catch (Exception e) { 
      //logging 
     } finally { 
      //closing ftpclient 
     } 

    } 
} 

FTPClientPollingTasker (Producer Tasker)

public class FTPClientTasker extends TimerTask { 
    private static Long timeInterval = 10000l; 

    @Override 
    public void run() { 
     FTPClientPolling.getInstance().startPolling(); 
    } 

    public static void start() { 
     TimerTask timerTask = new FTPClientTasker(); 
     Timer timer = new Timer(); 
     timer.scheduleAtFixedRate(timerTask, timeInterval, timeInterval); 
    } 

    public static void main(String[] args) { 
     start(); 
    } 
} 

FileProcessor (Consumer)

public final class FileProcessor { 

    private static FileProcessor instance = null; 
    private Queue<File> filesToBeProcessedQueue = new ArrayBlockingQueue<File>(10); 

    private FileProcessor() { 
    } 

    public synchronized static FileProcessor getInstance() { 
     if (instance == null) { 
      instance = new FileProcessor(); 
     } 
     return instance; 
    } 

    public void run() { 
     while (!filesToBeProcessedQueue.isEmpty()) { 
      processSyncFiles(filesToBeProcessedQueue.poll()); 
     } 
    } 

    private void processSyncFiles(File inputFile) { 
     try { 
      HashMap<String, Boolean> outputConsolidation = new HashMap<String, Boolean>(); 

      FileReader fileReader = new FileReader(inputFile); 
      List<InputBean> csvContentsList = CSVParser.readContentsFromCSV(fileReader, new InputBean()); 
      for (InputBean inputBean : csvContentsList) { 
       boolean output = false; 
       // some business logic 
       outputConsolidation.put(inputBean.toString(), output); 
      } 
     } catch (Exception e) { 
      //logging 
     } 
    } 

    public synchronized Queue<File> getFilesToBeProcessedQueue() { 
     return filesToBeProcessedQueue; 
    } 
} 

FileProcessorTasker (Consumer Scheduler): this class creates a Tasker for FileProcessor and runs it at the scheduled interval.

public final class FileProcessorTasker extends TimerTask { 

    private static Long timeInterval = 5000l; 

    @Override 
    public void run() { 
     FileProcessor.getInstance().run(); 
    } 

    public static void start() { 
     TimerTask timerTask = new FileProcessorTasker(); 
     Timer timer = new Timer(); 
     timer.schedule(timerTask, timeInterval, timeInterval); 
    } 

    public static void main(String[] args) { 
     FileProcessorTasker.start(); 
    } 
} 

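Both programs run fine on their own. But when they are connected through filesToBeProcessedQueue, it does not seem to work. The problem is that FTPClientPolling creates one FileProcessor object and adds the file to its queue, while FileProcessorTasker creates another FileProcessor object whose queue size is zero. These two different objects are the problem. How can two objects be created when the class is a singleton? What am I missing in my singleton implementation?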

Answer

First, do not use Timer and TimerTask. Use an ExecutorService for multithreading.
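As a rough sketch of what that replacement could look like, the block below schedules a producer and a consumer on one ScheduledExecutorService. The class name PollingScheduler, the method runFor, the string file names, and the 20 ms / 10 ms intervals are all made up for illustration; the producer lambda stands in for FTPClientPolling.getInstance().startPolling() and the consumer lambda for FileProcessor.getInstance().run(). It also assumes both tasks are started from one process: a static singleton exists once per JVM (more precisely, per class loader), so if the two main methods in the question are launched as separate programs, each gets its own FileProcessor and its own queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the two Tasker classes: one scheduler, one JVM, one shared queue.
final class PollingScheduler {

    // Runs producer and consumer for the given time and returns how many items were consumed.
    static int runFor(long millis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
        AtomicInteger processed = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Producer: placeholder for the FTP polling step; offer() drops the item if the queue is full.
        scheduler.scheduleAtFixedRate(
                () -> queue.offer("file-" + System.nanoTime() + ".csv"),
                0, 20, TimeUnit.MILLISECONDS);

        // Consumer: drains whatever is queued; the business logic would replace the counter.
        scheduler.scheduleWithFixedDelay(() -> {
            String name;
            while ((name = queue.poll()) != null) {
                processed.incrementAndGet();
            }
        }, 10, 10, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(millis);
            scheduler.shutdown();                       // cancels the periodic tasks
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            scheduler.shutdownNow();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed " + runFor(500) + " files");
    }
}
```

Unlike Timer, a scheduled thread pool runs the two tasks on separate threads, so a slow download does not delay the consumer.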

Also use eager initialization in your singleton classes, or double-checked locking around the null check, so that your singleton really is a singleton.
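The listings in this answer show the eager variant. For the double-checked locking variant, a minimal sketch might look like the following; the class name LazyFileProcessor is hypothetical and the same pattern would apply to FileProcessor or FTPClientPolling. Note that the synchronized getInstance() in the question is also thread-safe within a single JVM; double-checked locking only avoids taking the lock on every call.

```java
// Hypothetical class name, used only to illustrate the pattern.
final class LazyFileProcessor {

    // volatile is required: without it another thread could see a partially constructed object.
    private static volatile LazyFileProcessor instance;

    private LazyFileProcessor() {
    }

    static LazyFileProcessor getInstance() {
        LazyFileProcessor local = instance;          // one volatile read on the fast path
        if (local == null) {
            synchronized (LazyFileProcessor.class) {
                local = instance;                    // second check, now under the lock
                if (local == null) {
                    local = new LazyFileProcessor();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // prints "true"
    }
}
```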

FTPClientPolling.java

public class FTPClientPolling { 

    private static FTPClientPolling instance = new FTPClientPolling(); 

    private FTPClientPolling() { 
     logger.info("Object created for Client Polling"); 
     initializeFTPClient(); 
    } 

    public static FTPClientPolling getInstance() { 
     return instance; 
    } 

    public static void initializeFTPClient() { 
     // initialize the values from properties file 
    } 

    public void startPolling() { 

     FTPClient ftpClient = null; 
     try { 
      //connecting to ftp server 

      //iterating the files in it 
      FTPFile[] filesList = ftpClient.listFiles(); 
      for (FTPFile tmpFile : filesList) { 
       //.. 
       File tempFile = File.createTempFile(tmpFile.getName(), null); 
       // try-with-resources closes (and flushes) the stream before the file is queued 
       try (FileOutputStream fileOut = new FileOutputStream(tempFile)) { 
        ftpClient.retrieveFile(tmpFile.getName(), fileOut); 
       } 

       //adding the file to the Queue of the file processor 
       FileProcessor.getInstance().getFilesToBeProcessedQueue().add(tempFile); 
      } 

      if (ftpClient.isConnected()) 
       ftpClient.disconnect(); 
     } catch (Exception e) { 
      //logging 
     } finally { 
      //closing ftpclient 
     } 

    } 
} 

FileProcessor.java

public final class FileProcessor { 

    private static FileProcessor instance = new FileProcessor(); 
    private Queue<File> filesToBeProcessedQueue = new ArrayBlockingQueue<File>(10); 

    private FileProcessor() { 
    } 

    public static FileProcessor getInstance() { 
     return instance; 
    } 

    public void run() { 
     while (!filesToBeProcessedQueue.isEmpty()) { 
      processSyncFiles(filesToBeProcessedQueue.poll()); 
     } 
    } 

    private void processSyncFiles(File inputFile) { 
     try { 
      HashMap<String, Boolean> outputConsolidation = new HashMap<String, Boolean>(); 

      FileReader fileReader = new FileReader(inputFile); 
      List<InputBean> csvContentsList = CSVParser.readContentsFromCSV(fileReader, new InputBean()); 
      for (InputBean inputBean : csvContentsList) { 
       boolean output = false; 
       // some business logic 
       outputConsolidation.put(inputBean.toString(), output); 
      } 
     } catch (Exception e) { 
      //logging 
     } 
    } 

    public synchronized Queue<File> getFilesToBeProcessedQueue() { 
     return filesToBeProcessedQueue; 
    } 
} 
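One detail to watch in both listings: the queue is an ArrayBlockingQueue with capacity 10, and add() on a full bounded queue throws IllegalStateException, which the producer's catch block would swallow. A possible alternative, sketched here with a hypothetical helper enqueue and a demo class BoundedQueueDemo, is the timed offer(). This assumes the field is declared as BlockingQueue rather than Queue, since the timed offer() is not part of the Queue interface.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; file names are plain strings to keep the sketch self-contained.
final class BoundedQueueDemo {

    // Returns true if the item was queued within the timeout, false if the queue stayed full.
    static boolean enqueue(BlockingQueue<String> queue, String item, long timeoutMillis) {
        try {
            return queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.add("a.csv");
        queue.add("b.csv");
        try {
            queue.add("c.csv");                          // third add() exceeds the capacity
        } catch (IllegalStateException e) {
            System.out.println("add() failed: queue full");
        }
        System.out.println(enqueue(queue, "c.csv", 50)); // prints "false"
        queue.poll();                                    // consumer frees one slot
        System.out.println(enqueue(queue, "c.csv", 50)); // prints "true"
    }
}
```

Whether to wait, drop, or retry when the queue is full depends on how many files a polling cycle can produce; the timeout here is only a placeholder.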

Read this post for more information.