2015-06-11 9 views
10

Mam x boostów, które działają w tym samym czasie. Jeden wątek producenta wypełnia zsynchronizowaną kolejkę z zadaniami obliczeniowymi. Nici konsumenta wyskakują z zadań i obliczają je.Naprawiono poprawnie gwinty do automatycznego wyłączania

Synchronised Queue Image Source: https://www.quantnet.com/threads/c-multithreading-in-boost.10028/

Użytkownik może zakończyć programm podczas tego procesu, więc muszę zamknięcie moje wątki prawidłowo. Moje obecne podejście wydaje się nie działać, ponieważ są zgłaszane wyjątki. Planuje się, że po zamknięciu systemu wszystkie procesy powinny zostać zabite i zatrzymać ich bieżące zadanie bez względu na to, co robią. Czy mógłbyś mi pokazać, jak zabiłbyś te wątki?

Inicjalizacja Temat:

for (int i = 0; i < numberOfThreads; i++) 
    { 
     std::thread* thread = new std::thread(&MyManager::worker, this); 
     mThreads.push_back(thread); 
    } 

Zniszczenie Temat:

void MyManager::shutdown() 
{ 
    for (int i = 0; i < numberOfThreads; i++) 
    { 
     mThreads.at(i)->join(); 
     delete mThreads.at(i); 
    } 
    mThreads.clear(); 
} 

Pracownik:

void MyManager::worker() 
{ 
    while (true) 
    { 

     int current = waitingList.pop(); 
     Object * p = objects.at(current); 
     p->calculateMesh(); //this task is internally locked by a mutex 

     try 
     { 
      boost::this_thread::interruption_point(); 
     } 
     catch (const boost::thread_interrupted&) 
     { 
      // Thread interruption request received, break the loop 
      std::cout << "- Thread interrupted. Exiting thread." << std::endl; 
      break; 
     } 
    } 
} 

synchroniczne kolejki:

#include <queue> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 

template <typename T> 
class ThreadSafeQueue 
{ 
public: 

    T pop() 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     while (queue_.empty()) 
     { 
      cond_.wait(mlock); 
     } 
     auto item = queue_.front(); 
     queue_.pop(); 

     return item; 
    } 

    void push(const T& item) 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     queue_.push(item); 
     mlock.unlock(); 
     cond_.notify_one(); 
    } 


    int sizeIndicator() 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     return queue_.size(); 
    } 


private: 

    bool isEmpty() { 
     std::unique_lock<std::mutex> mlock(mutex_); 
     return queue_.empty(); 
    } 

    std::queue<T> queue_; 
    std::mutex mutex_; 
    std::condition_variable cond_; 
}; 

Rzucony stos wywołań błędu:

... std::_Mtx_lockX(_Mtx_internal_imp_t * * _Mtx) Line 68 C++ 
... std::_Mutex_base::lock() Line 42 C++ 
... std::unique_lock<std::mutex>::unique_lock<std::mutex>(std::mutex & _Mtx) Line 220 C++ 
... ThreadSafeQueue<int>::pop() Line 13 C++ 
... MyManager::worker() Zeile 178 C++ 
+0

dwie rzeczy: isEmpty nie jest zablokowana, a wielkość() mogą mieć prostszą implementację: po mutex jest zablokowany można po prostu wrócić queue_.size() (i mlock destructor zwalnia mutex) – marom

+0

@mamy dzięki, poprawiłem mój kod. Błąd nadal istnieje. – Anthea

+0

Dwie rzeczy: isEmpty i size mogą nie być publiczne. Wszystkie informacje, które zgłaszają, mogą być nieważne podczas oceny przez dzwoniącego. Jeśli nie są używane prywatnie, zostaną usunięte. – stefan

Odpowiedz

0

Spróbuj przesunąć „próbować” w górę (jak w poniższym przykładzie). Jeśli twój wątek czeka na dane (wewnątrz waitingList.pop()), to może czekać wewnątrz zmiennej warunkowej .wait(). Jest to "punkt przerwania", więc może rzucać, gdy wątek zostanie przerwany.

