2014-07-11 79 views
1

我有使用Cloudhopper 5.0.6庫來保存SMPP連接(3.4版本)和發送或接收PDU的項目。 我需要修改默認PDUResopnse因此,定製的PDU處理通過擴大DefaultSmppSessionHandler組織方式如下:用cloudhopper發送異步PDU響應

​​

它正常工作,只爲目的:

  1. 更改結果的命令狀態或一些PDU域/ TLV params
  2. 不發送任何對當前PDU請求的響應。要做到這一點firePduRequestReceived方法必須返回null

現在我需要添加延遲的PDU響應發送,這裏的問題開始了。 我的第一次嘗試是這樣的:加入

@Override 
    public PduResponse firePduRequestReceived(PduRequest pduRequest) { 
     PduRequestHandler pduReqHandler = pduRequestHandler; 
     PduResponse resultPduResponse = pduRequest.createResponse(); 
     return processDefaultPduResponse(resultPduResponse); 
    } 

    private PduResponse processDefaultPduResponse(PduResponse pduResponse) { 
     try { 
       Thread.sleep(responseDelay); 
      } catch (InterruptedException e) { 
       throw new RuntimeException("Response delay interrupted", e); 
      } 
     return pduResponse; 
    } 

睡眠當前線程的延遲發送響應,所以調用線程被關押了responseDelay毫秒。如果在本次會話中沒有更多的請求在同一時間到來,這可以正常工作。添加在同一會議上,一些SUBMIT_SM負荷引起的錯誤:

com.cloudhopper.smpp.type.SmppTimeoutException: Unable to get response within [10000 ms] 
    at com.cloudhopper.smpp.impl.DefaultSmppSession.sendRequestAndGetResponse(DefaultSmppSession.java:471) ~[ch-smpp-5.0.6.jar:5.0.6] 
    at com.cloudhopper.smpp.impl.DefaultSmppSession.enquireLink(DefaultSmppSession.java:439) ~[ch-smpp-5.0.6.jar:5.0.6] 

在coudhopper源搜索後,我發現這個問題,它是在DefaultSmppSession類的任何行動excusive窗鎖:

future = sendWindow.offer(pdu.getSequenceNumber(), pdu, timeoutMillis, configuration.getRequestExpiryTimeout(), synchronous); 

的問題是在com.cloudhopper.commons.util.windowing.Window使用排他鎖執行任何操作的類,因此無法在一個線程中返回PRUResponse並從另一個線程發出請求之前等待。

接着嘗試圍繞返回作爲請求處理(不發送任何響應下降請求),並用com.cloudhopper.smpp.SmppSession.sendResponsePdu(pduResponse)方法手動發送PDUResponse。這種方法適用於一段時間,但始終與下面的異常結束:

com.cloudhopper.smpp.type.SmppChannelException: null 
    at com.cloudhopper.smpp.impl.DefaultSmppSession.sendResponsePdu(DefaultSmppSession.java:581) ~[ch-smpp-5.0.6.jar:5.0.6] 
    at com.svzn.autotest.smppclient.impl.cloudhopper.SmppSendingManager.sendPduResponse(SmppSendingManager.java:84) ~[smpp-client-1.0.1.jar:na] 
    at com.svzn.autotest.smppclient.impl.cloudhopper.util.SendPduCommand.sendPduResponse(SendPduCommand.java:80) [smpp-client-1.0.1.jar:na] 
    at com.svzn.autotest.smppclient.impl.cloudhopper.SmppClientImpl.sendPduResponse(SmppClientImpl.java:91) [smpp-client-1.0.1.jar:na] 
    at com.svzn.autotest.example.testng_aggr.lib.smpp.event.BaseEventProcessor$1.run(BaseEventProcessor.java:62) [test-classes/:na] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37] 
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) [na:1.6.0_37] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:138) [na:1.6.0_37] 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37] 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) [na:1.6.0_37] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37] 
    at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37] 
Caused by: org.jboss.netty.handler.timeout.WriteTimeoutException: null 
    at org.jboss.netty.handler.timeout.WriteTimeoutHandler.<clinit>(WriteTimeoutHandler.java:79) ~[netty-3.9.0.Final.jar:na] 
    at com.cloudhopper.smpp.impl.DefaultSmppClient.createSession(DefaultSmppClient.java:259) ~[ch-smpp-5.0.6.jar:5.0.6] 
    at com.cloudhopper.smpp.impl.DefaultSmppClient.doOpen(DefaultSmppClient.java:226) ~[ch-smpp-5.0.6.jar:5.0.6] 
    at com.cloudhopper.smpp.impl.DefaultSmppClient.bind(DefaultSmppClient.java:193) ~[ch-smpp-5.0.6.jar:5.0.6] 
    at com.svzn.autotest.smppclient.impl.cloudhopper.tasks.RebindTask.run(RebindTask.java:37) ~[smpp-client-1.0.1.jar:na] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37] 
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) [na:1.6.0_37] 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) [na:1.6.0_37] 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37] 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) [na:1.6.0_37] 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoo 

lExecutor.java:204) [na:1.6.0_37 

] 
    ... 3 common frames omitted 

不知道如何解決這個錯誤,或發送asynchronus PDUResponse在同一會話另一種方式。 你有什麼想法嗎?

回答

2

最後,我發現了這個問題。問題出現在不正確的同步塊中,這阻止了並行asyncronus事件處理(發送pdu響應)並以通常的方式處理請求和響應而沒有延遲。

及其完全確定調用 com.cloudhopper.smpp.SmppSession.sendResponsePdu(pduResponse)在一個線程 方法以及通過從另一個延伸DefaultSmppSessionHandler與請求和響應沃金。一切都將在同一會話中處理。

