我想通了，並把我的做法發佈出來，希望能讓其他人受益。請注意，我使用的是 JDK 6 / Camel 2.13.2。
Camel 有一個 DefaultExecutorServiceManager，它使用 DefaultThreadPoolFactory。我將這個默認工廠擴展爲 MdcThreadPoolFactory。
DefaultThreadPoolFactory 具有生成 RejectableThreadPoolExecutor 和 RejectableScheduledThreadPoolExecutor 的方法。我將這兩個類擴展爲 Mdc* 版本，覆蓋 execute() 方法以包裝 Runnable，並在線程之間傳遞 MDC 信息（如我原始問題中的鏈接所指定的那樣）。
package com.mypackage.concurrent
import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
/**
 * An SLF4J MDC-aware {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to
 * facilitate logging. However, although MDC data is passed to thread children, this doesn't work when threads are
 * reused in a thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} that installs the submitting
 * thread's MDC data on the worker thread before each task and restores the worker's previous MDC data afterwards.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                 RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Snapshots the MDC of the calling (submitting) thread.
     *
     * @return a copy of the caller's MDC map, exactly as returned by {@code MDC.getCopyOfContextMap()}
     */
    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    /**
     * Executes the command with the submitting thread's MDC installed on the worker thread.
     * {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.) all delegate to this.
     *
     * @param command the task to run
     * @throws NullPointerException if {@code command} is null
     */
    @Override
    public void execute(Runnable command) {
        // Reject null here: once wrapped, a null command would be accepted by the pool
        // and only blow up later on a worker thread instead of at the call site.
        if (command == null) {
            throw new NullPointerException("command");
        }
        super.execute(wrap(command, getContextForTask()));
    }

    /**
     * Decorates a task so that it runs with the given MDC context and puts the executing
     * thread's previous MDC back when the task finishes, whether normally or by exception.
     *
     * @param runnable the task to decorate
     * @param context  the MDC map to install while the task runs; {@code null} clears the MDC instead
     * @return a runnable that brackets {@code runnable.run()} with the MDC switch
     */
    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                // Remember what the worker thread had so it can be put back afterwards.
                final Map previous = MDC.getCopyOfContextMap();
                install(context);
                try {
                    runnable.run();
                } finally {
                    install(previous);
                }
            }
        };
    }

    /**
     * Makes the given map the current thread's MDC, clearing the MDC when the map is null.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static void install(Map context) {
        if (context == null) {
            MDC.clear();
        } else {
            MDC.setContextMap(context);
        }
    }
}
MdcScheduledThreadPoolExecutor:
說明：我在應用程序配置中創建了一個 MdcThreadPoolFactory 的 bean 實例，Camel 會自動拾取它並在 ExecutorServiceManager 中使用；上面的代碼是 MdcThreadPoolExecutor，下面的代碼是 MdcScheduledThreadPoolExecutor。
package com.mypackage.concurrent
import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
/**
 * An SLF4J MDC-aware {@link ScheduledThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to
 * facilitate logging. However, although MDC data is passed to thread children, this doesn't work when threads are
 * reused in a thread pool. This executor captures the MDC of the thread that hands a task over and installs it on
 * the worker thread for the duration of each run, restoring the worker's previous MDC afterwards.
 * <p/>
 * Unlike a plain {@link ThreadPoolExecutor}, a scheduled executor does not route its submission methods through
 * {@link #execute(Runnable)}, so the {@code schedule*} methods are decorated as well. The inherited
 * {@code submit(...)} variants are left untouched and are expected to reach {@code schedule(...)} internally.
 * NOTE(review): that delegation is a JDK implementation detail - verify on the target JDK.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {

    public MdcScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
        super(corePoolSize, handler);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, threadFactory, handler);
    }

    /**
     * Snapshots the MDC of the calling (submitting) thread.
     *
     * @return a copy of the caller's MDC map, exactly as returned by {@code MDC.getCopyOfContextMap()}
     */
    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    /**
     * Executes the command with the submitting thread's MDC installed on the worker thread.
     *
     * @param command the task to run
     * @throws NullPointerException if {@code command} is null
     */
    @Override
    public void execute(Runnable command) {
        // Reject null here: once wrapped, a null command would be accepted by the pool
        // and only blow up later on a worker thread instead of at the call site.
        if (command == null) {
            throw new NullPointerException("command");
        }
        super.execute(wrap(command, getContextForTask()));
    }

    /**
     * Schedules a one-shot task that runs with the scheduling thread's MDC.
     *
     * @throws NullPointerException if {@code command} is null
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return super.schedule(decorate(command), delay, unit);
    }

    /**
     * Schedules a one-shot value-returning task that runs with the scheduling thread's MDC.
     *
     * @throws NullPointerException if {@code callable} is null
     */
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        if (callable == null) {
            throw new NullPointerException("callable");
        }
        return super.schedule(new MdcCallable<V>(callable, getContextForTask()), delay, unit);
    }

    /**
     * Schedules a fixed-rate task; every run uses the MDC captured when the task was scheduled.
     *
     * @throws NullPointerException if {@code command} is null
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return super.scheduleAtFixedRate(decorate(command), initialDelay, period, unit);
    }

    /**
     * Schedules a fixed-delay task; every run uses the MDC captured when the task was scheduled.
     *
     * @throws NullPointerException if {@code command} is null
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return super.scheduleWithFixedDelay(decorate(command), initialDelay, delay, unit);
    }

    /**
     * Decorates a task so that it runs with the given MDC context and puts the executing
     * thread's previous MDC back when the task finishes, whether normally or by exception.
     *
     * @param runnable the task to decorate
     * @param context  the MDC map to install while the task runs; {@code null} clears the MDC instead
     * @return a runnable that brackets {@code runnable.run()} with the MDC switch
     */
    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new MdcRunnable(runnable, context);
    }

    /**
     * Wraps the command with the caller's MDC unless it already carries one, so a task that
     * arrives here via {@link #execute(Runnable)} is not wrapped a second time.
     */
    private Runnable decorate(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }
        if (command instanceof MdcRunnable) {
            return command;
        }
        return wrap(command, getContextForTask());
    }

    /**
     * Makes the given map the current thread's MDC, clearing the MDC when the map is null.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static void install(Map context) {
        if (context == null) {
            MDC.clear();
        } else {
            MDC.setContextMap(context);
        }
    }

    /** Runnable decorator that swaps the MDC in around the delegate and back out afterwards. */
    private static final class MdcRunnable implements Runnable {
        private final Runnable delegate;
        private final Map<String, Object> context;

        MdcRunnable(Runnable delegate, Map<String, Object> context) {
            this.delegate = delegate;
            this.context = context;
        }

        @Override
        public void run() {
            // Remember what the worker thread had so it can be put back afterwards.
            final Map previous = MDC.getCopyOfContextMap();
            install(context);
            try {
                delegate.run();
            } finally {
                install(previous);
            }
        }
    }

    /** Callable decorator that swaps the MDC in around the delegate and back out afterwards. */
    private static final class MdcCallable<V> implements Callable<V> {
        private final Callable<V> delegate;
        private final Map<String, Object> context;

        MdcCallable(Callable<V> delegate, Map<String, Object> context) {
            this.delegate = delegate;
            this.context = context;
        }

        @Override
        public V call() throws Exception {
            final Map previous = MDC.getCopyOfContextMap();
            install(context);
            try {
                return delegate.call();
            } finally {
                install(previous);
            }
        }
    }
}
MdcThreadPoolFactory:
package com.mypackage.concurrent
import org.apache.camel.impl.DefaultThreadPoolFactory
import org.apache.camel.spi.ThreadPoolProfile
import org.apache.camel.util.concurrent.SizedScheduledExecutorService
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
/**
 * A {@link DefaultThreadPoolFactory} whose pools are {@link MdcThreadPoolExecutor} and
 * {@link MdcScheduledThreadPoolExecutor} instances instead of the default executors.
 */