void MyManager::worker() 
{ 
    while (true) 
    { 
     try 
     { 
      int current = waitingList.pop(); 
      Object * p = objects.at(current); 
      p->calculateMesh(); //this task is internally locked by a mutex 

      boost::this_thread::interruption_point(); 
     } 
     catch (const boost::thread_interrupted&) 
     { 
      // Thread interruption request received, break the loop 
      std::cout << "- Thread interrupted. Exiting thread." << std::endl; 
      break; 
     } 
    } 
} 
+0

próbował, ale błąd nadal istnieje. Aktualizuję pytanie z listy debugowania wywołania awarii. – Anthea

+1

Zawinęłbym całą pętlę w klauzulę try-catch. Nie zmieni to niczego tutaj, ale punktem wyjątków jest to, że podróżują one przez wszystkie struktury przepływu sterowania, takie jak pętle, więc nie trzeba tego robić ręcznie. –

-2

Myślę, że jest to klasyczny problem związany z wątkiem czytnika/pisarza działającym na wspólnym buforze. Jednym z najbezpieczniejszych sposobów rozwiązania tego problemu jest użycie muteksów i sygnałów (nie jestem w stanie opublikować kodu tutaj.) Proszę wysłać mi e-mail, ja wysyłam kod do ciebie).

0

Może złapałeś niewłaściwą klasę wyjątków? Co oznaczałoby, że nie zostanie przyłapany. Nie znasz wątków, ale czy jest to mieszanka std :: threads i boost :: threads, które to powoduje?

Spróbuj złapać najniższy wyjątek nadrzędny.

3

Z mojego doświadczenia w pracy z wątkami zarówno w trybie Boost, jak i Java, próba wyłączenia wątków na zewnątrz jest zawsze kłopotliwa. Nigdy nie byłem w stanie sprawić, żeby to działało czysto.

Najlepsze, co otrzymałem, to mieć wartość boolowską dostępną dla wszystkich wątków konsumenckich ustawionych na wartość true. Po ustawieniu wartości false, wątki po prostu powrócą samodzielnie. W twoim przypadku można to łatwo umieścić w pętli while, którą masz.

Oprócz tego będziesz potrzebował synchronizacji, abyś mógł poczekać na powrót wątków, zanim je usuniesz, w przeciwnym razie możesz uzyskać pewne trudne do zdefiniowania zachowanie.

Przykład z poprzednim projektem kopalni:

tworzenia wątku

barrier = new boost::barrier(numOfThreads + 1); 
threads = new detail::updater_thread*[numOfThreads]; 

for (unsigned int t = 0; t < numOfThreads; t++) { 
    //This object is just a wrapper class for the boost thread. 
    threads[t] = new detail::updater_thread(barrier, this); 
} 

zniszczenie gwintu

for (unsigned int i = 0; i < numOfThreads; i++) { 
    threads[i]->requestStop();//Notify all threads to stop. 
} 

barrier->wait();//The update request will allow the threads to get the message to shutdown. 

for (unsigned int i = 0; i < numOfThreads; i++) { 
    threads[i]->waitForStop();//Wait for all threads to stop. 
    delete threads[i];//Now we are safe to clean up. 
} 

Niektóre metody, które mogą być interesujące z owinięcie gwintu.

//Constructor 
updater_thread::updater_thread(boost::barrier * barrier) 
{ 
    this->barrier = barrier; 
    running = true; 

    thread = boost::thread(&updater_thread::run, this); 
} 

void updater_thread::run() { 
    while (running) { 
     barrier->wait(); 
     if (!running) break; 

     //Do stuff 

     barrier->wait(); 
    } 
} 

void updater_thread::requestStop() { 
    running = false; 
} 

void updater_thread::waitForStop() { 
    thread.join(); 
}