UPDATE: 下面是我用它來處理PDU請求的執行:

public class SmppSessionHandlerController extends DefaultSmppSessionHandler { 
    private static final Logger log = LoggerFactory.getLogger(SmppSessionHandlerController.class); 

    private volatile PduHandler pduHandler; 
private PduResponseHandler pduResponseHandler; 
private PduRequestHandler pduRequestHandler; 

public SmppSessionHandlerController() { 
    super(log); 
} 

public PduHandler getPduHandler() { 
    return pduHandler; 
} 

public void setPduHandler(PduHandler pduHandler) { 
    this.pduHandler = pduHandler; 
} 

public PduResponseHandler getPduResponseHandler() { 
    return pduResponseHandler; 
} 

public void setPduResponseHandler(PduResponseHandler pduResponseHandler) { 
    this.pduResponseHandler = pduResponseHandler; 
} 

public PduRequestHandler getPduRequestHandler() { 
    return pduRequestHandler; 
} 

public void setPduRequestHandler(PduRequestHandler pduRequestHandler) { 
    this.pduRequestHandler = pduRequestHandler; 
} 

@Override 
public void fireExpectedPduResponseReceived(PduAsyncResponse pduAsyncResponse) { 
    log.trace("Handling response PDU: {}", pduAsyncResponse); 
    pduAsyncResponse.getResponse().setReferenceObject(pduAsyncResponse.getRequest().getReferenceObject()); 
    processPduResponse(pduAsyncResponse.getResponse()); 
} 


@Override 
public void fireUnexpectedPduResponseReceived(PduResponse pduResponse) { 
    log.warn("Handling unexpected response PDU: {}", pduResponse); 
    processPduResponse(pduResponse); 
} 

@Override 
public boolean firePduReceived(Pdu pdu) { 
    PduHandler currPduHandler = pduHandler; 
    if (currPduHandler != null) { 
     SmppPdu smppPdu = PduToApiConverter.convertToApiObject(pdu); 
     currPduHandler.handlePduReceived(smppPdu); 
    } 
    // default handling is to accept pdu for processing up chain 
    return true; 
} 

public void firePduRequestExpired(PduRequest pduRequest) { 
    super.firePduRequestExpired(pduRequest); 
} 

private void processPduResponse(PduResponse pduResponse) { 
    HandlersContextHelper referenceObj = (HandlersContextHelper) pduResponse.getReferenceObject(); 
    if (referenceObj != null) { 
     referenceObj.getSequenceIdHolder().addReceivedSequenceId(pduResponse.getSequenceNumber()); 
    } 
    PduResponseHandler pduRespHandler = pduResponseHandler; 
    if (pduRespHandler != null) { 
     SmppPduResponse smppPduResponse = PduToApiConverter.convertToApiResponse(pduResponse); 
     if (smppPduResponse != null) { 
      pduRespHandler.handlePduResponse(smppPduResponse); 
     } 
    } 
    if (referenceObj != null) { 
     referenceObj.getSequenceIdHolder().checkSentAndReceivedClosed(); 
    } 
} 

@Override 
public PduResponse firePduRequestReceived(PduRequest pduRequest) { 
    PduRequestHandler pduReqHandler = pduRequestHandler; 
    PduResponse resultPduResponse = pduRequest.createResponse(); 
    if (pduReqHandler == null) { 
     return resultPduResponse; 
    } 
    PduResponse defaultPduResponse = pduRequest.createResponse(); 
    SmppPduRequest smppPduRequest = PduToApiConverter.convertToApiRequest(pduRequest); 
    SmppPduResponse defaultSmppPduResponse = PduToApiConverter.convertToApiResponse(defaultPduResponse); 
    if (smppPduRequest == null || defaultSmppPduResponse == null) { 
     return resultPduResponse; 
    } 
    SmppPduResponse resultSmppPduResponse = pduReqHandler.handlePduRequest(smppPduRequest, defaultSmppPduResponse); 
    if (resultSmppPduResponse == null) { 
     return null; 
    } 
    PduResponse convertedPduResponse = ApiToPduConverter.convertToPduResponse(resultSmppPduResponse); 
    if (convertedPduResponse == null) { 
     return resultPduResponse; 
    } 
    if (!resultPduResponse.getClass().isAssignableFrom(convertedPduResponse.getClass())) { 
     return resultPduResponse; 
    } 
    return convertedPduResponse; 
} 
} 

添加至極至clowdhopper SMPP客戶端這樣的

SmppSession session = smppClient.bind(SmppSessionConfiguration_instance, SmppSessionHandlerController_instance); 

我定義的自定義接口,PduHandler PduRequestHandler和PduResponseHandler處理smpp事件的特殊情況,您可以看到SmppSessionHandlerController只是將調用委託給其中的一個。

使用方法

public PduResponse firePduRequestReceived(PduRequest pduRequest) 

defiend在SmppSessionHandler您可以發送同步模式下,你想要的任何迴應。如果您想以異步模式執行此操作,請返回null pduResponse並使用SmppSession.sendResponsePdu(Pdu)代替當前或其他線程。

+0

對不起,我有問題。可以說在使用firePduRequestReceived方法時,發生了一些異常(可以說在保存到數據庫時)。我想強制服務器稍後重新發送給我。我應該如何設置pduresponse或拋出異常?你能幫忙嗎? – qasanov

+0

你不會拋出異常,否則將由netty處理,服務器將不會以正確的方式通知。您應該發送正確的pdu響應以通知服務器重新發送是必需的。響應格式可以是特定的,並取決於您的服務器。 – user3007501