2013-03-25 11 views
6

Mam usługę JSON-RPC, która dla jednego z żądań zwraca ciągły strumień obiektów JSON.Strumień ciągły HTTP z Indy

tj. :

{id:'1'} 
{id:'2'} 
//30 minutes of no data 
{id:'3'} 
//... 

Oczywiście nie ma długości treści, ponieważ strumień jest nieskończony.

Używam niestandardowego potomka TStream do odbierania i analizowania danych. Ale wewnętrznie TIdHttp buforuje dane i nie przekazuje mi go do momentu otrzymania RecvBufferSize bajtów.

Wynika to z:

{id:'1'} //received 
{id:'2'} //buffered by Indy but not received 
//30 minutes of no data 
{id:'3'} //this is where Indy commits {id:'2'} to me 

Oczywiście nie zrobi, ponieważ wiadomość, która liczy 30 minut temu miały zostać dostarczone 30 minut temu.

Chciałbym, aby Indy robiła dokładnie to, co robią gniazda: odczytaj do RecvBufferSize lub mniej, jeśli są dostępne dane i natychmiast wracaj.

Znalazłem this discussion z 2005 roku, gdzie jakaś biedna dusza próbowała wyjaśnić problem deweloperom Indy, ale oni go nie rozumieli. (Przeczytaj to, to smutny widok)

W każdym razie pracował nad tym, pisząc niestandardowy potomek IOHandlera, ale to było w 2005 roku, być może jest kilka gotowych rozwiązań dzisiaj?

Odpowiedz

2

Podczas korzystania z strumienia TCP była opcja, w końcu poszedłem z oryginalnym rozwiązaniem pisania niestandardowego potomka TIdIOHandlerStack.

Motywacja polegała na tym, że dzięki TIdHTTP wiem, co nie działa i trzeba to naprawić, natomiast przejście na niższy poziom oznacza, że ​​mogą pojawić się nowe problemy.

Here's the code that I'm using, i zamierzam omówić kluczowe punkty tutaj.

Nowa TIdStreamIoHandler musi odziedziczyć po TIdIOHandlerStack.

dwie funkcje muszą być zapisane: ReadBytes i ReadStream:

function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer; 
    AAppend: Boolean = True): integer; virtual; 
procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1; 
    AReadUntilDisconnect: Boolean = False); override; 

Oba są modyfikowane Indy funkcje, które można znaleźć w IdIOHandler.TIdIOHandler. W ReadBytes klauzula while musi zostać zastąpiona pojedynczą prośbą, aby TryReadBytes powrócił po przeczytaniu do AByteCount bajtów za jednym razem.

Na tej podstawie ReadStream musi obsługiwać wszystkie kombinacje AByteCount (> 0, < 0) i ReadUntilDisconnect (true, false) aby cyklicznie odczytać i zapisać strumień ilości danych przybywających z gniazdka.

Należy pamiętać, że nie musi kończyć się przedwcześnie, nawet w tej wersji strumienia, jeśli tylko część żądanych danych jest dostępna w gnieździe. Musi tylko zapisać tę część w strumieniu natychmiast, zamiast buforować ją w FInputBuffer, a następnie zablokować i poczekać na kolejną część danych.

+0

jako Indy jest open source, zmodyfikowane źródła mogą (i, jeśli pomocne dla innych, powinny) być upublicznione – mjn

+0

@mjn: Nie wiedziałem tego, dziękuję. Dodano kod. – himself

2

Nie trzeba pisać potomka IOHandler, jest to już możliwe z klasą TIdTCPClient. Udostępnia obiekt TIdIOHandler, który ma metody odczytu z gniazda. Te metody ReadXXX są blokowane, dopóki żądane dane nie zostaną odczytane lub nastąpi przekroczenie limitu czasu. Dopóki istnieje połączenie, ReadXXX może być wykonywany w pętli i za każdym razem, gdy otrzymuje nowy obiekt JSON, przekazuje go do logiki aplikacji.

Twój przykład wygląda tak, jakby wszystkie obiekty JSON miały tylko jedną linię. Obiekty JSON mogą jednak być wieloliniowe, w tym przypadku kod klienta musi wiedzieć, w jaki sposób są rozdzielone.


Aktualizacja: w podobnym pytaniu Stackoverflow (NET) dla „streaming” HTTP JSON usługi internetowej, najbardziej upvoted rozwiązanie wykorzystywane do niższego poziomu klienta TCP zamiast klienta http: Reading data from an open HTTP stream

4

Brzmi dla mnie jak zadanie WebSocket, ponieważ twoje połączenie nie jest już zorientowane w postaci pytań i odpowiedzi HTTP, ale strumień treści.

Zobacz kod WebSocket server implementations for Delphi.

Istnieje at least one based on Indy, autor programu AsmProfiler.

AFAIK Istnieją dwa rodzaje strumieni w stronach internetowych: binarne i tekstowe. Podejrzewam, że Twój strumień JSON jest treścią tekstową, z punktu widzenia websocket.

Inną opcją jest użycie long-pooling lub niektórych starszych protokołów, które są bardziej przyjazne dla rooterów - kiedy połączenie przełącza się do trybu websockets, nie jest już standardowym HTTP, więc niektóre "sensowne" narzędzia do inspekcji pakietów (na sieć korporacyjna) może zidentyfikować to jako atak bezpieczeństwa (np. DoS), więc może przerwać połączenie.

+0

Jeśli dobrze, oba rozwiązania wymagają przepisania usługi? Ponieważ nie mam do niego dostępu. – himself

+0

@himself Jeśli twoja prośba ma otworzyć połączenie i nie używać nagłówków Content-Length, to nie jest to już HTTP, więc przypuszczam, że będziesz musiał zmienić stronę usługi! –

+0

Mhm, zgadnij, co powie strona obsługi? "Nigdzie w standardzie HTTP jest napisane, że oprogramowanie pośredniczące HTTP może buforować dane przez dłuższy czas, dlatego nasza usługa jest w porządku, przypuszczam, że będziesz musiał naprawić swój kod klienta HTTP". Powrót do pierwszego kwadratu. – himself

0

Istnieje rzeczywiście dane o długości tuż przed treścią pakietu, który został przeniesiony w trybie transferu kodowanego. Używając tych danych o długości, IOhandler z idhttp odczytuje jeden pakiet z jednego pakietu do strumienia. Minimalną znaczącą jednostką jest pakiet, więc nie powinno być potrzeby odczytywania znaków jeden po drugim z pakietu, a następnie nie ma potrzeby zmiany funkcji IOHandler. Jedynym problemem jest to, że idhttp nie powstrzyma przekształcania danych strumienia do następnego kroku z powodu niekończących się danych strumienia: nie ma pakietu końcowego. Więc rozwiązaniem jest użycie idhttp onwork zdarzenie wywołać odczyt ze strumienia i ustawienie pozycji strumienia do zera w celu uniknięcia przepełnienia .like to:

//add a event handler to idhttp  
    IdHTTP.OnWork := IdHTTPWork; 


    procedure TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
    begin 
     ..... 
     ResponseStringStream.Position :=0; 
     s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;//this is the packet conten 
     ResponseStringStream.Clear; 
     ... 
    end; 

procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject); 
var 
begin 
    .....  
    source := RatesWorker.RatesURL+'EUR_USD'; 
    RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream); 
end; 

jednak użyć niestandardowego write() funkcja TStream może być lepsze rozwiązanie dla tego rodzaju wymagań.