2011-10-29 11 views
12

Próbuję napisać klasę, która obliczy sumy kontrolne za pomocą wielu procesów, wykorzystując w ten sposób wiele rdzeni. Mam do tego dość prostą klasę i działa świetnie podczas wykonywania prostego przypadku. Ale ilekroć tworzę dwie lub więcej instancji klasy, pracownik nigdy nie kończy pracy. Wygląda na to, że nigdy nie otrzyma wiadomości, że rura została zamknięta przez rodzica.Używanie potoków wieloprocesorowych w pytonie

Cały kod można znaleźć poniżej. Najpierw obliczam oddzielnie sumy kontrolne md5 i sha1, które działają, a następnie próbuję wykonać obliczenia równolegle, a następnie program blokuje się, gdy nadejdzie czas zamknięcia rury.

Co się tutaj dzieje? Dlaczego rury nie działają tak, jak oczekuję? Sądzę, że mógłbym obejść ten problem, wysyłając komunikat "Stop" w kolejce i zmuszając dziecko do opuszczenia go w ten sposób, ale naprawdę chciałbym wiedzieć, dlaczego to nie działa tak jak jest.

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 
    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     self.parent_conn.close() # This is the child. Close unused end. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 


def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums in parallel causes a lockup! 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here! 

main() 

PS. Ten problem został rozwiązany Oto wersję roboczą powyższym kodzie, jeśli ktoś jest zainteresowany:

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 

    all_open_parent_conns = [] 

    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     ChecksumPipe.all_open_parent_conns.append(self.parent_conn) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     for conn in ChecksumPipe.all_open_parent_conns: 
      conn.close() # This is the child. Close unused ends. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 

def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums also works fine now 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() 

main() 
+0

Możesz dodać 'ChecksumPipe.all_open_parent_conns.remove (self.parent_conn)' po 'self.parent_conn.close()', aby pozwolić na zniszczenie połączenia. –

+0

'self.summer = eval (" hashlib.% S() "% csname)' wygląda brzydko. A co z 'self.summer = getattr (hashlib, csname)()'? – glglgl

Odpowiedz

7

Tak, to jest rzeczywiście zaskakujące zachowanie.

Jednakże, jeśli przyjrzeć się wyjściu lsof dla dwóch równoległych procesów potomnych, łatwo zauważyć, że drugi proces potomny ma więcej otwartych deskryptorów plików.

Co się dzieje, gdy dwa równoległe procesy podrzędne są uruchamiane, drugie dziecko dziedziczy potoki rodzica, tak, że gdy rodzic wywołuje self.parent_conn.close(), drugie dziecko nadal ma otwarty deskryptor pliku potoku, tak że opis pliku potoku nie jest Zamknięcie jądra (liczba odwołań jest większa niż 0), z tym, że self.child_conn.recv_bytes() w pierwszym równoległym procesie potomnym nigdy nie zostanie wyrzucony read() s EOF i EOFError.

Może być konieczne wysłanie jawnego komunikatu zamknięcia, a nie tylko zamknięcie deskryptorów plików, ponieważ wydaje się, że mają niewielką kontrolę nad tym, jakie deskryptory plików są udostępniane między którymi procesami (nie ma flagi deskryptora pliku close-on-fork).

+0

Dzięki! To dla mnie wyjaśniło. Rozwiązałem to w moim przykładzie, używając zmiennej dzielonej klasy zawierającej wszystkie otwarte połączenia we wszystkich instancjach, aby dzieci mogły zamknąć wszystkie gniazda, których nie potrzebują. –