2014-07-03 108 views
0

我正在構建一個 Web 服務，它以 List<List<String>> 的形式返回一個很大的「結果集」。由於我不想把結果集以 XML 的形式放在 SOAP 消息中，我目前改為把它作為附件傳送，並使用一種簡單的自定義編碼。結果集可能非常大，達到數百 GB，所以我想避免把整個結果集保留在記憶體中（因此使用 Iterator<List<String>> 而不是 List<List<String>>）。此外，我希望能以「管線」方式流式傳輸數據：具體來說，就是讓客戶端在服務器還沒寫完之前就能開始處理第一筆結果。我不確定這是否可行，而目前實際上也沒有做到。

（問題標題：流式傳輸大型 SOAP 附件）

這是我的服務實現

/**
 * Service endpoint (version from the question) that returns the node result set wrapped in a
 * GetNodesResponse, whose results are carried as an MTOM attachment.
 *
 * NOTE(review): this version uses parseEagerly = true; the accepted answer further down changes it
 * to parseEagerly = false (and adds dir = "/tmp") to get incremental processing on the client.
 */
@MTOM // enable Message Transmission Optimization Mechanism (MTOM) at the server 
@WebService (endpointInterface = "com.SnafucatorWS") 
@StreamingAttachment(parseEagerly = true, memoryThreshold = 4000000L) 
public class SnafucatorWSImpl implements SnafucatorWS { 
    ... 
    @Override 
    public GetNodesResponse getNodes() { 
    // impl is not shown in this snippet; per the question text it hands out rows via an
    // Iterator so the whole result set need not be held in memory -- TODO confirm
    Iterator<List<String>> result = impl.getNodes();  
    return new GetNodesResponse(result); 
    } 
} 

如何創建客戶端連接

/**
 * Creates a JAX-WS port for the snafucator web service and enables MTOM on the client binding.
 *
 * @return a port proxy for the service described by {@code serviceUrl + "?wsdl"}
 * @throws UncheckedConnectionException if no service URL has been set, or if it is malformed
 */
private SnafucatorWS connect() {
    if (serviceUrl == null) {
        throw new UncheckedConnectionException("The service url of the snafucator webservice has not been set");
    }
    Service service;
    try {
        service = Service.create(
            new URL(serviceUrl + "?wsdl"),
            new QName("http://com/", "SnafucatorWSImplService"));
    } catch (MalformedURLException e) {
        LOG.fatal("Could not create web service endpoint.", e);
        // Fail fast: continuing would dereference a null Service and surface as a bare NPE.
        throw new UncheckedConnectionException("Could not create web service endpoint: malformed url " + serviceUrl);
    }
    SnafucatorWS port = service.getPort(SnafucatorWS.class);
    // enable Message Transmission Optimization Mechanism (MTOM) at the client (for transmission of binary data)
    BindingProvider bindingProvider = (BindingProvider) port;
    SOAPBinding soapBinding = (SOAPBinding) bindingProvider.getBinding();
    soapBinding.setMTOMEnabled(true);
    return port;
}

這是我的響應（response）類別，以及數據編碼與解碼的代碼：

@XmlAccessorType (XmlAccessType.NONE) 
@XmlRootElement (name = "getNodesResponse") 
@XmlType (name = "getNodesResponseType") 
public class GetNodesResponse { 
    private static final ExecutorService POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 
    private DataHandler results; 
    private BlockingQueue<List<String>> resultSet; 
    private volatile boolean hasNext; 
    private final CountDownLatch streamOpened = new CountDownLatch(1); 

    ... 

