2013-03-25 50 views
6

我有一個JSON-RPC服務,其中一個請求返回連續的JSON對象流。與Indy的HTTP連續分組流

I.e. :

{id:'1'} 
{id:'2'} 
//30 minutes of no data 
{id:'3'} 
//... 

當然,沒有內容長度,因爲流是無止境的。

我正在使用自定義TStream後裔來接收和解析數據。但在內部TIdHttp緩衝數據並不傳遞給我,直到收到RecvBufferSize字節。

這導致:

{id:'1'} //received 
{id:'2'} //buffered by Indy but not received 
//30 minutes of no data 
{id:'3'} //this is where Indy commits {id:'2'} to me 

顯然,這不會做,因爲這要緊31分鐘前的消息應該已經29分鐘前交付。

我想印該怎麼做插座的更多信息:讀取多達RecvBufferSize以內是否有可用的數據並立即返回。

我發現this discussion從2005年一些地方可憐的靈魂試圖解釋問題的Indy開發者,但他們並沒有理解他。 (閱讀它,它是一個可悲的景象)

無論如何,他工作圍繞這通過編寫自定義IOHandler後裔,但是這是早在2005年,也許今天也有一些現成的解決方案?

回答

2

在使用TCP流是一種選擇,最後我去編寫自定義TIdIOHandlerStack後代的原始解決方案。

的動機是與TIdHTTP我知道什麼是不工作,只需要解決這個問題,在切換到低級別TCP意味着可能會出現新的問題。

Here's the code that I'm using,我將在這裏討論重點。

TIdStreamIoHandler必須從TIdIOHandlerStack繼承。

兩個函數需要重寫:ReadBytesReadStream

function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer; 
    AAppend: Boolean = True): integer; virtual; 
procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1; 
    AReadUntilDisconnect: Boolean = False); override; 

兩個被修改可以在IdIOHandler.TIdIOHandler找到印功能。在ReadByteswhile條款必須用燎ReadFromSource()要求更換,使TryReadBytes回報一氣呵成閱讀最多AByteCount字節之後。

基於此,ReadStream必須處理AByteCount(> 0,< 0)和ReadUntilDisconnect(true,false)的所有組合才能循環讀取,然後寫入到從套接字到達的數據流中。

注意ReadStream不必過早即使在這種流版本如果只有所請求的數據的一部分,是在插座可終止。它只需立即將該部分寫入流,而不是將其緩存在FInputBuffer中,然後阻塞並等待下一部分數據。

+0

作爲Indy是開源的,修改過的資源可能(並且,如果對其他人有幫助的話)應該公開 – mjn 2013-04-24 13:03:46

+0

@mjn:不知道,謝謝。添加了代碼。 – himself 2013-04-25 07:49:17

2

你並不需要寫一個IOHandler後裔,已經可以與TIdTCPClient類。它公開了一個TIdIOHandler對象,該對象具有從套接字讀取的方法。這些ReadXXX方法會阻塞,直到讀取請求的數據或發生超時。只要連接存在,ReadXXX就可以循環執行,並且只要它接收到一個新的JSON對象,就將它傳遞給應用程序邏輯。

你的例子看起來像所有的JSON對象只有一行。但是,JSON對象可能是多線的,在這種情況下,客戶端代碼需要知道它們是如何分離的。


更新:在用於「流式」 HTTP JSON web服務的類似問題#1(用於.NET),最upvoted溶液中使用較低級TCP客戶端,而不是一個HTTP客戶端:Reading data from an open HTTP stream

4

因爲你的連接不再是簡單的HTTP問題/答案,而是一個內容流,所以我聽起來像是一個WebSocket任務。

一段代碼見WebSocket server implementations for Delphi

at least one based on Indy,從AsmProfiler的作者。

AFAIK在websockets中有兩種流:二進制和文本。我懷疑你的JSON流是一些文本內容,從websocket的角度來看。

另一種選擇是使用long-pooling或一些較老的協議,這些協議更友好 - 當連接切換到websockets模式時,它不再是標準的HTTP,所以一些「明智」的數據包檢查工具企業網絡)可能會將其識別爲安全攻擊(例如DoS),因此可能會停止連接。

+0

如果我說得對,兩種解決方案都需要重寫服務?因爲我無法訪問它。 – himself 2013-03-26 09:08:22

+0

@himself如果您的請求是打開連接並且不使用Content-Length標頭,那麼這不再是HTTP,因此我想您必須更改服務端! – 2013-03-26 12:42:27

+0

姆姆,猜猜服務方會說什麼? 「HTTP標準中沒有任何地方說HTTP中間件可以長時間緩存數據,因此我們的服務很好,我想你必須修復你的HTTP客戶端代碼。」回到原點。 – himself 2013-03-26 13:51:28

0

實際上有分組的這在分塊編碼傳輸模式傳輸的內容之前的長度的數據的權利。使用此長度數據,idhttp的IOhandler通過一個數據包讀取一個數據包進行流式傳輸。最小有意義的單位是一個數據包,所以不需要從數據包中逐個讀取字符,也不需要改變IOHandler的功能。唯一的問題是idhttp不會停止將流數據轉向下一步,因爲流數據層出不窮:沒有結束數據包。因此該解決方案是使用idhttp onwork事件來觸發從流讀出和流位置設定爲零,以避免溢出。像這樣:

//add a event handler to idhttp  
    IdHTTP.OnWork := IdHTTPWork; 


    procedure TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
    begin 
     ..... 
     ResponseStringStream.Position :=0; 
     s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;//this is the packet conten 
     ResponseStringStream.Clear; 
     ... 
    end; 

procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject); 
var 
begin 
    .....  
    source := RatesWorker.RatesURL+'EUR_USD'; 
    RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream); 
end; 

然而使用自定義的write()T流的函數可以是這種要求更好的解決方案。