2015-10-19 57 views

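Camel MDC Logback stale information

We have a high-load Apache Camel application that uses logback/MDC to log information. We are finding that some of the MDC information on the threads is stale, as logback's documentation warns. I found this question, which addresses the problem: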

How to use MDC with thread pools?

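How should we apply this to our Camel application to avoid the stale information? Is there a simple way to globally replace the default ThreadPoolExecutor with the custom variant suggested in the linked question? I can see how to do this for the pools themselves, but I have not seen any examples for the executor. Keep in mind that our application is quite large and services a high volume of orders every day, so I would like to keep the impact on the existing application as small as possible.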
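To make the staleness concrete, here is a minimal sketch of how a thread-bound value leaks from one task to the next on a reused pool thread. It assumes a plain `ThreadLocal` (the hypothetical `ORDER_ID`) as a stand-in for an MDC entry so that it runs on the JDK alone; MDC values are bound to the thread in a comparable way. The class and method names are illustrative only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Shows a thread-bound value surviving into the next task on a reused pool thread. */
final class StaleContextDemo {

    // Hypothetical stand-in for an MDC entry such as an order id.
    static final ThreadLocal<String> ORDER_ID = new ThreadLocal<String>();

    /** Runs two tasks on a single-thread pool and returns what the second task observes. */
    static String seenBySecondTask(final boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Task 1 sets a value and removes it only when cleanUp is true.
            pool.submit(new Runnable() {
                public void run() {
                    ORDER_ID.set("order-1");
                    if (cleanUp) {
                        ORDER_ID.remove();
                    }
                }
            }).get();
            // Task 2 sets nothing, so anything it sees was left behind by task 1.
            Future<String> second = pool.submit(new Callable<String>() {
                public String call() {
                    return ORDER_ID.get();
                }
            });
            return second.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + seenBySecondTask(false));
        System.out.println("with cleanup:    " + seenBySecondTask(true));
    }
}
```

Without cleanup the second task sees the first task's value, which is the stale-information symptom described above; with cleanup it sees nothing.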

Answer


I figured it out and wanted to post what I did in case it benefits others. Note that I am using JDK 6 and Camel 2.13.2.

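  • Camel has a DefaultExecutorServiceManager that uses a DefaultThreadPoolFactory. I extended the default factory into an MdcThreadPoolFactory.

  • DefaultThreadPoolFactory has methods that create RejectableThreadPoolExecutors and RejectableScheduledThreadPoolExecutors. I extended both of these into Mdc* versions that override execute() to wrap the Runnable and carry the MDC information across threads (as described in the question linked from my original question).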

MdcThreadPoolExecutor:

    package com.mypackage.concurrent;

    import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
    import org.slf4j.MDC;

    import java.util.Map;
    import java.util.concurrent.*;

    /**
     * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
     * <p/>
     * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
     * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
     * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} that sets MDC data appropriately before each task.
     * <p/>
     * Created by broda20.
     * Date: 10/29/15
     */
    public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {

        /** Captures the submitting thread's MDC; may be null when nothing has been set. */
        @SuppressWarnings("unchecked")
        private Map<String, String> getContextForTask() {
            return MDC.getCopyOfContextMap();
        }

        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }

        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                     RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }

        /**
         * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
         * all delegate to this.
         */
        @Override
        public void execute(Runnable command) {
            super.execute(wrap(command, getContextForTask()));
        }

        /** Wraps a task so it runs with the given MDC context and restores the worker's previous MDC afterwards. */
        @SuppressWarnings("unchecked")
        public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
            return new Runnable() {
                @Override
                public void run() {
                    // Remember what the worker thread had so it can be restored after the task
                    Map<String, String> previous = MDC.getCopyOfContextMap();
                    if (context == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(context);
                    }
                    try {
                        runnable.run();
                    } finally {
                        if (previous == null) {
                            MDC.clear();
                        } else {
                            MDC.setContextMap(previous);
                        }
                    }
                }
            };
        }
    }

  • I created a bean instance of MdcThreadPoolFactory in my application configuration. Camel picks it up automatically and uses it in the ExecutorServiceManager.

MdcScheduledThreadPoolExecutor:

package com.mypackage.concurrent;

import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ScheduledThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ScheduledThreadPoolExecutor} that sets MDC data appropriately
 * before each task.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {

    /** Captures the submitting thread's MDC; may be null when nothing has been set. */
    @SuppressWarnings("unchecked")
    private Map<String, String> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
        super(corePoolSize, handler);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, threadFactory, handler);
    }

    /**
     * Tasks passed to {@code execute()} will have MDC injected. Note that in {@code ScheduledThreadPoolExecutor}
     * the {@code submit()} and {@code schedule()} methods do not route through {@code execute()}, so they need
     * their own overrides to get the same treatment.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    /** Wraps a task so it runs with the given MDC context and restores the worker's previous MDC afterwards. */
    @SuppressWarnings("unchecked")
    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return new Runnable() {
            @Override
            public void run() {
                // Remember what the worker thread had so it can be restored after the task
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

MdcThreadPoolFactory:

package com.mypackage.concurrent;

import org.apache.camel.impl.DefaultThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;

import java.util.concurrent.*;

/**
 * Thread pool factory that hands out the MDC-aware executors instead of Camel's default ones.
 */
public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {

    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
                                         int maxQueueSize, boolean allowCoreThreadTimeOut,
                                         RejectedExecutionHandler rejectedExecutionHandler,
                                         ThreadFactory threadFactory) throws IllegalArgumentException {

        // the core pool size must be 0 or higher
        if (corePoolSize < 0) {
            throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
        }

        // validate max >= core
        if (maxPoolSize < corePoolSize) {
            throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
        }

        BlockingQueue<Runnable> workQueue;
        if (corePoolSize == 0 && maxQueueSize <= 0) {
            // use a synchronous queue for direct-handover (no tasks stored on the queue)
            workQueue = new SynchronousQueue<Runnable>();
            // and force 1 as pool size to be able to create the thread pool by the JDK
            corePoolSize = 1;
            maxPoolSize = 1;
        } else if (maxQueueSize <= 0) {
            // use a synchronous queue for direct-handover (no tasks stored on the queue)
            workQueue = new SynchronousQueue<Runnable>();
        } else {
            // bounded task queue to store tasks on the queue
            workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }

        // the only change from the default factory: create the MDC-aware executor
        ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
        answer.setThreadFactory(threadFactory);
        answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
        if (rejectedExecutionHandler == null) {
            rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }
        answer.setRejectedExecutionHandler(rejectedExecutionHandler);
        return answer;
    }

    @Override
    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
        RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
        if (rejectedExecutionHandler == null) {
            rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }

        ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
        //JDK7: answer.setRemoveOnCancelPolicy(true);

        // need to wrap the thread pool in a sized executor to guard against the problem that the
        // JDK-created thread pool has an unbounded queue (see class javadoc), which means
        // we could potentially keep adding tasks and run out of memory.
        if (profile.getMaxPoolSize() > 0) {
            return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
        } else {
            return answer;
        }
    }
}

Finally, the bean instance:

<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/> 

To get this working on Camel 2.16.3, for the new threads requested by org.apache.camel.util.component.AbstractApiProducer.process(Exchange, AsyncCallback), I also had to override java.util.concurrent.ScheduledThreadPoolExecutor.submit(Runnable).


Cool, for when I am able to upgrade our Camel version.


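I subsequently changed this to override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit), which is what submit() and execute() delegate to (in JDK 8 at least). I think this would make a nice contribution to core Camel. If I find the time to look into it, would you be able to sign the copyright over to Apache (or do whatever licensing work is needed)?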
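The change described in the last comment can be sketched roughly as follows. This is only an illustration under stated assumptions: a plain `ThreadLocal` named `CONTEXT` stands in for org.slf4j.MDC so that the example runs on the JDK alone, the class extends `ScheduledThreadPoolExecutor` directly rather than Camel's RejectableScheduledThreadPoolExecutor, and the names `ContextScheduledExecutor` and `observedVia` are hypothetical. Only `schedule(Runnable, long, TimeUnit)` is overridden here; the `Callable` and fixed-rate variants would need the same treatment.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Scheduled executor that carries a thread-bound context from the submitting thread to the worker. */
final class ContextScheduledExecutor extends ScheduledThreadPoolExecutor {

    // Hypothetical stand-in for MDC, so the sketch has no logging dependency.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<String>();

    ContextScheduledExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    /** execute(Runnable) and submit(Runnable) both end up here in ScheduledThreadPoolExecutor. */
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        // Capture the context on the submitting thread, before the task changes threads.
        return super.schedule(wrap(command, CONTEXT.get()), delay, unit);
    }

    /** Runs the task with the captured context, then restores whatever the worker had before. */
    static Runnable wrap(final Runnable task, final String captured) {
        return new Runnable() {
            public void run() {
                String previous = CONTEXT.get();
                apply(captured);
                try {
                    task.run();
                } finally {
                    apply(previous);
                }
            }
        };
    }

    private static void apply(String value) {
        if (value == null) {
            CONTEXT.remove();
        } else {
            CONTEXT.set(value);
        }
    }

    /** Sets the context on the calling thread and returns what a task passed to submit() observes. */
    static String observedVia(String value) {
        ContextScheduledExecutor pool = new ContextScheduledExecutor(1);
        final AtomicReference<String> seen = new AtomicReference<String>();
        apply(value);
        try {
            pool.submit(new Runnable() {
                public void run() {
                    seen.set(CONTEXT.get());
                }
            }).get();
            return seen.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }
}
```

Because the task goes through `submit(Runnable)` and still sees the caller's value, the single `schedule` override covers that path without a separate `submit` override. With a real MDC, the captured value would be the map from `MDC.getCopyOfContextMap()` instead of a single string.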