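Streaming large SOAP attachments
I am building a web service that returns a large "result set" as a List<List<String>>. Because I want to avoid representing the result set as XML inside the SOAP message, I now add it as an attachment and use a simple custom encoding. The result set can be very large, for example several hundred GB. I want to avoid keeping everything in memory (which is why I use Iterator<List<String>> instead of List<List<String>>). I would also like to be able to "pipeline" or stream the data. Specifically, I want the client to be able to start processing the first results while the server has not yet finished writing. I am not entirely sure this is possible at all. In any case, it is not happening at the moment.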
This is my service implementation:
@MTOM // enable Message Transmission Optimization Mechanism (MTOM) at the server
@WebService (endpointInterface = "com.SnafucatorWS")
@StreamingAttachment(parseEagerly = true, memoryThreshold = 4000000L)
public class SnafucatorWSImpl implements SnafucatorWS {
...
@Override
public GetNodesResponse getNodes() {
Iterator<List<String>> result = impl.getNodes();
return new GetNodesResponse(result);
}
}
This is how the client connection is created:
private SnafucatorWS connect() {
if (serviceUrl == null) {
throw new UncheckedConnectionException("The service url of the snafucator webservice has not been set");
}
Service result = null;
try {
result = Service.create(
new URL(serviceUrl + "?wsdl"),
new QName("http://com/", "SnafucatorWSImplService"));
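} catch (MalformedURLException e) {
LOG.fatal("Could not create web service endpoint.", e);
// Fail here; otherwise result stays null and getPort() below throws a NullPointerException.
throw new UncheckedConnectionException("Could not create web service endpoint: " + e.getMessage());
}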
SnafucatorWS port = result.getPort(SnafucatorWS.class);
// enable Message Transmission Optimization Mechanism (MTOM) at the client (for transmission of binary data)
BindingProvider bindingProvider = (BindingProvider)port;
SOAPBinding soapBinding = (SOAPBinding)bindingProvider.getBinding();
soapBinding.setMTOMEnabled(true);
return port;
}
This is my response class, together with the code that encodes and decodes the data:
@XmlAccessorType (XmlAccessType.NONE)
@XmlRootElement (name = "getNodesResponse")
@XmlType (name = "getNodesResponseType")
public class GetNodesResponse {
private static final ExecutorService POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private DataHandler results;
private BlockingQueue<List<String>> resultSet;
private volatile boolean hasNext;
private final CountDownLatch streamOpened = new CountDownLatch(1);
...
@XmlElement (required = true)
@XmlMimeType ("*/*")
public DataHandler getResults() {
return results;
}
void setResults(DataHandler aDataHandler) {
results = aDataHandler;
resultSet = new ArrayBlockingQueue<List<String>>(100);
StreamingDataHandler dataHandler = (StreamingDataHandler)results;
try {
parse(dataHandler, resultSet);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* @return true, if the result set serves more elements
* @throws InterruptedException if the current thread was interrupted
*/
public boolean hasNext() throws InterruptedException {
streamOpened.await();
return (hasNext || !resultSet.isEmpty());
}
/**
* Results can be accessed by successively calling
* {@link java.util.concurrent.BlockingQueue#take()} as long as {@link #hasNext()} returns true. Please note, however,
* that an invalid result stream may cause {@link #hasNext()} to return true erroneously, hence the queue might block indefinitely.
*
* @return the result set as a blocking queue
*/
public BlockingQueue<List<String>> getResultSet() {
return resultSet;
}
private DataHandler encode(Iterator<List<String>> aResults) {
assert (aResults != null);
final PipedOutputStream out = new PipedOutputStream();
DataHandler dh = new DataHandler(new StreamDataSource(out, "*/*"));
Encoder encoder = new Encoder(out, aResults);
@SuppressWarnings("unused")
Future<?> future = POOL.submit(encoder);
return dh;
}
private void parse(StreamingDataHandler dataHandler, final Queue<List<String>> aResultSet)
throws InterruptedException {
Decoder decoder = new Decoder(dataHandler, aResultSet);
@SuppressWarnings("unused")
Future<?> future = POOL.submit(decoder);
}
...
private static final class StreamDataSource implements DataSource {
private final String name = UUID.randomUUID().toString();
private final InputStream in;
private final String mimeType;
private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
try {
in = new PipedInputStream(aOut);
} catch (IOException e) {
throw new RuntimeException("Could not create input stream.", e);
}
mimeType = aMimeType;
}
@Override public String getName() { return name; }
@Override public String getContentType() { return mimeType; }
/**
* {@inheritDoc}
*
* This implementation violates the specification in that it is destructive. Only the first call will return an
* appropriate input stream.
*/
@Override public InputStream getInputStream() { return in; }
@Override public OutputStream getOutputStream() { throw new UnsupportedOperationException(); }
}
/**
* Decodes the contents of an input stream as written by the Encoder and writes
* parsed rows to a {@link java.util.Queue}.
*/
private class Decoder implements Runnable {
private final StreamingDataHandler dataHandler;
private final Queue<List<String>> resultSet;
public Decoder(StreamingDataHandler aDataHandler, Queue<List<String>> aResultSet) {
dataHandler = aDataHandler;
resultSet = aResultSet;
}
@Override
public void run() {
try {
InputStream in = dataHandler.getInputStream();
byte[] lenBytes = new byte[4];
List<String> row;
int rowLen;
// read the first row's length
in.read(lenBytes);
rowLen = ByteBuffer.wrap(lenBytes).getInt();
if (rowLen == 0) {
hasNext = false;
streamOpened.countDown();
} else {
hasNext = true;
streamOpened.countDown(); // now the client can start processing/waiting for rows
do {
rowLen = ByteBuffer.wrap(lenBytes).getInt();
if (rowLen == 0) { break; }
row = new ArrayList<String>(rowLen); // allocate a row with room for rowLen columns
for (int col = 0; col < rowLen; col++) { // for each column
in.read(lenBytes); // read the value length
int valLen = ByteBuffer.wrap(lenBytes).getInt();
byte[] valBytes = new byte[valLen]; // allocate a buffer of appropriate size
in.read(valBytes);
row.add(new String(valBytes));
}
resultSet.add(row);
} while (in.read(lenBytes) > 0);
hasNext = false;
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
hasNext = false;
try {
dataHandler.close();
} catch (IOException e) {
throw new RuntimeException("Could not close data handler.", e);
}
}
}
}
/**
* Encodes the given result set and writes the result to an output stream.
*/
private class Encoder implements Runnable {
private final OutputStream out;
private final Iterator<List<String>> iterator;
public Encoder(OutputStream aOut, Iterator<List<String>> aResults) {
out = aOut;
iterator = aResults;
}
@Override
public void run() {
try {
while (iterator.hasNext()) {
List<String> row = iterator.next();
out.write(ByteBuffer.allocate(4).putInt(row.size()).array());
for (String s : row) {
byte[] bytes = s.getBytes();
out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // write size of column in bytes (not s.length(), which counts chars)
out.write(bytes); // write column value
}
}
} catch (IOException e) {
throw new RuntimeException("Could not write data.", e);
} finally {
try {
out.close();
} catch (IOException e) {
throw new RuntimeException("Could not close data handler.", e);
}
}
}
}
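For reference, the length-prefixed framing used by the Encoder and Decoder above can be sketched in isolation. This is only a minimal sketch under my own assumptions, not the code of the service: the class name RowFraming, the fixed UTF-8 charset, and the explicit zero terminator are all hypothetical choices. It uses DataOutputStream/DataInputStream so that the length is the byte count and readFully() blocks until a whole value has arrived (a plain InputStream.read(byte[]) may return fewer bytes than requested). Like the original protocol, it cannot represent a row with zero columns, because a column count of 0 marks the end.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RowFraming {

    /** Writes each row as: column count, then (byte length, UTF-8 bytes) per column. */
    public static void writeRows(Iterator<List<String>> rows, OutputStream raw) throws IOException {
        DataOutputStream out = new DataOutputStream(raw);
        while (rows.hasNext()) {
            List<String> row = rows.next();
            out.writeInt(row.size());
            for (String s : row) {
                byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
                out.writeInt(bytes.length); // length in bytes, not in chars
                out.write(bytes);
            }
        }
        out.writeInt(0); // explicit end marker (an assumption of this sketch)
        out.flush();
    }

    /** Reads rows until the zero column count that marks the end of the stream. */
    public static List<List<String>> readRows(InputStream raw) throws IOException {
        DataInputStream in = new DataInputStream(raw);
        List<List<String>> rows = new ArrayList<>();
        for (int cols = in.readInt(); cols != 0; cols = in.readInt()) {
            List<String> row = new ArrayList<>(cols);
            for (int c = 0; c < cols; c++) {
                byte[] val = new byte[in.readInt()];
                in.readFully(val); // blocks until the whole value has been read
                row.add(new String(val, StandardCharsets.UTF_8));
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        // "\u00e9" is one char but two bytes in UTF-8, so it exercises the length handling.
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("a", "\u00e9"),
                Arrays.asList("node-1"));
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeRows(rows.iterator(), buf);
        System.out.println(readRows(new ByteArrayInputStream(buf.toByteArray())).equals(rows));
    }
}
```

In the real service the decoder would hand each row to the queue as soon as it is read instead of collecting them in a list; the list here only keeps the round trip easy to check.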
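The web service returns a response that contains exactly the expected result set. However, when I debug, I see that the client can only start decoding after the service has closed the output stream. My goal is for the decoder to start working (on the client side) while the encoder task is still writing to the output stream on the server side. Maybe this cannot be fixed, but I think it would be a nice performance improvement.
Thanks in advance for any suggestions on how to solve this.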
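To make the intended behaviour concrete, here is a small local sketch of the overlap I am after, using only a PipedOutputStream/PipedInputStream pair and no SOAP stack at all. The class and method names (PipelineDemo, firstValueArrivesBeforeClose) are hypothetical. The writer flushes one value and then deliberately keeps the stream open until the reader confirms it has received that value. If this works locally, the piped streams themselves allow pipelining, so the buffering I observe presumably happens somewhere in the transport or attachment handling; that is an assumption I have not verified.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PipelineDemo {

    /** Returns true if the reader got the first value while the writer still had the stream open. */
    public static boolean firstValueArrivesBeforeClose() throws Exception {
        PipedOutputStream pipeOut = new PipedOutputStream();
        PipedInputStream pipeIn = new PipedInputStream(pipeOut);
        CountDownLatch firstRead = new CountDownLatch(1);
        AtomicBoolean readBeforeClose = new AtomicBoolean(false);

        Thread writer = new Thread(() -> {
            // try-with-resources closes the pipe only after the second value is written
            try (DataOutputStream out = new DataOutputStream(pipeOut)) {
                out.writeInt(1);
                out.flush(); // wake up the reader without closing the stream
                // Stream is still open here; wait for the reader's confirmation.
                readBeforeClose.set(firstRead.await(5, TimeUnit.SECONDS));
                out.writeInt(2);
            } catch (Exception e) {
                readBeforeClose.set(false);
            }
        });
        writer.start();

        DataInputStream in = new DataInputStream(pipeIn);
        int first = in.readInt();   // returns while the writer is still open
        firstRead.countDown();      // tell the writer the first value was consumed
        int second = in.readInt();  // arrives after the writer resumes
        writer.join();
        return first == 1 && second == 2 && readBeforeClose.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstValueArrivesBeforeClose());
    }
}
```

What I would like is the same overlap between the server-side Encoder and the client-side Decoder, with the HTTP connection taking the place of the pipe.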
I implemented a small DataHandler that does the same for streams of arbitrary protocol buffers: https://bintray.com/avidd/maven/StreamingService/view/read – avidD