2013-05-31 11 views
9

Szukam klasy Python (najlepiej część standardowego języka, a nie biblioteki trzeciej), aby zarządzać asynchronicznym przesyłaniem wiadomości w stylu rozgłaszania.Potrzebuję asynchronicznej kolejki komunikatów bezpiecznej dla wątków

Będę miał jeden wątek, który umieszcza komunikaty w kolejce (metoda "putMessageOnQueue" nie może blokować), a następnie wiele innych wątków, które będą wszystkie czekały na wiadomości, przypuszczalnie nazywając niektóre blokujące funkcję "waitForMessage". Gdy wiadomość jest umieszczona w kolejce, chcę, aby każdy z oczekujących wątków otrzymał własną kopię wiadomości.

Spojrzałem na wbudowaną klasę Queue, ale nie sądzę, że jest to odpowiednie, ponieważ zużywanie wiadomości wydaje się wymagać usunięcia ich z kolejki, więc tylko jeden wątek klienta zobaczyłby każdy z nich.

Wygląda na to, że powinien to być typowy przypadek, czy ktoś może polecić rozwiązanie?

+0

I wierzę, że możesz zbudować własną klasę, która śledzi, które wątki otrzymały wiadomość, bez wielu problemów. – Bakuriu

Odpowiedz

7

Myślę, że typowym podejściem do tego jest użycie osobnej kolejki komunikatów dla każdego wątku i wepchnięcie komunikatu do każdej kolejki, która wcześniej zarejestrowała zainteresowanie otrzymywaniem takich wiadomości.

Coś jak to powinno działać, ale to jest kod niesprawdzone ...

from time import sleep 
from threading import Thread 
from Queue import Queue 

class DispatcherThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(DispatcherThread, self).__init__(*args, **kwargs) 
     self.interested_threads = [] 

    def run(self): 
     while 1: 
      if some_condition: 
       self.dispatch_message(some_message) 
      else: 
       sleep(0.1) 

    def register_interest(self, thread): 
     self.interested_threads.append(thread) 

    def dispatch_message(self, message): 
     for thread in self.interested_threads: 
      thread.put_message(message) 



class WorkerThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(WorkerThread, self).__init__(*args, **kwargs) 
     self.queue = Queue() 


    def run(self): 

     # Tell the dispatcher thread we want messages 
     dispatcher_thread.register_interest(self) 

     while 1: 
      # Wait for next message 
      message = self.queue.get() 

      # Process message 
      # ... 

    def put_message(self, message): 
     self.queue.put(message) 


dispatcher_thread = DispatcherThread() 
dispatcher_thread.start() 

worker_threads = [] 
for i in range(10): 
    worker_thread = WorkerThread() 
    worker_thread.start() 
    worker_threads.append(worker_thread) 

dispatcher_thread.join() 
+0

Idealny, który działa świetnie! Szkoda, że ​​nie ma gotowej wersji, ale myślę, że zasada nie jest tak skomplikowana, kiedy ktoś wyjaśnia to wyraźnie (tak jak ty). – codebox

+0

@codebox Cóż, jest lepsze wsparcie w module ['multiprocessing'] (http://docs.python.org/2/library/multiprocessing.html), ale dotyczy to raczej podprocesów niż wątków. Chyba dlatego, że komunikacja między procesami jest zwykle bardziej skomplikowana niż komunikacja między wątkami, ponieważ wątki w naturalny sposób dzielą tę samą stertę. – Aya

2

myślę, że to jest bardziej prosty przykład do przodu (przykład wzięty z kolejki w Python Lib)

from threading import Thread 
from Queue import Queue 


num_worker_threads = 2 

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join()  # block until all tasks are done 
+0

Jak to spełnia wymagania pytania? Wyraźnie powiedział, że kolejka nie działa, ponieważ każdy wątek potrzebuje kopii przedmiotu. – Wlerin

Powiązane problemy