2012-05-15 54 views
17

由於WS客戶端服務和端口的初始化需要很長時間,我喜歡在啓動時初始化它們並重新使用端口的相同實例。 Initalization會是這個樣子:這是JAX-WS客戶端調用線程安全嗎?

private static RequestContext requestContext = null; 

static 
{ 
    MyService service = new MyService(); 
    MyPort myPort = service.getMyServicePort(); 

    Map<String, Object> requestContextMap = ((BindingProvider) myPort).getRequestContext(); 
    requestContextMap = ((BindingProvider)myPort).getRequestContext(); 
    requestContextMap.put(BindingProvider.USERNAME_PROPERTY, uName); 
    requestContextMap.put(BindingProvider.PASSWORD_PROPERTY, pWord); 

    rc = new RequestContext(); 
    rc.setApplication("test"); 
    rc.setUserId("test"); 
} 

在我班上的通話地方:

myPort.someFunctionCall(requestContext, "someValue"); 

我的問題:請問這個調用是線程安全的?

強尼

+3

是否已由在這裏找到答案:http://stackoverflow.com/questions/4385204/are-jax-ws-clients-thread-safe – kyiu

+0

KHY喜, 感謝您的快速回復。我看到了這個線程。我的問題是,我缺乏任何(官方)聲明什麼是線程安全或不(服務/端口/等)。我的用例也不同於其他線程。 Jonny – user871611

+1

這裏是我在CXF網站上找到的答案:https://cwiki.apache.org/CXF/faq.html#FAQ-AreJAXWSclientproxiesthreadsafe%253F – kyiu

回答

19

按照CXF FAQ

是JAX-WS客戶端代理線程安全的?

官方JAX-WS答案:號 按照JAX-WS規範,客戶端代理不是線程安全的。 要編寫可移植代碼,您應該將它們視爲非線程安全,並同步訪問或使用實例或類似池。

CXF回答: CXF代理對於很多用例都是線程安全的。該 的例外是:

  • ((BindingProvider)proxy).getRequestContext()使用 - 每個JAX-WS規範, 請求上下文每個實例。因此,在那裏設置的任何東西都會影響其他線程上的請求。隨着CXF,你可以這樣做:

    ((BindingProvider)proxy).getRequestContext().put("thread.local.request.context","true"); 
    

    和未來的呼叫getRequestContext()將使用一個線程 本地請求上下文。這使得請求上下文成爲 線程安全。 (注:響應上下文總是線程局部在CXF)

  • 設置在導管 - 如果使用代碼或配置直接 操縱導管(如設置TLS設定或類似),這些 不是線程安全。導管是每個實例,因此這些設置將被共享。另外,如果使用FailoverFeature和LoadBalanceFeatures,則會立即更換導管。因此,在使用 設置線程之前,設置在導管上的設置可能會丟失。

  • 會話支持 - 如果您打開會話支持(請參閱 jaxws規範),會話cookie將存儲在管道中。因此, 將落入管道設置的上述規則,因此跨線程共享 。
  • WS-Security令牌 - 如果使用WS-SecureConversation或WS-Trust,則將檢索到的令牌緩存在端點/代理中,以避免額外(並且昂貴)調用STS以獲取令牌。因此,多個線程將共享令牌。如果每個線程有不同的安全憑證或要求,則需要使用單獨的代理 實例。

對於導管問題,您可以安裝新的 ConduitSelector,它使用本地或類似的線程。雖然這有點 。

對於大多數「簡單」用例,您可以在多個 線程上使用CXF代理。以上概述了其他方法的解決方法。

3

一般來說,沒有。

按照CXF FAQ http://cxf.apache.org/faq.html#FAQ-AreJAX-WSclientproxiesthreadsafe?

官方JAX-WS答案:號按照JAX-WS規範,客戶 代理不是線程安全的。要編寫可移植代碼,您應該將它們視爲非線程安全並同步訪問,或使用實例池或類似實例 。

CXF回答: CXF代理對於很多用例都是線程安全的。

有關例外列表,請參閱FAQ。

3

正如你從上面看到的答案,JAX-WS客戶端代理不是線程安全的,所以我只是想分享我的實現,其他人將緩存客戶端代理。 我實際上遇到了同樣的問題,並決定創建一個可以緩存JAX-WS客戶端代理的spring bean。你可以看到更多細節http://programtalk.com/java/using-spring-and-scheduler-to-store/

import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

import javax.annotation.PostConstruct; 

import org.apache.commons.lang3.concurrent.BasicThreadFactory; 
import org.apache.logging.log4j.Logger; 
import org.springframework.stereotype.Component; 

