2016-07-05 12 views
2

我最近給了一個關於Java併發任務的編碼訪問,很不幸沒有得到這份工作。最糟糕的是我已經盡力了,但現在我甚至不知道哪裏出了問題。任何人都可以幫助給我一些關於我可以改進下面的代碼的一些想法嗎?謝謝Java併發代碼改進思路

這個問題很模糊。給定4個通用接口,在高層將任務分成小塊,在每塊上工作並將部分結果合併到最終結果中,我被要求實現接口的中央控制器部分。唯一的要求是在部分結果處理中使用的併發性和「代碼必須生產質量」

我的代碼是如下面(接口給出)。我也爲此投入了大量的註釋來解釋我的假設它在這裏

// adding V,W in order to use in private fields types 
public class ControllerImpl<T, U, V, W> implements Controller<T, U> { 

    private static Logger logger = LoggerFactory.getLogger(ControllerImpl.class); 

    private static int BATCH_SIZE = 100; 

    private Preprocessor<T, V> preprocessor; 
    private Processor<V, W> processor; 
    private Postprocessor<U, W> postprocessor; 

    public ControllerImpl() { 
     this.preprocessor = new PreprocessorImpl<>(); 
     this.processor = new ProcessorImpl<>(); 
     this.postprocessor = new PostprocessorImpl<>(); 
    } 

    public ControllerImpl(Preprocessor preprocessor, Processor processor, Postprocessor postprocessor) { 
     this.preprocessor = preprocessor; 
     this.processor = processor; 
     this.postprocessor = postprocessor; 
    } 

    @Override 
    public U process(T arg) { 
     if (arg == null) return null; 

     final V[] parts = preprocessor.split(arg); 
     final W[] partResult = (W[]) new Object[parts.length]; 

     final int poolSize = Runtime.getRuntime().availableProcessors(); 
     final ExecutorService executor = getExecutor(poolSize); 

     int i = 0; 
     while (i < parts.length) { 
      final List<Callable<W>> tasks = IntStream.range(i, i + BATCH_SIZE) 
       .filter(e -> e < parts.length) 
       .mapToObj(e -> (Callable<W>)() -> partResult[e] = processor.processPart(parts[e])) 
       .collect(Collectors.toList()); 
      i += tasks.size(); 

      try { 
       logger.info("invoking batch of {} tasks to workers", tasks.size()); 
       long start = System.currentTimeMillis(); 
       final List<Future<W>> futures = executor.invokeAll(tasks); 
       long end = System.currentTimeMillis(); 
       logger.info("done batch processing took {} ms", end - start); 
       for (Future future : futures) { 
        future.get(); 
       } 
      } catch (InterruptedException e) { 
       logger.error("{}", e);// have comments to explain better handling according to real business requirement 
      } catch (ExecutionException e) { 
       logger.error("error: ", e); 
      } 
     } 

     MoreExecutors.shutdownAndAwaitTermination(executor, 60, TimeUnit.SECONDS); 

     return postprocessor.aggregate(partResult); 
    } 

    private ExecutorService getExecutor(int poolSize) { 
     final ThreadFactory threadFactory = new ThreadFactoryBuilder() 
      .setNameFormat("Processor-%d") 
      .setDaemon(true) 
      .build(); 
     return new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), threadFactory); 
    } 
} 
+7

這將是[codereview.se] –

+2

之一這也是一個用於Nightmarish Java面試問題。 – Compass

+1

@Compass - 認真。這一定是在面試的「腔體搜索」階段之前出現的。 – rmlan

回答

5

所以,如果我理解正確的話,你有一個預處理器,需要一個T和它分裂中V []數組中刪除。然後你有一個處理器,將V轉換成W,然後是後處理器,將W []轉換成U,對吧?你必須組裝這些東西。

首先,數組和仿製藥確實不匹配在一起,所以它是真正奇怪的那些方法返回數組,而不是名單。對於生產質量的代碼,不應使用通用數組。

因此,回顧一下:

T --> V1 --> W1 --> U 
     V2 --> W2 
     .  . 
     .  . 
     Vn --> Wn 

所以,你可以這樣做:如果您使用列表,而不是陣列

V[] parts = preprocessor.split(t); 
W[] transformedParts = 
    (W[]) Arrays.stream(parts) // unchecked cast due to the use of generic arrays 
       .parallel() // this is where concurrency happens 
       .map(processor::processPart) 
       .toArray(); 
U result = postProcessor.aggregate(transformedParts); 

,並把它寫成一行:

U result = 
    postProcessor.aggregate(
     preprocessor.split(t) 
        .parallelStream() 
        .map(processor::processPart) 
        .collect(Collectors.toList())); 
+1

使用普通forkjoinpool不是並行蒸汽,因此無法控制可用線程數量或共享相同池的其他任務?另外,通過「生產質量」,我確信Future必須在這方面有一席之地,才能檢索出異常,或者我在錯誤的道路上? – CSBob

+0

Reg。公共泳池,是的,但這不一定是壞事。如果需要,您可以隨時將您的任務提交到自定義池。預優化總是一個糟糕的主意,特別是對於這樣的通用任務以及關於上下文的很少的信息。註冊。例外情況,如果這應該是通用的,那麼您不可能正確處理意外的異常而不會拋出。忽略它們並向後處理器傳遞一個不完整的W數組,導致錯誤的結果而不是異常信號,這是一個問題。 –

+0

無論如何,你的解決方案還有其他一些問題:例如每次創建和關閉一個新的線程池,或者爲通用進程(比如池的大小,製作線程守護進程等)做出強有力的假設。它也比它需要的複雜得多。批量分割只是增加了複雜性,沒有什麼,國際海事組織。我會爭取一個簡單易讀的解決方案,並在完成測量後進行調整,如果需要**。 –