2009-09-19 17 views
7

Potrzebuję kolejki do przekazywania wiadomości z jednego wątku (A) na inny (B), jednak nie byłem w stanie znaleźć takiego, który naprawdę robi to, co chcę, ponieważ na ogół pozwalają dodanie elementu do błędu, sprawa, która w mojej sytuacji jest bardzo fatalna, ponieważ wiadomość musi zostać przetworzona, a wątek naprawdę nie może się zatrzymać i czekać na wolny pokój.Wielowątkowy pojedynczy czytnik pojedynczy pisarz kolejka fifo

  • wątku Only A dodaje elementy, a nić tylko B odczytuje je
  • Temat Musi Nigdy nie należy blokować, jednak wątek B nie jest wydajność krytyczna, więc to może
  • Dodaję zawsze musi odnieść sukces, więc kolejka mogę mieć górny limit rozmiaru (skrót od wyczerpaniu pamięci w systemie)
  • Jeśli kolejka jest pusta, gwint B powinna poczekać, dopóki nie jest element do przetwarzania
+0

Jaką bibliotekę wątków używasz? pthreads? –

+0

doładowanie :: wątek i niektóre bity kodu specyficznego dla platformy tu i tam –

+0

Twój cel może spowodować wyczerpanie pamięci, ponieważ nie pozwalasz, aby wątek programu piszącego blokował lub upuszczał elementy. Więc jeśli osiągniesz krytyczny limit wielkości kolejki, musisz zdecydować, czy upuścić przedmioty, czy zablokować wątek pisarza. W przeciwnym razie możesz upuścić przedmioty pośrednio, ponieważ Twój program się nie powiedzie :-) – mmmmmmmm

Odpowiedz

7

Oto jak napisać lock- wolna kolejka w C++:

http://www.ddj.com/hpc-high-performance-computing/210604448

Ale gdy powiesz „nawlec nie musi blokować”, jesteś pewna, że ​​to wymaganie? Windows nie jest systemem operacyjnym czasu rzeczywistego (i nie jest linuxem, w normalnym użyciu). Jeśli chcesz, aby wątek A mógł korzystać z całej dostępnej pamięci systemowej, musisz przydzielić pamięć (lub poczekać, aż zrobi to inna osoba). Sam system operacyjny nie może zapewnić gwarancji czasowych lepszych niż te, które miałbyś, gdyby zarówno czytnik, jak i program piszący, wykonali blokadę procesu (tj. Nie-udostępniony muteks) w celu manipulowania listą. A najgorszy przypadek dodania wiadomości będzie musiał przejść do systemu operacyjnego, aby uzyskać pamięć.

Podsumowując, istnieje powód, dla którego te kolejki, które nie lubisz, mają stałą pojemność - tak, że nie muszą przydzielać pamięci w wątku o rzekomo niskim opóźnieniu.

Tak więc kod bez blokady generalnie będzie mniejszy blok-y, ale ze względu na przydzielenie pamięci nie ma gwarancji, że jest, a wydajność z muteksem nie powinna być tak nędzna, chyba że masz naprawdę olbrzymie strumień zdarzeń do przetworzenia (np. piszemy sterownik sieciowy, a wiadomości są przychodzącymi pakietami ethernetowymi).

Więc w pseudo-kodzie, pierwszą rzeczą, którą staram byłoby:

Writer: 
    allocate message and fill it in 
    acquire lock 
     append node to intrusive list 
     signal condition variable 
    release lock 

Reader: 
    for(;;) 
     acquire lock 
      for(;;) 
       if there's a node 
        remove it 
        break 
       else 
        wait on condition variable 
       endif 
      endfor 
     release lock 
     process message 
     free message 
    endfor 

Dopiero jeśli okaże się to do wprowadzenia niedopuszczalnych opóźnień w wątku pisarz pójdę zablokować wolne od kodu (chyba że zdarzyło mi się mieć odpowiednią kolejkę już leżącą w pobliżu).

+0

Na niższym poziomie można użyć listy z pojedynczym połączeniem z dołączonym procesem pisania i procesem czytania. Może to być blokadę z procesem pisania zmieniającym wskaźnik NULL na wartość inną niż NULL, a proces czytania zmienia się z NULL na NULL. Mała prywatna kupa zapewnia dobrą zamortyzowaną wydajność dla elementów listy. Pisarz mallocs i czytelnik uwalniają. Jeśli czytnik przejdzie w stan uśpienia, można podać trzeci proces C, który spekulacyjnie powiększy stertę prywatną, ukrywając blokujący charakter alokacji z procesu A. –

+0

Twój przykład spowoduje zakleszczenie.Podczas gdy czytnik czeka na sygnał, blokuje go, co uniemożliwia autorowi uzyskanie blokady i sygnalizację. Musisz zwolnić blokadę zanim zaczekasz na zmienną warunku i ponownie ją odbierz. –

+0

