2012-06-20 30 views
5

Zamierzam zaimplementować serwer boost :: asio z pulą wątków, używając pojedynczego io_service (HTTP Server 3 example). io_service będzie związany z gniazdem domeny unix i przekazywać żądania przechodzące od połączeń w tym gnieździe do różnych wątków. Aby zmniejszyć zużycie zasobów, chcę, aby pula wątków była dynamiczna.Przykład dynamicznej puli wątków w boost :: asio

Oto koncepcja. Najpierw tworzony jest pojedynczy wątek. Kiedy przychodzi żądanie i serwer widzi, że nie ma jałowego wątku w puli, tworzy nowy wątek i przekazuje do niego żądanie. Serwer może utworzyć maksymalnie maksymalną liczbę wątków. Idealnie powinien on mieć funkcjonalność zawieszania wątków, które są bezczynne przez pewien czas.

Czy ktoś zrobił coś podobnego? A może ktoś ma odpowiedni przykład?

Co do mnie, myślę, że powinienem jakoś nadpisać io_service.dispatch, aby to osiągnąć.

Odpowiedz

5

Może istnieć kilka wyzwań z początkowego podejścia:

  • boost::asio::io_service nie ma pochodzić z lub przepisany. Zwróć uwagę na brak funkcji wirtualnych.
  • Jeśli biblioteka wątków nie zapewnia możliwości sprawdzania stanu wątku, informacje o stanie muszą być zarządzane osobno.

Alternatywnym rozwiązaniem jest opublikowanie pracy w numerze io_service, a następnie sprawdzenie, jak długo trwała ona w numerze io_service. Jeśli delta czasu między momentem, w którym była gotowa do uruchomienia, a faktycznym uruchomieniem przekroczyła pewien próg, oznacza to, że w kolejce jest więcej zadań niż wątków obsługujących kolejkę. Główną zaletą tego jest to, że dynamiczna logika wzrostu puli nici zostaje oddzielona od innej logiki.

Oto przykład, który pozwala to osiągnąć, korzystając z deadline_timer.

  • Ustaw deadline_timer na wygaśnięcie 3 sekund od teraz.
  • Asynchronicznie czekać na deadline_timer. Program obsługi będzie gotowy do pracy 3 sekund od momentu ustawienia deadline_timer.
  • W asynchronicznym programie obsługi sprawdź bieżący czas względem daty wygaśnięcia licznika. Jeśli jest większa niż 2 sekund, tworzona jest kopia zapasowa kolejki io_service, więc dodaj wątek do puli wątków.

Przykład:

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 

class thread_pool_checker 
    : private boost::noncopyable 
{ 
public: 

    thread_pool_checker(boost::asio::io_service& io_service, 
         boost::thread_group& threads, 
         unsigned int max_threads, 
         long threshold_seconds, 
         long periodic_seconds) 
    : io_service_(io_service), 
     timer_(io_service), 
     threads_(threads), 
     max_threads_(max_threads), 
     threshold_seconds_(threshold_seconds), 
     periodic_seconds_(periodic_seconds) 
    { 
     schedule_check(); 
    } 

private: 

    void schedule_check(); 
    void on_check(const boost::system::error_code& error); 

private: 

    boost::asio::io_service& io_service_; 
    boost::asio::deadline_timer timer_; 
    boost::thread_group&  threads_; 
    unsigned int    max_threads_; 
    long      threshold_seconds_; 
    long      periodic_seconds_; 
}; 

void thread_pool_checker::schedule_check() 
{ 
    // Thread pool is already at max size. 
    if (max_threads_ <= threads_.size()) 
    { 
    std::cout << "Thread pool has reached its max. Example will shutdown." 
       << std::endl; 
    io_service_.stop(); 
    return; 
    } 

    // Schedule check to see if pool needs to increase. 
    std::cout << "Will check if pool needs to increase in " 
      << periodic_seconds_ << " seconds." << std::endl; 
    timer_.expires_from_now(boost::posix_time::seconds(periodic_seconds_)); 
    timer_.async_wait( 
    boost::bind(&thread_pool_checker::on_check, this, 
       boost::asio::placeholders::error)); 
} 

