2015-12-29 9 views
7

Say Mam serwer PUB że zmq_send() „s czasie rzeczywistym wiadomości do SUB klienta. Jeśli klient jest zajęty i nie może wystarczająco szybko komunikować się, wiadomości będą buforowane w kliencie (i/lub serwerze).Howto make zeromq PUB/SUB zrzuca stare wiadomości zamiast nowych (dla kanałów w czasie rzeczywistym)?

Jeśli bufor będzie zbyt duży (wysoki znak wody), wówczas NOWA wiadomość zostanie upuszczona. W przypadku wiadomości w czasie rzeczywistym jest to przeciwieństwo tego, czego się chce. Stare wiadomości powinny zostać usunięte, aby znaleźć miejsce dla NOWYCH.

Czy jest jakiś sposób, aby to zrobić?

Idealnie chciałbym, aby kolejka odbiorcza klienta SUB była pusta lub zawierała tylko najnowszą wiadomość. Po otrzymaniu nowej wiadomości zastąpi starą. (Domyślam się, że problem polegałby na tym, że klient blokowałby na zmq_recv(), gdy kolejka jest pusta, marnując czas.)

W jaki sposób kanały informacyjne w czasie rzeczywistym są zwykle realizowane w ZeroMQ?

+3

Ill odpowiedzieć na moje własne pytanie tutaj. Jest ustawienie dla zmq_setsockopt ZMQ_CONFLATE: Zachowaj tylko ostatnią wiadomość. –

+0

ZMQ_CONFLATE działa, ale nie działają dobrze z filtrami. Jeśli klient SUB subskrybuje dwa filtry. Wtedy byłoby rozsądnie oczekiwać, że kolejka będzie w stanie zawierać najnowszą wiadomość obu typów. To jednak nie działa. Tylko jedna wiadomość będzie kiedykolwiek w kolejce. Drugi będzie zagubiony. Jednym rozwiązaniem jest utworzenie dwóch gniazd PUB, a zatem dwóch kolejek, po jednej dla każdego typu filtra. A następnie wyślij różne wiadomości na różne gniazda. –

+0

Powyższe obejście prawdopodobnie nie jest dobrym pomysłem. Jeśli wymagane jest jedno gniazdo na filtr, można łatwo uzyskać tysiące gniazd. W systemie Windows FD_SETSIZE ma wartość 64. Ta zmienna ma coś wspólnego z "Maksymalna liczba obsługiwanych gniazd". –

Odpowiedz

5

Odpowiem tutaj na moje własne pytanie. Ustawienie ZMQ_CONFLATE "Zachowaj tylko ostatnią wiadomość" wydawało się obiecujące, ale nie działa z filtrami subskrypcji. Zawiera tylko jedną wiadomość w kolejce. Jeśli masz więcej niż jeden filtr, zarówno stare, jak i nowe wiadomości z innego typu filtrów zostaną odrzucone.

Podobnie jak zalecenie przewodnika zeromq, aby po prostu zabić powolnych subskrybentów, ale to nie wydaje się realistyczne rozwiązanie. Posiadanie subskrybentów o różnych prędkościach odczytu, subskrybowanych do tego samego szybkiego wydawcy, powinno być normalnym przypadkiem użycia. Niektórzy z tych subskrybentów mogą żyć na wolnych komputerach, inni na szybkich itp. ZeroMQ powinien jakoś sobie z tym poradzić.

http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern

skończyło się robi ręczne zrzucenie starej kolejce komunikatów po stronie klienta. Wygląda na to, że działa dobrze. Dostaję subskrybowane wiadomości do klienta, które są mniej niż 3ms stare (przez tcp localhost) w ten sposób. Działa to nawet w przypadkach, gdy mam pięć tysięcy, 10 sekund starych wiadomości, w kolejce przed tymi kilkoma komunikatami w czasie rzeczywistym z tyłu. To mi wystarczy.

Nie mogę pomóc, ale uważam, że jest to coś, co powinna dostarczyć biblioteka. Prawdopodobnie mogłaby to zrobić lepiej.

Anyways tutaj jest po stronie klienta, stara wiadomość upuszczanie, kod:

bool Empty(zmq::socket_t& socket) { 
    bool ret = true; 
    zmq::pollitem_t poll_item = { socket, 0, ZMQ_POLLIN, 0 }; 
    zmq::poll(&poll_item, 1, 0); //0 = no wait 
    if (poll_item.revents & ZMQ_POLLIN) { 
     ret = false; 
    } 
    return ret; 
} 

std::vector<std::string> GetRealtimeSubscribedMessageVec(zmq::socket_t& socket_sub, int timeout_ms) 
{ 
    std::vector<std::string> ret; 

    struct MessageTmp { 
     int id_ = 0; 
     std::string data_; 
     boost::posix_time::ptime timestamp_; 
    }; 

    std::map<int, MessageTmp> msg_map; 

    int read_msg_count = 0; 
    int time_in_loop = 0; 
    auto start_of_loop = boost::posix_time::microsec_clock::universal_time(); 
    do { 
     read_msg_count++; 

     //msg format sent by publisher is: filter, timestamp, data 
     MessageTmp msg; 
     msg.id_ = boost::lexical_cast<int>(s_recv(socket_sub)); 
     msg.timestamp_ = boost::posix_time::time_from_string(s_recv(socket_sub)); 
     msg.data_ = s_recv(socket_sub); 

     msg_map[msg.id_] = msg; 

     auto now = boost::posix_time::microsec_clock::universal_time(); 
     time_in_loop = (now - start_of_loop).total_milliseconds(); 
     if (time_in_loop > timeout_ms) { 
      std::cerr << "Timeout reached. Publisher is probably sending messages quicker than we can drop them." << std::endl; 
      break; 
     } 
    } while ((Empty(socket_sub) == false)); 

    if (read_msg_count > 1) { 
     std::cout << "num of old queued up messages dropped: " << (read_msg_count - 1) << std::endl; 
    } 

    for (const auto &pair: msg_map) { 
     const auto& msg_tmp = pair.second; 

     auto now = boost::posix_time::microsec_clock::universal_time(); 
     auto message_age_ms = (now - msg_tmp.timestamp_).total_milliseconds(); 

     if (message_age_ms > timeout_ms) { 
      std::cerr << "[SUB] Newest message too old. f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl; 
     } 
     else { 
      std::cout << "[SUB] f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl; 
      ret.push_back(msg_tmp.data_); 
     } 
    } 

    return ret; 
} 
Powiązane problemy