2012-01-01 7 views
5

Próbuję napisać pozornie prostą implementację klasycznego producenta - konsumenta idiomu w Pythonie. Jest jeden porównywalnie szybki producent dla wielu wolniejszych konsumentów. Zasadniczo ta jest łatwa do wykonania przy użyciu modułu Queue, a dokumentacja biblioteki ma przykład, w którym pojawia się tylko kilka linii kodu.Producent/konsument Python z obsługą wyjątków

Jednak chcę również, aby kod działał poprawnie w przypadku wystąpienia wyjątków. Zarówno producent i wszyscy konsumenci powinni zatrzymać się w przypadku, gdy którykolwiek z następujących rzeczy zdarzyć:

  • producent nie z wyjątkiem
  • każdy konsument nie z wyjątkiem
  • użytkownik zatrzymuje program (powodując KeyboardInterrupt)

Po tym wszystkim proces powinien zakończyć się niepowodzeniem, podnosząc początkowy wyjątek, aby poinformować dzwoniącego o tym, co poszło nie tak.

Głównym wyzwaniem wydaje się oczyścić zakończenie wątku konsumenta bez kończenia się w złączeniu blokującym(). Wydaje się być popularnym, aby ustawić Thread.deamon = True, ale moim zdaniem to powoduje wyciek zasobów w przypadku, gdy producent zawiedzie z wyjątkiem.

Udało mi się napisać implementację spełniającą moje wymagania (patrz poniżej). Jednak Uważam kod za dużo bardziej złożony niż oczekiwano.

Czy istnieje lepszy sposób radzenia sobie z tym scenariuszem?

Oto kilka przykładowych połączeń i otrzymany końcowy komunikat dziennika z moim obecnym realizacji:

produkują i zużywają 10 pozycji:

$ python procon.py 
INFO:root:processed all items 

nie wytwarzają elementy:

$ python procon.py --items 0 
INFO:root:processed all items 

Wyprodukuj 5 artykułów dla 10 konsumentów, wykorzystując w ten sposób tylko niektórych dostępnych konsumentów:

$ python procon.py --items 5 --consumers 10 
INFO:root:processed all items 

przerwań przez naciśnięcie Control-C:

$ python procon.py 
^CWARNING:root:interrupted by user 

nie udały Punkt 3:

$ python procon.py --producer-fails-at 3 
ERROR:root:cannot produce item 3 

Fail spożywać Punkt 3:

$ python procon.py --consumer-fails-at 3 
ERROR:root:cannot consume item 3 

Fail zużywają ostatnia pozycja:

$ python procon.py --items 10 --consumer-fails-at 9 
ERROR:root:cannot consume item 9 

I tu jest chyba zbyt skomplikowane kod źródłowy:

""" 
Consumer/producer to test exception handling in threads. Both the producer 
and the consumer can be made to fail deliberately when processing a certain 
item using command line options. 
""" 
import logging 
import optparse 
import Queue 
import threading 
import time 

_PRODUCTION_DELAY = 0.1 
_CONSUMPTION_DELAY = 0.3 

# Delay for ugly hacks and polling loops. 
_HACK_DELAY = 0.05 

class _Consumer(threading.Thread): 
    """ 
    Thread to consume items from an item queue filled by a producer, which can 
    be told to terminate in two ways: 

    1. using `finish()`, which keeps processing the remaining items on the 
     queue until it is empty 
    2. using `cancel()`, which finishes consuming the current item and then 
     terminates 
    """ 
    def __init__(self, name, itemQueue, failedConsumers): 
     super(_Consumer, self).__init__(name=name) 
     self._log = logging.getLogger(name) 
     self._itemQueue = itemQueue 
     self._failedConsumers = failedConsumers 
     self.error = None 
     self.itemToFailAt = None 
     self._log.info(u"waiting for items to consume") 
     self._isFinishing = False 
     self._isCanceled = False 

    def finish(self): 
     self._isFinishing = True 

    def cancel(self): 
     self._isCanceled = True 

    def consume(self, item): 
     self._log.info(u"consume item %d", item) 
     if item == self.itemToFailAt: 
      raise ValueError("cannot consume item %d" % item) 
     time.sleep(_CONSUMPTION_DELAY) 

    def run(self): 
     try: 
      while not (self._isFinishing and self._itemQueue.empty()) \ 
        and not self._isCanceled: 
       # HACK: Use a timeout when getting the item from the queue 
       # because between `empty()` and `get()` another consumer might 
       # have removed it. 
       try: 
        item = self._itemQueue.get(timeout=_HACK_DELAY) 
        self.consume(item) 
       except Queue.Empty: 
        pass 
      if self._isCanceled: 
       self._log.info(u"canceled") 
      if self._isFinishing: 
       self._log.info(u"finished") 
     except Exception, error: 
      self._log.error(u"cannot continue to consume: %s", error) 
      self.error = error 
      self._failedConsumers.put(self) 