@Bobby: jesteś w błędzie. Oczekiwanie na zmienną warunku powoduje zwolnienie powiązanej blokady podczas oczekiwania, a następnie ponowne jej odzyskanie przed powrotem z oczekiwania. Jest to część "zmiennej warunku" - jeśli interfejs API, którego używasz, nie robi tego dla ciebie, to nie jest zmienną warunkową, to raczej semafor. I ważne jest, aby API to robił, ponieważ wtedy twój kod może polegać na tym, że zwolnienie blokady i rozpoczęcie czekania na warunek następuje atomowo - to znaczy żaden inny wątek nie może nic zrobić pod blokadą, zanim twój wątek będzie kelner. –

0

Być może zechcesz wziąć pod uwagę swoje wymagania - czy naprawdę nie można odrzucić żadnych pozycji kolejki? Czy też nie chcesz, aby B wyciągał dwa kolejne elementy z kolejki, które nie były kolejnymi przedmiotami, ponieważ mogłoby to w jakiś sposób przeinaczać sekwencję zdarzeń?

Na przykład, jeśli jest to jakiś rodzaj systemu rejestrowania danych, (co zrozumiałe) nie chciałbyś mieć przerw w zapisie - ale bez nieograniczonej pamięci, rzeczywistość jest taka, że ​​w jakimś rogu przypadku gdzieś prawdopodobnie twoja kolejka pojemność ..

W takim przypadku jednym z rozwiązań jest posiadanie specjalnego elementu, który można umieścić w kolejce, co reprezentuje przypadek A, który odkrył, że musiał upuścić przedmioty. Zasadniczo trzymasz jeden dodatkowy element wokół, który przez większość czasu jest zerowy. Za każdym razem, gdy A dodaje elementy do kolejki, jeśli ten dodatkowy element nie jest pusty, to znaczy, że jeśli A wykryje, że nie ma miejsca w kolejce, konfiguruje ten dodatkowy element, mówiąc "hej, kolejka była pełna". .

W ten sposób A nigdy się nie blokuje, możesz upuszczać elementy, gdy system jest bardzo zajęty, ale nie tracisz z oczu faktu, że elementy zostały upuszczone, ponieważ jak tylko miejsce w kolejce stanie się dostępne, znak ten przechodzi w aby wskazać, gdzie nastąpił spadek danych. Proces B wykonuje następnie wszystko, co musi, gdy odkryje, że wyciągnął element zaznaczenia z kolejki.

1

Visual Studio 2010 dodaje 2 nowe biblioteki, które obsługują ten scenariusz bardzo dobrze, Asynchronous Agents Library i równoległą bibliotekę wzorów.

Biblioteka środki obsługuje lub asynchroniczne przekazywanie komunikatów i zawiera bloki wiadomości do wysyłania wiadomości do „celów” i odbierania wiadomości od „źródła”

unbounded_buffer jest klasa szablon, który oferuje co wierzę szukasz dla:

#include <agents.h> 
#include <ppl.h> 
#include <iostream> 

using namespace ::Concurrency; 
using namespace ::std; 

int main() 
{ 
    //to hold our messages, the buffer is unbounded... 
    unbounded_buffer<int> buf1; 
    task_group tasks; 

    //thread 1 sends messages to the unbounded_buffer 
    //without blocking 
    tasks.run([&buf1](){ 
     for(int i = 0 ; i < 10000; ++i) 
     send(&buf1,i) 
    //signal exit 
    send(&buf1,-1); 
    }); 

    //thread 2 receives messages and blocks if there are none 

    tasks.run([&buf1](){ 
     int result; 
     while(result = receive(&buf1)!=-1) 
     { 
      cout << "I got a " << result << endl; 
     } 
    }); 

    //wait for the threads to end 
    tasks.wait(); 
} 
+2

Czy to naprawdę działa w kategorii Linux? –

+0

FWIW, w pętli odbierającej zawsze będziesz wyprowadzał "Mam 1", ponieważ! = Jest obliczany przed = –

1
  • Dlaczego nie użyć STL <list> lub <deque> z AR mutex ound dodać/usunąć? Czy thread-safety of STL jest niewystarczające?

  • Dlaczego nie utworzyć własnej (pojedynczo/podwójnie) klasy z połączonymi listami węzłów, która zawiera wskaźnik i czy elementy do dodania/usunięcia dziedziczą z tego? W ten sposób niepotrzebne są dodatkowe alokacje. Po prostu masz kilka wskazówek w threadA::add() i threadB::remove() i gotowe. (Mimo, że chcesz to zrobić pod mutex, efekt blokujący na threadA byłyby znikome, chyba że zrobił coś naprawdę złego ...)

  • Jeśli używasz pthreads, sprawdź sem_post() i sem_wait(). Chodzi o to, że threadB może blokować w nieskończoność przez sem_wait(), aż wątekA umieści coś w kolejce. Następnie wątekA wywołuje sem_post(). Który budzi wątekB, aby to działało. Następnie wątek B może powrócić do trybu uśpienia. Jest to skuteczny sposób obsługi asynchronicznej sygnalizacji, obsługujący rzeczy takie jak wiele threadA::add() przed zakończeniem threadB::remove().