2012-08-04 18 views
6

Mam za zadanie zmodyfikować synchroniczny program C, aby mógł działać równolegle. Celem jest zapewnienie maksymalnej mobilności, ponieważ jest to program open source, z którego korzysta wiele osób. Z tego powodu pomyślałem, że najlepiej będzie zawinąć program w warstwie C++, aby móc korzystać z przenośnych bibliotek wspomagających. Zrobiłem to już i wszystko wydaje się działać zgodnie z oczekiwaniami.Wielowątkowe przesyłanie wiadomości w języku C++

Problem, który mam, polega na wyborze najlepszego sposobu przekazywania wiadomości między wątkami. Na szczęście architektura programu jest wieloproducentem i pojedynczym konsumentem. Co więcej, kolejność wiadomości nie jest ważna. Czytałem, że koleje jednoprodukcyjne/jedno-konsumenckie (SPSC) skorzystałyby z tej architektury. Ci doświadczeni w programowaniu wielowątkowym mają jakąkolwiek radę? Jestem całkiem nowy w tych rzeczach. Również wszelkie przykłady kodu wykorzystujące doładowanie w celu wdrożenia SPSC będą bardzo mile widziane.

+0

Zobacz zaakceptowaną odpowiedź na http://stackoverflow.com/questions/8918401/does-a-multiple-producer-single-consumer-lock-free-queue-exist-for-c – walrii

Odpowiedz

7

Poniżej znajduje się technika, której użyłem w mojej bibliotece współpracy wielozadaniowej/wielowątkowej (MACE) http://bytemaster.github.com/mace/. Ma tę zaletę, że jest wolny od blokady, z wyjątkiem sytuacji, gdy kolejka jest pusta.

struct task { 
    boost::function<void()> func; 
    task* next; 
}; 


boost::mutex      task_ready_mutex; 
boost::condition_variable  task_ready; 
boost::atomic<task*>    task_in_queue; 

// this can be called from any thread 
void thread::post_task(task* t) { 
    // atomically post the task to the queue. 
    task* stale_head = task_in_queue.load(boost::memory_order_relaxed); 
    do { t->next = stale_head; 
    } while(!task_in_queue.compare_exchange_weak(stale_head, t, boost::memory_order_release)); 

    // Because only one thread can post the 'first task', only that thread will attempt 
    // to aquire the lock and therefore there should be no contention on this lock except 
    // when *this thread is about to block on a wait condition. 
    if(!stale_head) { 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex); 
     task_ready.notify_one(); 
    } 
} 

// this is the consumer thread. 
void process_tasks() { 
    while(!done) { 
    // this will atomically pop everything that has been posted so far. 
    pending = task_in_queue.exchange(0,boost::memory_order_consume); 
    // pending is a linked list in 'reverse post order', so process them 
    // from tail to head if you want to maintain order. 

    if(!pending) { // lock scope 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex);     
     // check one last time while holding the lock before blocking. 
     if(!task_in_queue) task_ready.wait(lock); 
    } 
} 
+0

"Spółdzielnia": (( –

+0

.. choć +1 za użycie linku wewnątrz każdej wiadomości, aby uniknąć przechowywania wiadomości w kolejce –

+0

Fantastyczne dzięki . – grouma

1

Jeśli istnieje tylko jeden konsument, ale wielu producentów, to użyłbym tablicy lub struktury danych podobnej do tablicy z czasem dostępu O (1), gdzie każde gniazdo tablicy reprezentuje kolejkę jednego producenta-konsumenta. Wielką zaletą kolejki pojedynczego producenta-konsumenta jest fakt, że można go zablokować bez żadnych wyraźnych mechanizmów synchronizacji, co czyni go bardzo szybką strukturą danych w środowisku wielowątkowym. Zobacz my answer here, aby uzyskać pełną wersję kolejki jednego producenta-konsumenta.

+0

Użyłem tej techniki przed przyjęciem rozwiązanie atomowe poniżej. Problem polegał na tym, że nie skalował się, zużywał pamięć podczas "bezczynności" dla buforów, a jeśli nie wiesz, które wątki mogą publikować z wyprzedzeniem (często), musisz dynamicznie zmienić rozmiar (za pomocą lock) lub hardcode "dużą" maksymalną liczbę wątków. – bytemaster

+0

Myślałem o używaniu okrągłej kolejki ... w ten sposób nie ma potrzeby zmiany rozmiaru kolejki. – Jason

+0

, która jest "kolejką o ustalonym rozmiarze", która może wypełnić, ale jeśli masz N wątków, to każdy wątek potrzebuje jednego wejścia dla wszystkich innych wątków N, lub gdy pierwszy wątek próbuje się połączyć z innym, musi przydzielić własny pojedynczy producent/kolejka pojedynczego klienta. To jest kwestia zmiany rozmiaru lub twardego kodu. – bytemaster

1

Istnieje wiele przykładów kolejek producent-konsument w sieci, bezpiecznych dla wielu producentów/konsumentów. @bytemaster opublikował taki, który używa odsyłacza wewnątrz każdej wiadomości, aby wyeliminować pamięć w samej klasie kolejki - to dobre podejście, używam go sam na osadzonych zadaniach.

Tam, gdzie klasa kolejkowa musi zapewniać pamięć, zwykle uruchamiam "kolejkę puli" o rozmiarze N, załadowaną instancjami klasy N * wiadomości podczas uruchamiania. Wątki wymagające komunikacji muszą wysłać wiadomość * z puli, załadować ją i umieścić w kolejce. Kiedy w końcu "zużyta" wiadomość * zostanie zepchnięta z powrotem do puli. Ogranicza to liczbę wiadomości, więc wszystkie kolejki muszą mieć tylko długość N - bez zmiany rozmiaru, bez nowych(), bez usuwania(), łatwe wykrywanie wycieków.

+0

Uprościliśmy kod odpowiedzi, ale w rzeczywistości każde "zadanie" zapisuje funktora o "nieznanym rozmiarze" (aby uniknąć przydziału boost :: function <>). Co więcej, pozwoliłem, aby zadanie było podwójnie liczone jako obietnica, która jest utrzymywana przy życiu tak długo, jak utrzymuje ją przyszłość. Kończę z 1 malloc na zadanie i mogę przesłać 2,4 miliona postów synchronizacji/czekania na sekundę. – bytemaster

Powiązane problemy