class Worker(object): 
    """ 
    Controller for interaction between producer and consumers. 
    """ 
    def __init__(self, itemsToProduceCount, itemProducerFailsAt, 
      itemConsumerFailsAt, consumerCount): 
     self._itemsToProduceCount = itemsToProduceCount 
     self._itemProducerFailsAt = itemProducerFailsAt 
     self._itemConsumerFailsAt = itemConsumerFailsAt 
     self._consumerCount = consumerCount 
     self._itemQueue = Queue.Queue() 
     self._failedConsumers = Queue.Queue() 
     self._log = logging.getLogger("producer") 
     self._consumers = [] 

    def _possiblyRaiseConsumerError(self): 
      if not self._failedConsumers.empty(): 
       failedConsumer = self._failedConsumers.get() 
       self._log.info(u"handling failed %s", failedConsumer.name) 
       raise failedConsumer.error 

    def _cancelAllConsumers(self): 
     self._log.info(u"canceling all consumers") 
     for consumerToCancel in self._consumers: 
      consumerToCancel.cancel() 
     self._log.info(u"waiting for consumers to be canceled") 
     for possiblyCanceledConsumer in self._consumers: 
      # In this case, we ignore possible consumer errors because there 
      # already is an error to report. 
      possiblyCanceledConsumer.join(_HACK_DELAY) 
      if possiblyCanceledConsumer.isAlive(): 
       self._consumers.append(possiblyCanceledConsumer) 

    def work(self): 
     """ 
     Launch consumer thread and produce items. In case any consumer or the 
     producer raise an exception, fail by raising this exception 
     """ 
     self.consumers = [] 
     for consumerId in range(self._consumerCount): 
      consumerToStart = _Consumer(u"consumer %d" % consumerId, 
       self._itemQueue, self._failedConsumers) 
      self._consumers.append(consumerToStart) 
      consumerToStart.start() 
      if self._itemConsumerFailsAt is not None: 
       consumerToStart.itemToFailAt = self._itemConsumerFailsAt 

     self._log = logging.getLogger("producer ") 
     self._log.info(u"producing %d items", self._itemsToProduceCount) 

     for itemNumber in range(self._itemsToProduceCount): 
      self._possiblyRaiseConsumerError() 
      self._log.info(u"produce item %d", itemNumber) 
      if itemNumber == self._itemProducerFailsAt: 
       raise ValueError("ucannot produce item %d" % itemNumber) 
      # Do the actual work. 
      time.sleep(_PRODUCTION_DELAY) 
      self._itemQueue.put(itemNumber) 

     self._log.info(u"telling consumers to finish the remaining items") 
     for consumerToFinish in self._consumers: 
      consumerToFinish.finish() 
     self._log.info(u"waiting for consumers to finish") 
     for possiblyFinishedConsumer in self._consumers: 
      self._possiblyRaiseConsumerError() 
      possiblyFinishedConsumer.join(_HACK_DELAY) 
      if possiblyFinishedConsumer.isAlive(): 
       self._consumers.append(possiblyFinishedConsumer) 


if __name__ == "__main__": 
    logging.basicConfig(level=logging.INFO) 
    parser = optparse.OptionParser() 
    parser.add_option("-c", "--consumer-fails-at", metavar="NUMBER", 
     type="long", help="number of items at which consumer fails (default: %default)") 
    parser.add_option("-i", "--items", metavar="NUMBER", type="long", 
     help="number of items to produce (default: %default)", default=10) 
    parser.add_option("-n", "--consumers", metavar="NUMBER", type="long", 
     help="number of consumers (default: %default)", default=2) 
    parser.add_option("-p", "--producer-fails-at", metavar="NUMBER", 
     type="long", help="number of items at which producer fails (default: %default)") 
    options, others = parser.parse_args() 
    worker = Worker(options.items, options.producer_fails_at, 
     options.consumer_fails_at, options.consumers) 
    try: 
     worker.work() 
     logging.info(u"processed all items") 
    except KeyboardInterrupt: 
     logging.warning(u"interrupted by user") 
     worker._cancelAllConsumers() 
    except Exception, error: 
     logging.error(u"%s", error) 
     worker._cancelAllConsumers() 