void thread_pool_checker::on_check(const boost::system::error_code& error) 
{ 
    // On error, return early. 
    if (error) return; 

    // Check how long this job was waiting in the service queue. This 
    // returns the expiration time relative to now. Thus, if it expired 
    // 7 seconds ago, then the delta time is -7 seconds. 
    boost::posix_time::time_duration delta = timer_.expires_from_now(); 
    long wait_in_seconds = -delta.seconds(); 

    // If the time delta is greater than the threshold, then the job 
    // remained in the service queue for too long, so increase the 
    // thread pool. 
    std::cout << "Job job sat in queue for " 
      << wait_in_seconds << " seconds." << std::endl; 
    if (threshold_seconds_ < wait_in_seconds) 
    { 
    std::cout << "Increasing thread pool." << std::endl; 
    threads_.create_thread(
     boost::bind(&boost::asio::io_service::run, 
        &io_service_)); 
    } 

    // Otherwise, schedule another pool check. 
    run(); 
} 

// Busy work functions. 
void busy_work(boost::asio::io_service&, 
       unsigned int); 

void add_busy_work(boost::asio::io_service& io_service, 
        unsigned int count) 
{ 
    io_service.post(
    boost::bind(busy_work, 
       boost::ref(io_service), 
       count)); 
} 

void busy_work(boost::asio::io_service& io_service, 
       unsigned int count) 
{ 
    boost::this_thread::sleep(boost::posix_time::seconds(5)); 

    count += 1; 

    // When the count is 3, spawn additional busy work. 
    if (3 == count) 
    { 
    add_busy_work(io_service, 0); 
    } 
    add_busy_work(io_service, count); 
} 

int main() 
{ 
    using boost::asio::ip::tcp; 

    // Create io service. 
    boost::asio::io_service io_service; 

    // Add some busy work to the service. 
    add_busy_work(io_service, 0); 

    // Create thread group and thread_pool_checker. 
    boost::thread_group threads; 
    thread_pool_checker checker(io_service, threads, 
           3, // Max pool size. 
           2, // Create thread if job waits for 2 sec. 
           3); // Check if pool needs to grow every 3 sec. 

    // Start running the io service. 
    io_service.run(); 

    threads.join_all(); 

    return 0; 
} 

wyjściowa:

Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 7 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 4 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 3 seconds. 
Increasing thread pool. 
Thread pool has reached its max. Example will shutdown.
+1

Jeśli dobrze rozumiem, zadania busy_work może czekać w kolejce do sekundy, jak również basenu sprawdzający, nawet jeśli maksymalna liczba wątek nie był osiągnięte, ponieważ nowe wątki nie są tworzone z wyprzedzeniem. To sprawia, że ​​ta zasada jest mało użyteczna, ponieważ funkcja bycia dynamicznym nie powinna tak bardzo obniżać wydajności. Powinno to spowodować wydłużenie czasu wykonywania zadań tylko o czas potrzebny na utworzenie nowego wątku w porównaniu do czasu potrzebnego puli statycznej. Mimo wszystko dziekuję. – boqapt

+0

@ user484936: Twoje zrozumienie jest poprawne.Wzrost puli następuje po wykryciu degradacji; jest to jedno z prostszych podejść do łączenia i nie powinno "obniżać wydajności". Jeśli chcesz przydzielić wątki _, gdy wiesz, że są one potrzebne, musisz zarządzać stanem wątków, wprowadzając obciążenie dla wszystkich wątków i może wymagać, aby logika stanów była rozproszona w całym kodzie. Jeśli chcesz przydzielić wątki _as, musisz przewidzieć, że będą one potrzebne, a następnie mieć dedykowany wątek, który umieści zadanie w usłudze, a następnie wykonaj oczekiwanie czasowe na odpowiedź. –

+0

Zastanawiam się, co dzieje się w scenariuszu, w którym jest wykonywane tylko jedno długo działające zadanie i niepotrzebnie dodajemy wątek do puli po uruchomieniu naszego timera. Jeśli w danym momencie nie ma więcej zdarzeń do przetworzenia, podejście to wydaje mi się nieefektywne. Proszę, popraw mnie jeśli się mylę. – russoue

Powiązane problemy