Próbuję napisać klasę, która obliczy sumy kontrolne za pomocą wielu procesów, wykorzystując w ten sposób wiele rdzeni. Mam do tego dość prostą klasę i działa świetnie podczas wykonywania prostego przypadku. Ale ilekroć tworzę dwie lub więcej instancji klasy, pracownik nigdy nie kończy pracy. Wygląda na to, że nigdy nie otrzyma wiadomości, że rura została zamknięta przez rodzica.Używanie potoków wieloprocesorowych w pytonie
Cały kod można znaleźć poniżej. Najpierw obliczam oddzielnie sumy kontrolne md5 i sha1, które działają, a następnie próbuję wykonać obliczenia równolegle, a następnie program blokuje się, gdy nadejdzie czas zamknięcia rury.
Co się tutaj dzieje? Dlaczego rury nie działają tak, jak oczekuję? Sądzę, że mógłbym obejść ten problem, wysyłając komunikat "Stop" w kolejce i zmuszając dziecko do opuszczenia go w ten sposób, ale naprawdę chciałbym wiedzieć, dlaczego to nie działa tak jak jest.
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
self.parent_conn.close() # This is the child. Close unused end.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums in parallel causes a lockup!
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!
main()
PS. Ten problem został rozwiązany Oto wersję roboczą powyższym kodzie, jeśli ktoś jest zainteresowany:
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
all_open_parent_conns = []
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
for conn in ChecksumPipe.all_open_parent_conns:
conn.close() # This is the child. Close unused ends.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums also works fine now
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()
main()
Możesz dodać 'ChecksumPipe.all_open_parent_conns.remove (self.parent_conn)' po 'self.parent_conn.close()', aby pozwolić na zniszczenie połączenia. –
'self.summer = eval (" hashlib.% S() "% csname)' wygląda brzydko. A co z 'self.summer = getattr (hashlib, csname)()'? – glglgl