    /**
     * JAXB getter for the attachment that carries the encoded result set.
     *
     * @return the data handler backing the required {@code results} element
     */
    @XmlMimeType("*/*")
    @XmlElement(required = true)
    public DataHandler getResults() {
        return this.results;
    }

/**
 * JAXB setter for the attachment: stores it, creates a bounded queue (capacity 100) for decoded
 * rows and hands the attachment to {@code parse}, which decodes it on the shared pool.
 *
 * @param aDataHandler the received attachment; it is cast to StreamingDataHandler, so any other
 *     DataHandler implementation causes a ClassCastException
 */
void setResults(DataHandler aDataHandler) {
    this.results = aDataHandler;
    this.resultSet = new ArrayBlockingQueue<List<String>>(100);
    try {
        parse((StreamingDataHandler) this.results, this.resultSet);
    } catch (InterruptedException e) {
        // keep the interrupt status visible to the caller
        Thread.currentThread().interrupt();
    }
}

/**
 * Tells whether another row can be taken from {@link #getResultSet()}.
 *
 * <p>Blocks until the decoder has opened the stream, then until either a row is queued or
 * decoding has finished. A {@code true} result therefore means a row is already in the queue,
 * so a following {@code take()} does not block (assuming a single consumer). Returning
 * {@code true} merely because decoding is still in progress could leave the caller blocked in
 * {@code take()} forever if the stream then ended without another row.
 *
 * @return true, if the result set serves more elements
 * @throws InterruptedException if the current thread was interrupted
 */
public boolean hasNext() throws InterruptedException {
    streamOpened.await();
    // The decoder clears hasNext only after its last row has been queued, so once
    // hasNext is observed as false the queue contents are final.
    while (hasNext && resultSet.isEmpty()) {
        Thread.sleep(1);
    }
    return !resultSet.isEmpty();
}

/**
 * Exposes the queue that decoded rows are delivered to.
 *
 * <p>Consume it by calling {@link java.util.concurrent.BlockingQueue#take()} repeatedly while
 * {@link #hasNext()} returns true. Be aware that an invalid result stream may make
 * {@link #hasNext()} report true erroneously, in which case {@code take()} can block forever.
 *
 * @return the result set as a blocking queue
 */
public BlockingQueue<List<String>> getResultSet() {
    return this.resultSet;
}

/**
 * Wraps the rows in a DataHandler whose input stream is fed by an Encoder task that runs on the
 * shared pool and writes into a connected pipe.
 *
 * @param aResults the rows to encode; must not be null (checked only when assertions are enabled)
 * @return a data handler reading from the pipe the encoder writes to
 */
private DataHandler encode(Iterator<List<String>> aResults) {
    assert aResults != null;
    final PipedOutputStream pipe = new PipedOutputStream();
    // The data source connects its PipedInputStream before the encoder starts writing.
    final DataHandler handler = new DataHandler(new StreamDataSource(pipe, "*/*"));
    POOL.submit(new Encoder(pipe, aResults));
    return handler;
}

/**
 * Starts asynchronous decoding of the attachment into the given queue on the shared pool.
 *
 * <p>The returned Future is discarded, so failures inside the decoder are not reported here.
 *
 * @param dataHandler the streaming attachment to decode
 * @param aResultSet the queue that receives decoded rows
 * @throws InterruptedException declared for callers; not thrown by the current body
 */
private void parse(StreamingDataHandler dataHandler, final Queue<List<String>> aResultSet)
    throws InterruptedException {
    POOL.submit(new Decoder(dataHandler, aResultSet));
}

... 


/**
 * Read-only DataSource whose content is whatever is written to a given PipedOutputStream.
 * Each instance gets a random UUID as its name.
 */
private static final class StreamDataSource implements DataSource {
    private final String name = UUID.randomUUID().toString();
    private final String mimeType;
    private final InputStream in;

    private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
        in = pipeFrom(aOut);
        mimeType = aMimeType;
    }

    /** Connects a new PipedInputStream to the given output end of the pipe. */
    private static InputStream pipeFrom(PipedOutputStream source) {
        try {
            return new PipedInputStream(source);
        } catch (IOException e) {
            throw new RuntimeException("Could not create input stream.", e);
        }
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public String getContentType() {
        return mimeType;
    }

    /**
     * {@inheritDoc}
     *
     * Unlike the DataSource contract, this is destructive: every call hands out the same single
     * pipe stream, so only the first consumer sees the data.
     */
    @Override
    public InputStream getInputStream() {
        return in;
    }

    @Override
    public OutputStream getOutputStream() {
        throw new UnsupportedOperationException();
    }
}

/**
 * Decodes the contents of an input stream as written by the Encoder and writes
 * parsed rows to a {@link java.util.Queue}.
 *
 * <p>Wire format, per row: a 4-byte big-endian column count, then for each column a 4-byte
 * big-endian byte length followed by that many UTF-8 bytes. End of stream ends the data; a
 * column count of 0 is also treated as end-of-data.
 *
 * <p>When the target queue is a {@link BlockingQueue}, rows are inserted with {@code put}, so a
 * slow consumer throttles decoding instead of triggering "Queue full" failures from {@code add}.
 */
private class Decoder implements Runnable {
    private final StreamingDataHandler dataHandler;
    private final Queue<List<String>> resultSet;

    public Decoder(StreamingDataHandler aDataHandler, Queue<List<String>> aResultSet) {
        dataHandler = aDataHandler;
        resultSet = aResultSet;
    }

