Próbuję utworzyć prosty obiekt puli, który chciałbym mniej lub bardziej sprawiedliwie przydzielić dostęp do zestawu zasobów udostępnionych wszystkim wątkom, które o to proszą. W oknach zwykle mam tablicę Mutexów i robię WaitForMultipleObjects, z bWaitAll = FALSE (zobacz windows_pool_of_n_t poniżej). Ale mam nadzieję, że pewnego dnia będę mógł przenieść to na inne systemy operacyjne, więc chciałbym trzymać się standardu. Zbiór zasobów, ze zmienną condition_variable na size()! = 0, wydawał się oczywistym rozwiązaniem (patrz puli_n_t poniżej).Dlaczego std :: condition_variable powoduje, że planowanie jest niesprawiedliwe?
Ale z przyczyn, których nie rozumiem, ten kod serializuje dostęp do wątku. Nie oczekuję ścisłej uczciwości, ale jest to najgorszy z możliwych przypadek - wątek, który ostatni raz miał blokadę, zawsze ma następną blokadę. Nie chodzi o to, że std :: mutex nie jest zgodny z bardziej lub mniej systemowym harmonogramem systemu Windows, ponieważ użycie tylko muteksu bez zmiennej warunkowej działa zgodnie z oczekiwaniami, chociaż tylko dla puli jednej, oczywiście (patrz puli_na_t poniżej).
Czy ktoś może to wyjaśnić? Czy istnieje sposób obejścia tego?
wyniki:
C:\temp\stdpool>bin\stdpool.exe
pool:pool_of_one_t
thread 0:19826 ms
thread 1:19846 ms
thread 2:19866 ms
thread 3:19886 ms
thread 4:19906 ms
thread 5:19926 ms
thread 6:19946 ms
thread 7:19965 ms
thread 8:19985 ms
thread 9:20004 ms
pool:windows_pool_of_n_t(1)
thread 0:19819 ms
thread 1:19838 ms
thread 2:19858 ms
thread 3:19878 ms
thread 4:19898 ms
thread 5:19918 ms
thread 6:19938 ms
thread 7:19958 ms
thread 8:19978 ms
thread 9:19997 ms
pool:pool_of_n_t(1)
thread 9:3637 ms
thread 0:4538 ms
thread 6:7558 ms
thread 4:9779 ms
thread 8:9997 ms
thread 2:13058 ms
thread 1:13997 ms
thread 3:17076 ms
thread 5:17995 ms
thread 7:19994 ms
pool:windows_pool_of_n_t(2)
thread 1:9919 ms
thread 0:9919 ms
thread 2:9939 ms
thread 3:9939 ms
thread 5:9958 ms
thread 4:9959 ms
thread 6:9978 ms
thread 7:9978 ms
thread 9:9997 ms
thread 8:9997 ms
pool:pool_of_n_t(2)
thread 2:6019 ms
thread 0:7882 ms
thread 4:8102 ms
thread 5:8182 ms
thread 1:8382 ms
thread 8:8742 ms
thread 7:9162 ms
thread 9:9641 ms
thread 3:9802 ms
thread 6:10201 ms
pool:windows_pool_of_n_t(5)
thread 4:3978 ms
thread 3:3978 ms
thread 2:3979 ms
thread 0:3980 ms
thread 1:3980 ms
thread 9:3997 ms
thread 7:3999 ms
thread 6:3999 ms
thread 5:4000 ms
thread 8:4001 ms
pool:pool_of_n_t(5)
thread 2:3080 ms
thread 0:3498 ms
thread 8:3697 ms
thread 3:3699 ms
thread 6:3797 ms
thread 7:3857 ms
thread 1:3978 ms
thread 4:4039 ms
thread 9:4057 ms
thread 5:4059 ms
kod:
#include <iostream>
#include <deque>
#include <vector>
#include <mutex>
#include <thread>
#include <sstream>
#include <chrono>
#include <iomanip>
#include <cassert>
#include <condition_variable>
#include <windows.h>
using namespace std;
class pool_t {
public:
virtual void check_in(int size) = 0;
virtual int check_out() = 0;
virtual string pool_name() = 0;
};
class pool_of_one_t : public pool_t {
mutex lock;
public:
virtual void check_in(int resource) {
lock.unlock();
}
virtual int check_out() {
lock.lock();
return 0;
}
virtual string pool_name() {
return "pool_of_one_t";
}
};
class windows_pool_of_n_t : public pool_t {
vector<HANDLE> resources;
public:
windows_pool_of_n_t(int size) {
for (int i=0; i < size; ++i)
resources.push_back(CreateMutex(NULL, FALSE, NULL));
}
~windows_pool_of_n_t() {
for (auto resource : resources)
CloseHandle(resource);
}
virtual void check_in(int resource) {
ReleaseMutex(resources[resource]);
}
virtual int check_out() {
DWORD result = WaitForMultipleObjects(resources.size(),
resources.data(), FALSE, INFINITE);
assert(result >= WAIT_OBJECT_0
&& result < WAIT_OBJECT_0+resources.size());
return result - WAIT_OBJECT_0;
}
virtual string pool_name() {
ostringstream name;
name << "windows_pool_of_n_t(" << resources.size() << ")";
return name.str();
}
};
class pool_of_n_t : public pool_t {
deque<int> resources;
mutex lock;
condition_variable not_empty;
public:
pool_of_n_t(int size) {
for (int i=0; i < size; ++i)
check_in(i);
}
virtual void check_in(int resource) {
unique_lock<mutex> resources_guard(lock);
resources.push_back(resource);
resources_guard.unlock();
not_empty.notify_one();
}
virtual int check_out() {
unique_lock<mutex> resources_guard(lock);
not_empty.wait(resources_guard,
[this](){return resources.size() > 0;});
auto resource = resources.front();
resources.pop_front();
bool notify_others = resources.size() > 0;
resources_guard.unlock();
if (notify_others)
not_empty.notify_one();
return resource;
}
virtual string pool_name() {
ostringstream name;
name << "pool_of_n_t(" << resources.size() << ")";
return name.str();
}
};
void worker_thread(int id, pool_t& resource_pool)
{
auto start_time = chrono::system_clock::now();
for (int i=0; i < 100; ++i) {
auto resource = resource_pool.check_out();
this_thread::sleep_for(chrono::milliseconds(20));
resource_pool.check_in(resource);
this_thread::yield();
}
static mutex cout_lock;
{
unique_lock<mutex> cout_guard(cout_lock);
cout << "thread " << id << ":"
<< chrono::duration_cast<chrono::milliseconds>(
chrono::system_clock::now() - start_time).count()
<< " ms" << endl;
}
}
void test_it(pool_t& resource_pool)
{
cout << "pool:" << resource_pool.pool_name() << endl;
vector<thread> threads;
for (int i=0; i < 10; ++i)
threads.push_back(thread(worker_thread, i, ref(resource_pool)));
for (auto& thread : threads)
thread.join();
}
int main(int argc, char* argv[])
{
test_it(pool_of_one_t());
test_it(windows_pool_of_n_t(1));
test_it(pool_of_n_t(1));
test_it(windows_pool_of_n_t(2));
test_it(pool_of_n_t(2));
test_it(windows_pool_of_n_t(5));
test_it(pool_of_n_t(5));
return 0;
}
To może być trudne pytanie. Mam na myśli łatwą odpowiedź: 'condition_variable' nie daje takich gwarancji. Trudną odpowiedzią jest dokładnie to, jak źle jest, zakładając, że nie zrobiłeś oczywistych oops w powyższym kodzie. – Yakk
Nie widzę żadnych widocznych wiadomości. Podejrzewam, że jest to spowodowane nieznacznie różnymi interakcjami między 'this_thread :: yield()' i dwoma różnymi ścieżkami mutex. W Linuksie, spodziewam się, że twój kod będzie się odpowiednio planował. Zwróć uwagę, że Standard mówi o 'yield' jako * tylko o możliwości zmiany harmonogramu * ale szczegóły są specyficzne dla systemu operacyjnego. Interesującym eksperymentem może być próba zastąpienia 'this_thread :: yield();' z 'this_thread :: sleep_for (chrono :: nanoseconds (1));'. Spowoduje to utratę priorytetu wątku w kolejce planowania i prawdopodobnie wyeliminuje różnice w systemie Windows. –
Ani sleep_for(), ani Win32's :: Sleep() nie dają lepszego planowania. Wygląda na to, że komentarz Yaak jest odpowiedzią - standard tego nie obiecuje i nie powinienem próbować na niej polegać. –