2013-05-01 12 views
9

Zaimplementowałam pewien wzór oparty na module Active Object. Jest to bardzo prosta implementacja. Mam Scheduler, ActivationList, Requests i Futures, aby uzyskać odpowiedź. Moje wymagania były tak:doładowanie :: asio i obiekt aktywny

  • Dostęp do aktywnego obiektu powinny być szeregowane wykonując swoje metody we własnym wątku (główny req i założenie Active Object wzorca projektowego)
  • rozmówcy muszą być w stanie określić priorytet realizacji żądań. Oznacza to, że jeśli jest więcej niż zero żądań oczekujących na wykonanie, będą one uporządkowane według priorytetu przypisanego do każdego żądania. Żądania o wyższym priorytecie będą wykonywane najpierw, więc jeśli będą zawsze jakieś żądania oczekujące na listę aktywacji i będą miały wyższy priorytet niż dane żądania, żądanie to nigdy nie zostanie zrealizowane - będzie to dla mnie OK. określić maksymalną liczbę żądań oczekujących na liście (ograniczyć wykorzystanie pamięci).
  • Możliwe jest unieważnienie wszystkich oczekujących żądań.
  • Żądania powinny mieć możliwość zwracania wartości (blokowanie dzwoniącego) LUB po prostu będą wykonywane bez wartości powrót, ale osoba dzwoniąca zostanie zablokowana do czasu przetworzenia żądania LUB osoba dzwoniąca nie zostanie zablokowana i nie jest dla niej ważna, jeśli dany wniosek został przetworzony lub nie, g
  • Tuż przed wykonaniem żądania, zostanie wykonana pewna metoda wartownika w celu sprawdzenia, czy dane żądanie zostanie wykonane, czy nie. Jeśli nie - należy przez to zwrócić jakąś nieokreśloną wartość do rozmówcy (w moim bieżącej realizacji jest boost :: nikogo, ponieważ każdy wniosek typ zwracany jest boost :: opcjonalnie)

OK teraz pytanie: Czy jest możliwe aby użyć boost :: asio i spełnić wszystkie moje wymagania? Moja implementacja działa, ale chciałbym użyć czegoś, co prawdopodobnie zostało wdrożone w znacznie lepszy sposób, niż to zrobiłem. Chciałbym również poznać to na przyszłość i nie "ponownie wymyślać koła" po raz kolejny.

+0

Boost ASIO nie zablokuje. Ostatnia część twojego drugiego do ostatniego jest objęta ostatnim stwierdzeniem. wszystko inne jest całkowicie w stanie zrobić w regularnym C++ bez dopalania, choć, wprawdzie, łatwiejsze z nim. Może chcieć sprawdzić serię bodźców, jeśli jeszcze jej nie używasz. – johnathon

+0

Wcześniej zaimplementowałem go za pomocą zwykłego C++. Właściwie dzięki wielkiej pomocy ulepszenia wątku i wzmocnienia multi-conatiner. Ale celem jest nie używać mojej ipmplementacji, a zamiast niej użyj boost :: asio. – user2301299

Odpowiedz

28

Funkcja Boost.Asio może być używana w celu objęcia intencji Active Object: odłączyć wykonanie metody od wywołania metody. Dodatkowe wymagania będą musiały być obsługiwane na wyższym poziomie, ale nie jest zbyt skomplikowane, gdy korzystasz z Boost.Asio w połączeniu z innymi bibliotekami Boost.

Scheduler może używać:

ActivationList mogą być realizowane jako:

  • Boost.MultiIndex celu uzyskania najwyższego priorytet żądania metody. W pozycji z podpowiedziami insert() kolejność reklamowa jest zachowywana na żądanie o tym samym priorytecie. Można użyć
  • std::multiset lub std::multimap. Jednak w C++ 03 nie jest określona kolejność żądań z tym samym kluczem (priorytetem).
  • Jeśli nie wymaga metody ochronnej, można użyć std::priority_queue.

Request może być typu określone: ​​

  • boost::functionboost::bind i mogą być wykorzystane w celu zapewnienia usunięcia typu, podczas gdy wiązanie z typów wywołalnych bez wprowadzania Request hierarchii.