    @Override
    public void run() {
        try {
            InputStream in = dataHandler.getInputStream();
            byte[] lenBytes = new byte[4];
            // Read the first row's length; an empty stream or a leading 0 means there are no rows.
            if (!readFullyOrEof(in, lenBytes) || ByteBuffer.wrap(lenBytes).getInt() == 0) {
                hasNext = false;
                streamOpened.countDown();
                return;
            }
            hasNext = true;
            streamOpened.countDown(); // now the client can start processing/waiting for rows
            do {
                int rowLen = ByteBuffer.wrap(lenBytes).getInt();
                if (rowLen == 0) {
                    break;
                }
                if (rowLen < 0) {
                    throw new IOException("Corrupt result stream: negative column count " + rowLen);
                }
                List<String> row = new ArrayList<String>(rowLen);
                for (int col = 0; col < rowLen; col++) {
                    readFully(in, lenBytes); // value length in bytes
                    int valLen = ByteBuffer.wrap(lenBytes).getInt();
                    if (valLen < 0) {
                        throw new IOException("Corrupt result stream: negative value length " + valLen);
                    }
                    byte[] valBytes = new byte[valLen];
                    readFully(in, valBytes);
                    row.add(new String(valBytes, java.nio.charset.StandardCharsets.UTF_8));
                }
                enqueue(row);
            } while (readFullyOrEof(in, lenBytes));
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            // Interrupted while waiting for queue space: stop decoding, keep the interrupt flag.
            Thread.currentThread().interrupt();
        } finally {
            hasNext = false;
            // Release hasNext() waiters even if decoding failed before the stream was opened.
            streamOpened.countDown();
            try {
                dataHandler.close();
            } catch (IOException e) {
                throw new RuntimeException("Could not close data handler.", e);
            }
        }
    }

    /** Hands a decoded row to the queue, blocking for space when it is a BlockingQueue. */
    private void enqueue(List<String> row) throws InterruptedException {
        if (resultSet instanceof BlockingQueue) {
            ((BlockingQueue<List<String>>) resultSet).put(row);
        } else {
            resultSet.add(row);
        }
    }

    /** Reads exactly {@code buf.length} bytes; throws EOFException if the stream ends first. */
    private void readFully(InputStream in, byte[] buf) throws IOException {
        if (!readFullyOrEof(in, buf)) {
            throw new java.io.EOFException("Result stream ended unexpectedly");
        }
    }

    /**
     * Reads exactly {@code buf.length} bytes, looping because a single InputStream.read call may
     * return fewer bytes than requested.
     *
     * @return false if the stream was already at its end before any byte was read
     * @throws java.io.EOFException if the stream ended part-way through the buffer
     */
    private boolean readFullyOrEof(InputStream in, byte[] buf) throws IOException {
        int off = 0;
        while (off < buf.length) {
            int n = in.read(buf, off, buf.length - off);
            if (n < 0) {
                if (off == 0) {
                    return false;
                }
                throw new java.io.EOFException(
                    "Result stream ended after " + off + " of " + buf.length + " bytes");
            }
            off += n;
        }
        return true;
    }
}

/**
 * Encodes the given result set and writes the result to an output stream.
 *
 * <p>Writes the format read by {@link Decoder}: per row a 4-byte column count, then per column a
 * 4-byte byte length followed by the UTF-8 bytes. The stream is closed when the iterator is
 * exhausted (or on failure); that end of stream is how the decoder detects the end of the data.
 *
 * <p>NOTE(review): an empty row is written as a column count of 0, which the decoder treats as
 * end-of-data, so rows after an empty row are lost. Null column values are not supported.
 */
private class Encoder implements Runnable {
    private final OutputStream out;
    private final Iterator<List<String>> iterator;

    public Encoder(OutputStream aOut, Iterator<List<String>> aResults) {
        out = aOut;
        iterator = aResults;
    }