public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {

    /**
     * Creates an {@link MdcThreadPoolExecutor} configured from the given settings.
     * <p/>
     * NOTE(review): the original carries no {@code @Override} on this method; confirm that the
     * {@code DefaultThreadPoolFactory} of the Camel version in use declares this exact signature.
     *
     * @param corePoolSize             core size; must be &gt;= 0 (0 is bumped to 1 when no queue is used)
     * @param maxPoolSize              maximum size; must be &gt;= {@code corePoolSize}
     * @param keepAliveTime            keep-alive time passed to the executor
     * @param timeUnit                 unit of {@code keepAliveTime}
     * @param maxQueueSize             capacity of the bounded task queue; &lt;= 0 selects direct hand-over
     * @param allowCoreThreadTimeOut   passed to {@code ThreadPoolExecutor.allowCoreThreadTimeOut}
     * @param rejectedExecutionHandler handler for rejected tasks; {@code null} selects caller-runs
     * @param threadFactory            factory used to create worker threads
     * @return the configured executor
     * @throws IllegalArgumentException if the pool sizes are invalid
     */
    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut,
                                         RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {
        // the core pool size must be 0 or higher
        if (corePoolSize < 0) {
            throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
        }
        // validate max >= core
        if (maxPoolSize < corePoolSize) {
            throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
        }

        final BlockingQueue<Runnable> workQueue;
        if (maxQueueSize <= 0) {
            // use a synchronous queue for direct-handover (no tasks stored on the queue)
            workQueue = new SynchronousQueue<Runnable>();
            if (corePoolSize == 0) {
                // and force 1 as pool size to be able to create the thread pool by the JDK
                corePoolSize = 1;
                maxPoolSize = 1;
            }
        } else {
            // bounded task queue to store tasks on the queue
            workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }

        ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
        answer.setThreadFactory(threadFactory);
        answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
        // fall back to running rejected tasks on the caller's thread
        answer.setRejectedExecutionHandler(
                rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.CallerRunsPolicy());
        return answer;
    }

    /**
     * Creates an {@link MdcScheduledThreadPoolExecutor} from the profile, wrapped in a
     * {@code SizedScheduledExecutorService} when the profile's max pool size is positive.
     *
     * @param profile       supplies the pool size, rejection handler and queue limit
     * @param threadFactory factory used to create worker threads
     * @return the scheduled executor, possibly wrapped in a size-limiting decorator
     */
    @Override
    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
        RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
        if (rejectedExecutionHandler == null) {
            rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }

        ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
        //JDK7: answer.setRemoveOnCancelPolicy(true);

        // need to wrap the thread pool in a sized to guard against the problem that the
        // JDK created thread pool has an unbounded queue (see class javadoc), which mean
        // we could potentially keep adding tasks, and run out of memory.
        // NOTE(review): the guard tests getMaxPoolSize() while the wrapper is sized by
        // getMaxQueueSize(); kept exactly as in the original - confirm this is intended.
        if (profile.getMaxPoolSize() > 0) {
            return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
        }
        return answer;
    }
}
最後,bean實例:
<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/>
爲了讓它在 Camel 2.16.3 下也能處理 org.apache.camel.util.component.AbstractApiProducer.process(Exchange, AsyncCallback) 所請求的新線程，我還必須覆蓋 java.util.concurrent.ScheduledThreadPoolExecutor.submit(Runnable) –
很酷。等我能夠升級我們的 Camel 版本時，這會派上用場 –
我隨後把它改成覆蓋 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)，因爲 submit() 和 execute() 都委託給這個方法（至少在 JDK 8 中如此）。我認爲這很適合貢獻給 Camel 核心。如果我找到時間處理這件事，您是否願意把版權簽署給 Apache（或完成任何必要的許可工作）？ –