2010-07-27 10 views
12

Jest to kod, aby utworzyć thread_group i wykonać wszystkie wątki równolegle:Jak zrobić zastrzyk :: thread_group wykonać określoną liczbę równoległych wątków

boost::thread_group group; 
for (int i = 0; i < 15; ++i) 
    group.create_thread(aFunctionToExecute); 
group.join_all(); 

Kod ten będzie wykonywał wszystkie wątki naraz. Co chcę zrobić, to wykonać je wszystkie z wyjątkiem maksymalnie 4 równolegle. Po zakończeniu onowania wykonywany jest kolejny, dopóki nie będzie więcej do wykonania.

Odpowiedz

3

Innym, bardziej wydajnym rozwiązaniem byłoby wywoływanie zwrotne każdego wątku do wątku podstawowego po zakończeniu, a przewodnik w wątku głównym mógł za każdym razem uruchomić nowy wątek. Zapobiega to powtarzalnym wywołaniom timed_join, ponieważ wątek podstawowy nic nie da, dopóki wywołanie zwrotne nie zostanie wywołane.

+1

Wreszcie kończy się coś takiego: Mam wątek, w którym rejestruję wszystkie zadania. Następnie tworzę n wątków i przekazuję jako argument do każdego wątku w wątku. Każdy wątek sprawdza, czy są jakieś zadania. Jeśli tak, po prostu wykonaj jedno zadanie do wykonania. W przeciwnym razie wątek się kończy. W ten sposób tworzymy n wątków, a nie wątek na zadanie (zadanie się kończy, tworzony jest nowy wątek). –

0

mam coś takiego:

boost::mutex mutex_; 
    boost::condition_variable condition_; 
    const size_t throttle_; 
    size_t size_; 
    bool wait_; 
    template <typename Env, class F> 
    void eval_(const Env &env, const F &f) { 
     { 
      boost::unique_lock<boost::mutex> lock(mutex_); 
      size_ = std::min(size_+1, throttle_); 
      while (throttle_ <= size_) condition_.wait(lock); 
     } 
     f.eval(env); 
     { 
      boost::lock_guard<boost::mutex> lock(mutex_); 
      --size_; 
     } 
     condition_.notify_one(); 
    } 
0

myślę szukasz implementacji thread_pool, który jest dostępny here.

Dodatkowo zauważyłem, że jeśli tworzysz wektor std :: future i przechowujesz kontrakty wielu std :: async_tasks w nim i nie masz żadnego kodu blokującego w funkcji przekazanej do wątku, VS2013 (co najmniej od co mogę potwierdzić) uruchomi dokładnie odpowiedni brak wątków, które może obsłużyć twoja maszyna. Ponownie wykorzystuje wątki po utworzeniu.

0

tworzę własne uproszczony interfejs boost::thread_group do wykonania tej pracy:

class ThreadGroup : public boost::noncopyable 
{ 
    private: 
     boost::thread_group  group; 
     std::size_t    maxSize; 
     float      sleepStart; 
     float      sleepCoef; 
     float      sleepMax; 
     std::set<boost::thread*> running; 

    public: 
     ThreadGroup(std::size_t max_size = 0, 
        float max_sleeping_time = 1.0f, 
        float sleeping_time_coef = 1.5f, 
        float sleeping_time_start = 0.001f) : 
      boost::noncopyable(), 
      group(), 
      maxSize(max_size), 
      sleepStart(sleeping_time_start), 
      sleepCoef(sleeping_time_coef), 
      sleepMax(max_sleeping_time), 
      running() 
     { 
      if(max_size == 0) 
       this->maxSize = (std::size_t)std::max(boost::thread::hardware_concurrency(), 1u); 
      assert(max_sleeping_time >= sleeping_time_start); 
      assert(sleeping_time_start > 0.0f); 
      assert(sleeping_time_coef > 1.0f); 
     } 

     ~ThreadGroup() 
     { 
      this->joinAll(); 
     } 

     template<typename F> boost::thread* createThread(F f) 
     { 
      float sleeping_time = this->sleepStart; 
      while(this->running.size() >= this->maxSize) 
      { 
       for(std::set<boost::thread*>::iterator it = running.begin(); it != running.end();) 
       { 
        const std::set<boost::thread*>::iterator jt = it++; 
        if((*jt)->timed_join(boost::posix_time::milliseconds((long int)(1000.0f * sleeping_time)))) 
         running.erase(jt); 
       } 
       if(sleeping_time < this->sleepMax) 
       { 
        sleeping_time *= this->sleepCoef; 
        if(sleeping_time > this->sleepMax) 
         sleeping_time = this->sleepMax; 
       } 
      } 
      return *this->running.insert(this->group.create_thread(f)).first; 
     } 

     void joinAll() 
     { 
      this->group.join_all(); 
     } 

     void interruptAll() 
     { 
#ifdef BOOST_THREAD_PROVIDES_INTERRUPTIONS 
      this->group.interrupt_all(); 
#endif 
     } 

     std::size_t size() const 
     { 
      return this->group.size(); 
     } 
    }; 

Oto przykład użycia, bardzo podobny do boost::thread_group z główną różnicą, że utworzenie wątku jest punktem oczekujących:

{ 
    ThreadGroup group(4); 
    for(int i = 0; i < 15; ++i) 
    group.createThread(aFunctionToExecute); 
} // join all at destruction 
Powiązane problemy