Futures może korzystać z pomocy Boost.Thread's Futures.

  • future.valid() zwróci true jeśli Request został dodany do ActivationList.
  • future.wait() zablokuje czekanie, aż wynik będzie dostępny.
  • future.get() zablokuje oczekiwanie na wynik.
  • Jeśli dzwoniący nie zrobi nic z future, wówczas dzwoniący nie będzie blokowany.
  • Inną korzyścią wynikającą z zastosowania kontraktów futures w grze Boost.Thread jest to, że wyjątki pochodzące z numeru Request zostaną przesłane do Future.

Powyżej znajduje się pełna przykład wykorzystując różne biblioteki Boost i powinny spełniać wymagania:

// Standard includes 
#include <algorithm> // std::find_if 
#include <iostream> 
#include <string> 

// 3rd party includes 
#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/function.hpp> 
#include <boost/make_shared.hpp> 
#include <boost/multi_index_container.hpp> 
#include <boost/multi_index/ordered_index.hpp> 
#include <boost/multi_index/member.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/thread.hpp> 
#include <boost/utility/result_of.hpp> 

/// @brief scheduler that provides limits with prioritized jobs. 
template <typename Priority, 
      typename Compare = std::less<Priority> > 
class scheduler 
{ 
public: 
    typedef Priority priority_type; 
private: 

    /// @brief method_request is used to couple the guard and call 
    ///  functions for a given method. 
    struct method_request 
    { 
    typedef boost::function<bool()> ready_func_type; 
    typedef boost::function<void()> run_func_type; 

    template <typename ReadyFunctor, 
       typename RunFunctor> 
    method_request(ReadyFunctor ready, 
        RunFunctor run) 
     : ready(ready), 
     run(run) 
    {} 

    ready_func_type ready; 
    run_func_type run; 
    }; 

    /// @brief Pair type used to associate a request with its priority. 
    typedef std::pair<priority_type, 
        boost::shared_ptr<method_request> > pair_type; 

    static bool is_method_ready(const pair_type& pair) 
    { 
    return pair.second->ready(); 
    } 

public: 

    /// @brief Construct scheduler. 
    /// 
    /// @param max_threads Maximum amount of concurrent task. 
    /// @param max_request Maximum amount of request. 
    scheduler(std::size_t max_threads, 
      std::size_t max_request) 
    : work_(io_service_), 
     max_request_(max_request), 
     request_count_(0) 
    { 
    // Spawn threads, dedicating them to the io_service. 
    for (std::size_t i = 0; i < max_threads; ++i) 
     threads_.create_thread(
     boost::bind(&boost::asio::io_service::run, &io_service_)); 
    } 