+0

Może nie to, czego szukasz, ale istnieje ogromna biblioteka Pythona o nazwie seler, że można użyć zamiast pisania własna implementacja kolejkowania. –

+0

Dzięki za wskaźnik. Seler wygląda interesująco na złożone zadania z wykorzystaniem usług sieciowych i baz danych. Dla mojego szczególnego zadania producent odczytuje linie z pliku i wykonuje pewne podstawowe analizy strukturalne i przekazuje dane konsumentom - a więc głównie pracę intensywną we/wy. Konsumenci przetwarzają dane, wykonując intensywną pracę procesora. Ponieważ wszystko odbywa się w pamięci na tym samym komputerze, standardowa kolejka Pythona wydaje się być w porządku. – roskakori

Odpowiedz

0

Ponieważ dotychczasowe odpowiedzi dały dobre wskazówki, ale brakowało działającego kodu, wziąłem kod z mojego pytania i zapakowałem go do biblioteki, która jest dostępna pod numerem http://pypi.python.org/pypi/proconex/. Kod źródłowy można znaleźć pod adresem https://github.com/roskakori/proconex. Podczas gdy interfejs wydaje się rozsądny, implementacja nadal korzysta z odpytywania, więc zgłoszenia są mile widziane.

Każdy wyjątek w wątku producenta lub konsumenta jest powtórzony w wątku głównym. Upewnij się, że używasz oświadczenia with lub finally:worker.close(), aby upewnić się, że wszystkie wątki zostały poprawnie zamknięte.

Oto krótki przykład dla producenta z dwoma konsumentów do liczb całkowitych:

import logging 
import proconex 

class IntegerProducer(proconex.Producer): 
    def items(self): 
     for item in xrange(10): 
      logging.info('produce %d', item) 
      yield item 

class IntegerConsumer(proconex.Consumer): 
    def consume(self, item): 
     logging.info('consume %d with %s', item, self.name) 

if __name__ == '__main__': 
    logging.basicConfig(level=logging.INFO) 
    producer = IntegerProducer() 
    consumer1 = IntegerConsumer('consumer1') 
    consumer2 = IntegerConsumer('consumer2') 

    with proconex.Worker(producer, [consumer1, consumer2]) as worker: 
     worker.work() 
2

Trzeba kolejkę o sposobie odstąpienia że opróżnia kolejkę wewnętrznego, ustawia flagę anulowane, a potem wszyscy się budzi. Pracownik obudzi się z join(), sprawdzi flagę anulowania w kolejce i będzie działał prawidłowo. Konsumenci obudzą się z get() i sprawdzą flagę anulowaną w kolejce i wydrukują błąd. Wtedy twój konsument musiałby po prostu wywołać metodę cancel() w przypadku wyjątku.

Niestety kolejka w języku Python nie ma metody anulowania. Kilka wybory przeskoczyć do głowy:

  • przewróceniu własne kolejki (może być trudne, aby zrobić to dobrze)
  • Extend kolejkę Python i dodaj anulować metodą (parom kod do wewnętrznego realizacji Python kolejki klasa)
  • Serwer proxy klasy kolejki i przeciążenie dołączyć/uzyskać z logiką zajętości oczekiwania (nadal zajętym kilofem, ale ogranicza się do jednego punktu i czyści kod producenta/konsumenta)
  • Znajdź inną implementację kolejki/biblioteka tam jest
+0

Tak, przeniesienie logiki anulowania do kolejki z pewnością oczyści kod roboczy. Biorąc pod uwagę moje wymagania, kolejka musiałaby również pamiętać ewentualne informacje o wyjątku, ponieważ chcę, aby konsumenci zgłaszali błąd pracownikowi, a nie tylko go drukowali. Ale na pewno można to zrobić. Czy ktoś wie o istniejącej implementacji takiej kolejki? – roskakori