/** 
* This keeps the cache of MAX_CUNCURRENT_THREADS number of 
* appConnections and tries to shares them equally amongst the threads. All the 
* connections are created right at the start and if an error occurs then the 
* cache is created again. 
* 
*/ 
/* 
* 
* Are JAX-WS client proxies thread safe? <br/> According to the JAX-WS spec, 
* the client proxies are NOT thread safe. To write portable code, you should 
* treat them as non-thread safe and synchronize access or use a pool of 
* instances or similar. 
* 
*/ 
@Component 
public class AppConnectionCache { 

private static final Logger logger = org.apache.logging.log4j.LogManager.getLogger(AppConnectionCache.class); 

private final Map<Integer, MyService> connectionCache = new ConcurrentHashMap<Integer, MyService>(); 

private int cachedConnectionId = 1; 

private static final int MAX_CUNCURRENT_THREADS = 20; 

private ScheduledExecutorService scheduler; 

private boolean forceRecaching = true; // first time cache 

@PostConstruct 
public void init() { 
    logger.info("starting appConnectionCache"); 
    logger.info("start caching connections"); ;; 
    BasicThreadFactory factory = new BasicThreadFactory.Builder() 
    .namingPattern("appconnectioncache-scheduler-thread-%d").build(); 
    scheduler = Executors.newScheduledThreadPool(1, factory); 

    scheduler.scheduleAtFixedRate(new Runnable() { 
    @Override 
    public void run() { 
    initializeCache(); 
    } 

    }, 0, 10, TimeUnit.MINUTES); 

} 

public void destroy() { 
    scheduler.shutdownNow(); 
} 

private void initializeCache() { 
    if (!forceRecaching) { 
    return; 
    } 
    try { 
    loadCache(); 
    forceRecaching = false; // this flag is used for initializing 
    logger.info("connections creation finished successfully!"); 
    } catch (MyAppException e) { 
    logger.error("error while initializing the cache"); 
    } 
} 

private void loadCache() throws MyAppException { 
    logger.info("create and cache appservice connections"); 
    for (int i = 0; i < MAX_CUNCURRENT_THREADS; i++) { 
    tryConnect(i, true); 
    } 
} 

public MyPort getMyPort() throws MyAppException { 
    if (cachedConnectionId++ == MAX_CUNCURRENT_THREADS) { 
    cachedConnectionId = 1; 
    } 
    return tryConnect(cachedConnectionId, forceRecaching); 
} 

private MyPort tryConnect(int threadNum, boolean forceConnect) throws MyAppException { 
    boolean connect = true; 
    int tryNum = 0; 
    MyPort app = null; 
    while (connect && !Thread.currentThread().isInterrupted()) { 
    try { 
    app = doConnect(threadNum, forceConnect); 
    connect = false; 
    } catch (Exception e) { 
    tryNum = tryReconnect(tryNum, e); 
    } 
    } 
    return app; 
} 

private int tryReconnect(int tryNum, Exception e) throws MyAppException { 
    logger.warn(Thread.currentThread().getName() + " appservice service not available! : " + e); 
    // try 10 times, if 
    if (tryNum++ < 10) { 
    try { 
    logger.warn(Thread.currentThread().getName() + " wait 1 second"); 
    Thread.sleep(1000); 
    } catch (InterruptedException f) { 
    // restore interrupt 
    Thread.currentThread().interrupt(); 
    } 
    } else { 
    logger.warn(" appservice could not connect, number of times tried: " + (tryNum - 1)); 
    this.forceRecaching = true; 
    throw new MyAppException(e); 
    } 
    logger.info(" try reconnect number: " + tryNum); 
    return tryNum; 
} 

private MyPort doConnect(int threadNum, boolean forceConnect) throws InterruptedException { 
    MyService service = connectionCache.get(threadNum); 
    if (service == null || forceConnect) { 
    logger.info("app service connects : " + (threadNum + 1)); 
    service = new MyService(); 
    connectionCache.put(threadNum, service); 
    logger.info("connect done for " + (threadNum + 1)); 
    } 
    return service.getAppPort(); 
} 
} 
0

這個一般的解決辦法是在一個池使用多個客戶端對象,然後使用代理充當門面。

import org.apache.commons.pool2.BasePooledObjectFactory; 
import org.apache.commons.pool2.PooledObject; 
import org.apache.commons.pool2.impl.DefaultPooledObject; 
import org.apache.commons.pool2.impl.GenericObjectPool; 

import java.lang.reflect.InvocationHandler; 
import java.lang.reflect.Method; 
import java.lang.reflect.Proxy; 

class ServiceObjectPool<T> extends GenericObjectPool<T> { 
     public ServiceObjectPool(java.util.function.Supplier<T> factory) { 
      super(new BasePooledObjectFactory<T>() { 
       @Override 
       public T create() throws Exception { 
        return factory.get(); 
       } 
      @Override 
      public PooledObject<T> wrap(T obj) { 
       return new DefaultPooledObject<>(obj); 
      } 
     }); 
    } 

    public static class PooledServiceProxy<T> implements InvocationHandler { 
     private ServiceObjectPool<T> pool; 

     public PooledServiceProxy(ServiceObjectPool<T> pool) { 
      this.pool = pool; 
     } 


     @Override 
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 
      T t = null; 
      try { 
       t = this.pool.borrowObject(); 
       return method.invoke(t, args); 
      } finally { 
       if (t != null) 
        this.pool.returnObject(t); 
      } 
     } 
    } 

    @SuppressWarnings("unchecked") 
    public T getProxy(Class<? super T> interfaceType) { 
     PooledServiceProxy<T> handler = new PooledServiceProxy<>(this); 
     return (T) Proxy.newProxyInstance(interfaceType.getClassLoader(), 
              new Class<?>[]{interfaceType}, handler); 
    } 
} 

要使用代理:

ServiceObjectPool<SomeNonThreadSafeService> servicePool = new ServiceObjectPool<>(createSomeNonThreadSafeService); 
nowSafeService = servicePool .getProxy(SomeNonThreadSafeService.class);