    /// @brief Destructor. 
    ~scheduler() 
    { 
    // Release threads from the io_service. 
    io_service_.stop(); 
    // Cleanup. 
    threads_.join_all(); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param priority Priority of job. 
    /// @param ready_func Invoked to check if method is ready to run. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename ReadyFunctor, 
      typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(priority_type priority, 
     const ReadyFunctor& ready_func, 
     const RunFunctor& run_func) 
    { 
    typedef typename boost::result_of<RunFunctor()>::type result_type; 
    typedef boost::unique_future<result_type> future_type; 

    boost::unique_lock<mutex_type> lock(mutex_); 

    // If max request has been reached, then return an invalid future. 
    if (max_request_ && 
     (request_count_ == max_request_)) 
     return future_type(); 

    ++request_count_; 

    // Use a packaged task to handle populating promise and future. 
    typedef boost::packaged_task<result_type> task_type; 

    // Bind does not work with rvalue, and packaged_task is only moveable, 
    // so allocate a shared pointer. 
    boost::shared_ptr<task_type> task = 
     boost::make_shared<task_type>(run_func); 

    // Create method request. 
    boost::shared_ptr<method_request> request = 
     boost::make_shared<method_request>(
     ready_func, 
     boost::bind(&task_type::operator(), task)); 

    // Insert into priority. Hint to inserting as close to the end as 
    // possible to preserve insertion order for request with same priority. 
    activation_list_.insert(activation_list_.end(), 
          pair_type(priority, request)); 

    // There is now an outstanding request, so post to dispatch. 
    io_service_.post(boost::bind(&scheduler::dispatch, this)); 

    return task->get_future(); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param ready_func Invoked to check if method is ready to run. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename ReadyFunctor, 
      typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(const ReadyFunctor& ready_func, 
     const RunFunctor& run_func) 
    { 
    return insert(priority_type(), ready_func, run_func); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param priority Priority of job. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(priority_type priority, 
     const RunFunctor& run_func) 
    { 
    return insert(priority, &always_ready, run_func); 
    } 

    /// @brief Insert a method request with default priority into the 
    ///  scheduler. 
    /// 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @param functor Job to run. 
    /// 
    /// @return future associated with the job. 
    template <typename RunFunc> 
    boost::unique_future<typename boost::result_of<RunFunc()>::type> 
    insert(const RunFunc& run_func) 
    { 
    return insert(&always_ready, run_func); 
    } 

    /// @brief Cancel all outstanding request. 
    void cancel() 
    { 
    boost::unique_lock<mutex_type> lock(mutex_); 
    activation_list_.clear(); 
    request_count_ = 0; 
    } 

private: 

    /// @brief Dispatch a request. 
    void dispatch() 
    { 
    // Get the current highest priority request ready to run from the queue. 
    boost::unique_lock<mutex_type> lock(mutex_); 
    if (activation_list_.empty()) return; 

    // Find the highest priority method ready to run. 
    typedef typename activation_list_type::iterator iterator; 
    iterator end = activation_list_.end(); 
    iterator result = std::find_if(
     activation_list_.begin(), end, &is_method_ready); 

    // If no methods are ready, then post into dispatch, as the 
    // method may have become ready. 
    if (end == result) 
    { 
     io_service_.post(boost::bind(&scheduler::dispatch, this)); 
     return; 
    } 

    // Take ownership of request. 
    boost::shared_ptr<method_request> method = result->second; 
    activation_list_.erase(result); 

    // Run method without mutex. 
    lock.unlock(); 
    method->run();  
    lock.lock(); 

    // Perform bookkeeping. 
    --request_count_; 
    } 

    static bool always_ready() { return true; } 

private: 

    /// @brief List of outstanding request. 
    typedef boost::multi_index_container< 
    pair_type, 
    boost::multi_index::indexed_by< 
     boost::multi_index::ordered_non_unique< 
     boost::multi_index::member<pair_type, 
            typename pair_type::first_type, 
            &pair_type::first>, 
     Compare 
     > 
    > 
    > activation_list_type; 
    activation_list_type activation_list_; 

    /// @brief Thread group managing threads servicing pool. 
    boost::thread_group threads_; 

    /// @brief io_service used to function as a thread pool. 
    boost::asio::io_service io_service_; 

    /// @brief Work is used to keep threads servicing io_service. 
    boost::asio::io_service::work work_; 

    /// @brief Maximum amount of request. 
    const std::size_t max_request_; 

    /// @brief Count of outstanding request. 
    std::size_t request_count_; 

    /// @brief Synchronize access to the activation list. 
    typedef boost::mutex mutex_type; 
    mutex_type mutex_; 
}; 

typedef scheduler<unsigned int, 
        std::greater<unsigned int> > high_priority_scheduler; 

/// @brief adder is a simple proxy that will delegate work to 
///  the scheduler. 
class adder 
{ 
public: 
    adder(high_priority_scheduler& scheduler) 
    : scheduler_(scheduler) 
    {} 

    /// @brief Add a and b with a priority. 
    /// 
    /// @return Return future result. 
    template <typename T> 
    boost::unique_future<T> add(
    high_priority_scheduler::priority_type priority, 
    const T& a, const T& b) 
    { 
    // Insert method request 
    return scheduler_.insert(
     priority, 
     boost::bind(&adder::do_add<T>, a, b)); 
    } 

    /// @brief Add a and b. 
    /// 
    /// @return Return future result. 
    template <typename T> 
    boost::unique_future<T> add(const T& a, const T& b) 
    { 
    return add(high_priority_scheduler::priority_type(), a, b); 
    } 

private: 

    /// @brief Actual add a and b. 
    template <typename T> 
    static T do_add(const T& a, const T& b) 
    { 
    std::cout << "Starting addition of '" << a 
       << "' and '" << b << "'" << std::endl; 
    // Mimic busy work. 
    boost::this_thread::sleep_for(boost::chrono::seconds(2)); 
    std::cout << "Finished addition" << std::endl; 
    return a + b; 
    } 

private: 
    high_priority_scheduler& scheduler_; 
}; 

bool get(bool& value) { return value; } 
void guarded_call() 
{ 
    std::cout << "guarded_call" << std::endl; 
} 

int main() 
{ 
    const unsigned int max_threads = 1; 
    const unsigned int max_request = 4; 

    // Sscheduler 
    high_priority_scheduler scheduler(max_threads, max_request); 

    // Proxy 
    adder adder(scheduler); 

    // Client 

    // Add guarded method to scheduler. 
    bool ready = false; 
    std::cout << "Add guarded method." << std::endl; 
    boost::unique_future<void> future1 = scheduler.insert(
    boost::bind(&get, boost::ref(ready)), 
    &guarded_call); 

    // Add 1 + 100 with default priority. 
    boost::unique_future<int> future2 = adder.add(1, 100); 

    // Force sleep to try to get scheduler to run request 2 first. 
    boost::this_thread::sleep_for(boost::chrono::seconds(1)); 

    // Add: 
    // 2 + 200 with low priority (5) 
    // "test" + "this" with high priority (99) 
    boost::unique_future<int> future3 = adder.add(5, 2, 200); 
    boost::unique_future<std::string> future4 = adder.add(99, 
    std::string("test"), std::string("this")); 

    // Max request should have been reached, so add another. 
    boost::unique_future<int> future5 = adder.add(3, 300); 

    // Check if request was added. 
    std::cout << "future1 is valid: " << future1.valid() 
      << "\nfuture2 is valid: " << future2.valid() 
      << "\nfuture3 is valid: " << future3.valid() 
      << "\nfuture4 is valid: " << future4.valid() 
      << "\nfuture5 is valid: " << future5.valid() 
      << std::endl; 

    // Get results for future2 and future3. Do nothing with future4's results. 
    std::cout << "future2 result: " << future2.get() 
      << "\nfuture3 result: " << future3.get() 
      << std::endl; 

    std::cout << "Unguarding method." << std::endl; 
    ready = true; 
    future1.wait(); 
} 

Realizacja wykorzystuje basen gwintu 1 z max 4 życzenie.

  • pytanie1 strzeże do końca programu i powinien być ostatni uruchomić.
  • request2 (1 + 100) jest wstawiany z domyślnym priorytetem i powinien być uruchamiany jako pierwszy.
  • request3 (2 + 200) ma niski priorytet i powinien zostać uruchomiony po request4.
  • request4 ('test' + 'this') jest wstawiany z wysokim priorytetem i powinien być uruchamiany przed request3.
  • Żądanie5 nie powinno się wstawiać z powodu żądania maks. I nie powinno być ważne.

Wyjście jest w następujący sposób:

Add guarded method. 
Starting addition of '1' and '100' 
future1 is valid: 1 
future2 is valid: 1 
future3 is valid: 1 
future4 is valid: 1 
future5 is valid: 0 
Finished addition 
Starting addition of 'test' and 'this' 
Finished addition 
Starting addition of '2' and '200' 
Finished addition 
future2 result: 101 
future3 result: 202 
Unguarding method. 
guarded_call
+1

Dziękuję za tę odpowiedź, żałuję, że nie mogę dać ci więcej niż 1 przegłosowania. – MrEvil

+0

Bardzo pomocny post, co doładowania brakuje to przypadki użycia, że ​​nie muszę przeszukiwać github/tak dla – arynaq

Powiązane problemy