Odpowiem tutaj na moje własne pytanie. Ustawienie ZMQ_CONFLATE "Zachowaj tylko ostatnią wiadomość" wydawało się obiecujące, ale nie działa z filtrami subskrypcji. Zawiera tylko jedną wiadomość w kolejce. Jeśli masz więcej niż jeden filtr, zarówno stare, jak i nowe wiadomości z innego typu filtrów zostaną odrzucone.
Podobnie jak zalecenie przewodnika zeromq, aby po prostu zabić powolnych subskrybentów, ale to nie wydaje się realistyczne rozwiązanie. Posiadanie subskrybentów o różnych prędkościach odczytu, subskrybowanych do tego samego szybkiego wydawcy, powinno być normalnym przypadkiem użycia. Niektórzy z tych subskrybentów mogą żyć na wolnych komputerach, inni na szybkich itp. ZeroMQ powinien jakoś sobie z tym poradzić.
http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern
skończyło się robi ręczne zrzucenie starej kolejce komunikatów po stronie klienta. Wygląda na to, że działa dobrze. Dostaję subskrybowane wiadomości do klienta, które są mniej niż 3ms stare (przez tcp localhost) w ten sposób. Działa to nawet w przypadkach, gdy mam pięć tysięcy, 10 sekund starych wiadomości, w kolejce przed tymi kilkoma komunikatami w czasie rzeczywistym z tyłu. To mi wystarczy.
Nie mogę pomóc, ale uważam, że jest to coś, co powinna dostarczyć biblioteka. Prawdopodobnie mogłaby to zrobić lepiej.
Anyways tutaj jest po stronie klienta, stara wiadomość upuszczanie, kod:
bool Empty(zmq::socket_t& socket) {
bool ret = true;
zmq::pollitem_t poll_item = { socket, 0, ZMQ_POLLIN, 0 };
zmq::poll(&poll_item, 1, 0); //0 = no wait
if (poll_item.revents & ZMQ_POLLIN) {
ret = false;
}
return ret;
}
std::vector<std::string> GetRealtimeSubscribedMessageVec(zmq::socket_t& socket_sub, int timeout_ms)
{
std::vector<std::string> ret;
struct MessageTmp {
int id_ = 0;
std::string data_;
boost::posix_time::ptime timestamp_;
};
std::map<int, MessageTmp> msg_map;
int read_msg_count = 0;
int time_in_loop = 0;
auto start_of_loop = boost::posix_time::microsec_clock::universal_time();
do {
read_msg_count++;
//msg format sent by publisher is: filter, timestamp, data
MessageTmp msg;
msg.id_ = boost::lexical_cast<int>(s_recv(socket_sub));
msg.timestamp_ = boost::posix_time::time_from_string(s_recv(socket_sub));
msg.data_ = s_recv(socket_sub);
msg_map[msg.id_] = msg;
auto now = boost::posix_time::microsec_clock::universal_time();
time_in_loop = (now - start_of_loop).total_milliseconds();
if (time_in_loop > timeout_ms) {
std::cerr << "Timeout reached. Publisher is probably sending messages quicker than we can drop them." << std::endl;
break;
}
} while ((Empty(socket_sub) == false));
if (read_msg_count > 1) {
std::cout << "num of old queued up messages dropped: " << (read_msg_count - 1) << std::endl;
}
for (const auto &pair: msg_map) {
const auto& msg_tmp = pair.second;
auto now = boost::posix_time::microsec_clock::universal_time();
auto message_age_ms = (now - msg_tmp.timestamp_).total_milliseconds();
if (message_age_ms > timeout_ms) {
std::cerr << "[SUB] Newest message too old. f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
}
else {
std::cout << "[SUB] f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
ret.push_back(msg_tmp.data_);
}
}
return ret;
}
Ill odpowiedzieć na moje własne pytanie tutaj. Jest ustawienie dla zmq_setsockopt ZMQ_CONFLATE: Zachowaj tylko ostatnią wiadomość. –
ZMQ_CONFLATE działa, ale nie działają dobrze z filtrami. Jeśli klient SUB subskrybuje dwa filtry. Wtedy byłoby rozsądnie oczekiwać, że kolejka będzie w stanie zawierać najnowszą wiadomość obu typów. To jednak nie działa. Tylko jedna wiadomość będzie kiedykolwiek w kolejce. Drugi będzie zagubiony. Jednym rozwiązaniem jest utworzenie dwóch gniazd PUB, a zatem dwóch kolejek, po jednej dla każdego typu filtra. A następnie wyślij różne wiadomości na różne gniazda. –
Powyższe obejście prawdopodobnie nie jest dobrym pomysłem. Jeśli wymagane jest jedno gniazdo na filtr, można łatwo uzyskać tysiące gniazd. W systemie Windows FD_SETSIZE ma wartość 64. Ta zmienna ma coś wspólnego z "Maksymalna liczba obsługiwanych gniazd". –