    @Override
    public void run() {
        try {
            while (iterator.hasNext()) {
                List<String> row = iterator.next();
                out.write(ByteBuffer.allocate(4).putInt(row.size()).array());
                for (String s : row) {
                    byte[] bytes = s.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    // The prefix must be the encoded byte count, not s.length() (a char count);
                    // they differ for non-ASCII text and would desynchronize the decoder.
                    out.write(ByteBuffer.allocate(4).putInt(bytes.length).array());
                    out.write(bytes); // write column value
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Could not write data.", e);
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                throw new RuntimeException("Could not close data handler.", e);
            }
        }
    }
}

Web 服務返回的響應中包含完全符合預期的結果集。但在調試時我發現，客戶端只有在服務關閉輸出流之後才能開始解碼。我的目標是：在服務器端的編碼器任務仍在寫入輸出流時，客戶端的解碼器就已經開始工作。也許這無法實現，但我認為這會是一個很好的性能改進。

在此先感謝您的任何建議如何解決這個問題。

回答

0

問題已修復。除了我代碼中一些鎖定錯誤（與原問題無關）之外，問題出在我使用了錯誤的註解和客戶端配置。以下是目前可以正常運作的做法：

服務接口

/**
 * SOAP service contract exposing the node result set.
 */
@WebService
public interface SnafucatorWS {
    /** Returns the node result set; exposed as the "getNodes" SOAP operation. */
    @WebMethod(operationName = "getNodes")
    GetNodesResponse getNodes();
}

服務實現

/**
 * Service endpoint from the working solution: MTOM is enabled and attachments are not parsed
 * eagerly (parseEagerly = false). dir and memoryThreshold (4000000 bytes) configure streaming
 * attachments; presumably, like the client-side comment in this answer says, attachments larger
 * than the threshold are spooled to "/tmp" -- TODO confirm against the JAX-WS RI docs.
 * The getNodes() body is elided in this snippet.
 */
@MTOM // enable Message Transmission Optimization Mechanism (MTOM) at the server 
@WebService (endpointInterface = "com.snafu.SnafucatorWS") 
@StreamingAttachment(parseEagerly = false, dir = "/tmp", memoryThreshold = 4000000L) 
public class SnafucatorWSImpl implements SnafucatorWS { 
    @Override public GetNodesResponse getNodes() { ... } 
} 

在客戶端

/**
 * Client-side proxy for the snafucator service (the getNodes() implementation is not shown in
 * this snippet; serviceUrl and LOG are declared elsewhere).
 */
public final class SnafucatorWSClient implements SnafucatorWS {
    /**
     * Creates a port with MTOM and streaming attachments enabled.
     *
     * @return a port proxy for the service described by {@code serviceUrl + "?wsdl"}
     * @throws IllegalStateException if the service URL is malformed (including when it is unset)
     */
    private SnafucatorWS connect() {
        Service result;
        try {
            result = Service.create(
                new URL(serviceUrl + "?wsdl"),
                new QName("http://snafu.com/", "SnafucatorWSImplService"));
        } catch (MalformedURLException e) {
            LOG.fatal("Could not create web service endpoint.", e);
            // Fail fast: continuing would dereference a null Service and surface as a bare NPE.
            throw new IllegalStateException(
                "Could not create web service endpoint: malformed url " + serviceUrl, e);
        }
        // load off attachment to the file system when exceeding 4MB in size
        StreamingAttachmentFeature stf = new StreamingAttachmentFeature("/tmp", false, 4000000L);
        SnafucatorWS port = result.getPort(SnafucatorWS.class, new MTOMFeature(), stf);
        // enable Message Transmission Optimization Mechanism (MTOM) at the client (for transmission of binary data);
        // MTOMFeature above already requests this, the explicit binding call is kept as a safeguard
        BindingProvider bindingProvider = (BindingProvider) port;
        SOAPBinding soapBinding = (SOAPBinding) bindingProvider.getBinding();
        soapBinding.setMTOMEnabled(true);
        return port;
    }
}

結果類型的註解配置

修正後的結果類型如下：
/**
 * Result type from the working solution. Compared with the question's version, the attachment is
 * declared as "application/octet-stream" instead of "*&#47;*". The results field and the body of
 * setResults (which starts the parallel parsing) are elided in this snippet.
 *
 * NOTE(review): @StreamingAttachment is normally placed on the service endpoint class; confirm it
 * has any effect on a JAXB bean like this one.
 */
@StreamingAttachment(parseEagerly=false, memoryThreshold=40000L) 
@XmlAccessorType (XmlAccessType.NONE) 
@XmlRootElement (name = "getNodesResponse") 
@XmlType (name = "getNodesResponseType") 
public class GetNodesResponse { 
    @XmlElement (required = true) 
    @XmlMimeType("application/octet-stream") 
    DataHandler getResults() { 
    return results; 
    } 

    void setResults(DataHandler aDataHandler) { ... /* do the parallel parsing here */ } 
} 

現在，客戶端可以在服務器關閉流之前就開始解析結果。

+0

我實現了一個小型的 DataHandler，可以對任意 Protocol Buffers 數據做同樣的流式傳輸：https://bintray.com/avidd/maven/StreamingService/view/read – avidD