2012-07-02 9 views
7

Mam wiele wątków, które muszą zużywać dane ze strumienia TCP. Chciałbym użyć okrągłego bufora/kolejki we wspólnej pamięci do odczytu z gniazda TCP. Odbiór TCP zapisze bezpośrednio do kolejki cyklicznej. Konsumenci będą czytać z kolejki.Jak zaimplementować zerową kopię protokołu TCP przy użyciu zablokowanego bufora kołowego w C++

Ten projekt powinien umożliwiać zerową kopię i zerową blokadę. Jednak są tutaj 2 różne problemy.

  1. Czy jest możliwe/wydajne odczytanie tylko 1 logicznej wiadomości z gniazda TCP? Jeśli nie, a ja przeczytam więcej niż 1 wiadomość, będę musiał skopiować resztki z tego do tego-> dalej.

  2. Czy rzeczywiście istnieje możliwość zaimplementowania kolejki bez blokady? Wiem, że są operacje atomowe, ale to też może być kosztowne. ponieważ cała pamięć podręczna procesora musi zostać unieważniona. Wpłynie to na wszystkie operacje na wszystkich moich 24 rdzeniach.

Jestem trochę zardzewiały w TCP niskiego poziomu, i nie jest dokładnie jasne, jak stwierdzić, kiedy wiadomość jest kompletna. Czy szukam \ 0 czy jest to konkretna implementacja?

ty

+4

Co jest wiadomość w kosmos? Masz wysokie cele, ale czy wykonanie prostego, prostego rozwiązania nie jest wystarczające? – Nim

+0

Zablokowanie nie oznacza "nie kosztuje", ale w praktyce blokada jest zwykle o kilka rzędów wielkości szybsza niż przełączniki kontekstu związane z blokowaniem. To, czego chcesz, można osiągnąć wyłącznie za pomocą przyrostów atomowych, co jest bardzo szybkie, pod warunkiem, że uczynisz swój bufor odpowiednim rozmiarem, np. 65536. W sumie jednak zgadzam się z Nimem. – Damon

+0

@Damon, ciekawe, nie widzę implementacji pierścienia, który opiera się wyłącznie na przyrostach atomowych, może masz papier (link) coś do tego? Jedyne implementacje jakie widziałem (i zrobiłem) polegają na operacjach porównania/wymiany. – Nim

Odpowiedz

8

Niestety, TCP nie można przenieść wiadomości, tylko bajt strumieni. Jeśli chcesz przesłać wiadomości, będziesz musiał zastosować protokół na wierzchu. Najlepsze protokoły wysokiej wydajności to te, które wykorzystują nagłówek sprawdzania poprawności określający długość wiadomości - pozwala to na odczytanie prawidłowej ilości danych bezpośrednio w odpowiednim obiekcie buforowym bez powtarzania bajt po bajcie, szukając o charakterze wiadomości. Bufor POINTER można następnie umieścić w kolejce do innego wątku i utworzyć nowy obiekt bufora dla następnego komunikatu. Pozwala to uniknąć kopiowania danych masowych, a w przypadku dużych wiadomości jest wystarczająco wydajne, aby za pomocą kolejki nieblokującej dla wskaźników obiektu komunikatu było nieco niecelowe.

Następną optymalizacją dostępną jest łączenie buforów obiektów * w celu uniknięcia ciągłego nowego/likwidacji, zawrócenie buforów * w wątku konsumenta w celu ponownego wykorzystania w wątku otrzymującym sieć. Jest to dość łatwe do zrobienia z ConcurrentQueue, najlepiej blokowanie, aby umożliwić kontrolę przepływu zamiast uszkodzenia danych lub segfaultów/AV, jeśli pula opróżni się tymczasowo.

Następnie dodaj "martwą strefę [cacheline size]" na początku każdego * elementu danych bufora, aby zapobiec jakiemukolwiek wątkowi fałszywego współdzielenia danych z innymi.

