2012-02-09 7 views
5

Mam pewną funkcję producenta, która polega na blokowaniu połączeń I/O i niektórych funkcjach konsumenckich, które również opierają się na blokowaniu połączeń I/O. Aby je przyspieszyć, użyłem biblioteki mikro-gwintów Gevent jako kleju.Jak mogę wdrożyć paramigmat oparty na wielu producentach i wielu konsumentach w Gevent?

Oto co mój paradygmat wygląda następująco:

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 

def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 

def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 



for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

#This doesnt work. 
for j in range(2): 
    producers.append(gevent.spawn(producer)) 

#Uncommenting this makes this script work. 
#producer() 

q.join() 

Mam cztery konsumenta i chciałby mieć dwóch producentów. Producenci wychodzą, gdy wysyłają sygnał, tj. 10. Konsumenci nadal żyją z tej kolejki, a całe zadanie kończy się, gdy producenci i konsumenci się skończyli.

To jednak nie działa. Jeśli skomentuję pętlę for, która spawnuje wielu producentów i używa tylko jednego producenta, skrypt działa poprawnie.

Nie mogę zrozumieć, co zrobiłem źle.

Wszelkie pomysły?

Dzięki

Odpowiedz

6

W rzeczywistości nie chcesz wyjść, gdy kolejka nie ma niedokończonej pracy, ponieważ koncepcyjnie nie jest, gdy aplikacja powinna się zakończyć.

Chcesz wyjść, gdy producenci skończą, a następnie, gdy nie ma niedokończonych prac.

# Wait for all producers to finish producing 
gevent.joinall(producers) 
# *Now* we want to make sure there's no unfinished work 
q.join() 
# We don't care about workers. We weren't paying them anything, anyways 
gevent.killall(workers) 
# And, we're done. 
3

myślę, że robi q.join() zanim cokolwiek zostanie umieszczone w kolejce i zamyka się natychmiast. Spróbuj dołączyć do wszystkich producentów przed dołączeniem do kolejki.

+0

Hi ZCH, ja nie do końca, a następnie swoją odpowiedź. Czy możesz wkleić mały fragment kodu? To by trochę wyjaśniło. –

+0

@MridangAgarwalla - Przed 'q.join()' napisz 'dla producenta w producencie: producer.join()'. W ten sposób najpierw poczekasz, aż wszyscy producenci zakończą pracę, a potem do momentu, aż kolejka będzie pusta. – zch

+0

Aha, może zaimplementowałem je źle. Chciałem, aby moi producenci i konsumenci działali równolegle, tzn. Producenci dodawali do kolejki, dopóki nie skończą, gdy konsumenci będą ją karmić, dopóki wszystkie elementy kolejki nie zostaną zakończone, a producenci nie będą już dodawać rzeczy do kolejki. –

0

To, co chcesz zrobić, to zablokować główny program, podczas gdy producenci i pracownicy komunikują się. Blokowanie w kolejce będzie czekać, aż kolejka będzie pusta, a następnie wydajna, co może być natychmiastowe. Umieścić to na koniec programu zamiast q.join()

gevent.joinall(producers) 
0

spotkałem same problemy jak twoje. Głównym problemem związanym z Twoim kodem było to, że twój producent został zainicjowany w wątku, który spowodował, że pracownik nie mógł natychmiast otrzymać zadania.

Proponuję, aby w głównym procesie uruchomić producer(), a nie odradzać się w wątku gevent. Gdy przebieg procesu spotka się z producentem, który może natychmiast przekazać zadanie.

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 

def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 

def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 


producer() 

for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

Kody Powyższe ma sens .. :)

Powiązane problemy