Rezultatem powinien być przepływ szerokopasmowy kompletnych wiadomości do wątku konsumenta z bardzo małym opóźnieniem, brakiem procesora lub pomijaniem pamięci podręcznej. Wszystkie Twoje 24 rdzenie mogą działać bez problemów na różnych danych.

Kopiowanie danych masowych w aplikacjach wielowątkowych to przyznanie się do złego projektu i porażki.

Kontynuacja ..

Brzmi jak utkniesz z iteracji danych z powodu różnych protokołów :(

Fałsz podziału wolne PDU obiektu buforowego, np:

typedef struct{ 
    char deadZone[256]; // anti-false-sharing 
    int dataLen; 
    char data[8388608]; // 8 meg of data 
} SbufferData; 

class TdataBuffer: public{ 
private: 
    TbufferPool *myPool; // reference to pool used, in case more than one 
    EpduState PDUstate; // enum state variable used to decode protocol 
protected: 
    SbufferData netData; 
public: 
    virtual reInit(); // zeros dataLen, resets PDUstate etc. - call when depooling a buffer 
    virtual int loadPDU(char *fromHere,int len); // loads protocol unit 
    release(); // pushes 'this' back onto 'myPool' 
}; 

loadPDU przekazuje wskaźnik do, długości surowych danych sieciowych.Zwraca albo 0 - oznacza, że ​​jeszcze nie kompletnie zmontował PDU, albo liczbę bajtów, które zjadł z surowych danych sieciowych, aby całkowicie złożyć PDU, w takim przypadku, odstawił go, zdeboolował inny i wywołał loadPDU() z niewykorzystaną resztą danych surowych, a następnie kontynuuj z następnymi danymi surowymi, które będą dostarczane.

Możesz użyć różnych pul różnych klas buforów pochodnych do obsługi różnych protokołów, w razie potrzeby - tablicy TbufferPool [Eprotocols]. TbufferPool może być po prostu kolejką BlockingCollection. Zarządzanie staje się niemal banalne - bufory mogą być wysyłane w kolejkach dookoła systemu, do GUI do wyświetlania statystyk, a następnie do loggera, o ile na końcu łańcucha kolejek coś wywołuje release().

Oczywiście, "prawdziwy" obiekt PDU miałby ładować więcej metod, związków danych/struktur, iteratorów może i stanu silnika do obsługi protokołu, ale to jest i tak podstawowa idea. Najważniejsze jest łatwe zarządzanie, enkapsulacja i, ponieważ żadne dwa wątki nie mogą kiedykolwiek działać na tej samej instancji bufora, żadna blokada/synchronizacja nie jest wymagana do parsowania/dostępu do danych.

Och, tak, a ponieważ żadna kolejka nie musi pozostać zablokowana dłużej niż jest wymagane do popchnięcia/popowania jednym wskaźnikiem, szanse na faktyczne rywalizacje są bardzo niskie - nawet konwencjonalne kolejki blokujące prawie nigdy nie będą wymagały blokowania jądra.

+0

dzięki! niestety nie mam kontroli nad protokołem wiadomości. Mój kod będzie łączył się z różnymi publicznymi giełdami i prywatnymi fabrykami sprzedawców. miejmy nadzieję, że używają nagłówków/długości. O ile nie jest to standard, prawdopodobnie będę musiał zaprojektować kod odbioru TCP niezależnie od protokołu wiadomości. Jestem pewien, że mogę znaleźć wzór na to, a nawet spojrzeć na źródło ACE? dzięki @Martin James – jaybny

+0

"Następnie dodaj" martwą strefę [cacheline size] "na początku każdego * członka danych bufora" Im nie jestem całkiem pewien, rozumiem problem, który rozwiązuje. Co to jest "rozmiar cachline"? Specyficzny procesor? 32/64 bitów? ty – jaybny

+0

@jaybny - Google "fałszywe udostępnianie" –

Powiązane problemy