2013-01-08 15 views
5

Uruchamiam 3 procesy i chcę, aby wstawiły ciąg do wspólnej tablicy, , do indeksu odpowiadającego procesowi (i).Python: wieloprocesorowość i macierz c_char_p

Spójrz na poniższy kod, generowany jest sygnał wyjściowy:

['test 0', None, None] 
['test 1', 'test 1', None] 
['test 2', 'test 2', 'test 2'] 

Dlaczego 'test 0' nadpisane przez test 1 i test 1 przez test 2?

Co chcę to (kolejność nie ma znaczenia):

['test 0', None, None] 
['test 0', 'test 1', None] 
['test 0', 'test 1', 'test 2'] 

Kod:

#!/usr/bin/env python 

import multiprocessing 
from multiprocessing import Value, Lock, Process, Array 
import ctypes 
from ctypes import c_int, c_char_p 

class Consumer(multiprocessing.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      multiprocessing.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr=self.arr, lock=self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr=None, lock=None): 
     with lock: 
      arr[self.i] = "test %d" % self.i 
      print arr[:] 

    def __str__(self): 
     return 'ARC' 

    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    arr = Array(ctypes.c_char_p, 3) 

    lock = multiprocessing.Lock() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

Używam Python 2.7.3 (Ubuntu)

Odpowiedz

5

Problem ten wydaje podobne do this one. Tam J.F. Sebastian spekulował, że przypisanie do arr[i] punktów arr[i] adresu pamięci, który był jedynie znaczący dla podprocesu wykonującego przypisanie. Pozostałe podprocesy odzyskują śmieci podczas przeglądania tego adresu.

Istnieją co najmniej dwa sposoby na uniknięcie tego problemu. Jednym z nich jest użycie multiprocessing.manager listę:

import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, lock, lst): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.lock = lock 
      self.lst = lst 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(lock = self.lock, lst = self.lst) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, lock, lst): 
     with lock: 
      lst[self.i] = "test {}".format(self.i) 
      print([lst[i] for i in range(3)]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    manager = mp.Manager() 
    lst = manager.list(['']*3) 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, lock, lst) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

Innym sposobem jest użycie wspólną tablicę o stałym rozmiarze takim jak mp.Array('c', 10).

import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr = self.arr, lock = self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr, lock): 
     with lock: 
      arr[self.i].value = "test {}".format(self.i) 
      print([a.value for a in arr]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    arr = [mp.Array('c', 10) for i in range(3)] 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

I spekulują, że powodem, dlaczego to działa, gdy mp.Array(ctypes.c_char_p, 3) nie robi, dlatego mp.Array('c', 10) ma stały rozmiar, adres pamięci nigdy się nie zmienia, natomiast mp.Array(ctypes.c_char_p, 3) ma zmienną wielkość, więc adres pamięci może się zmienić, gdy arr[i] jest przypisany do większego łańcucha.

Być może to właśnie the docs ostrzegają o tym, kiedy stwierdza,

Choć możliwe jest, aby przechowywać wskaźnik w pamięci współdzielonej pamiętać że będzie odnosić się do lokalizacji w przestrzeni adresowej określonej proces. Jednak wskaźnik jest prawdopodobnie nieprawidłowy w kontekście drugiego procesu i próbuje wyłuskać wskaźnik z , ponieważ drugi proces może spowodować awarię.

+0

Dziękuję miliard razy! Oba rozwiązania rzeczywiście działają :) Natknąłem się na posta J.F. Sebastiana, ale z jakiegoś powodu nie mogłem go wdrożyć ... doh! Teraz powiedz mi, gdzie powinienem zbudować swój posąg! Jeszcze raz dziękuję ... – Ujoux

+0

Dziękuję za interesujące pytanie i entuzjazm! Do zobaczenia na Stackoverflow. Jeśli chodzi o posągi - myślę, że kliknięcie w uparrow nad znacznikiem wyboru sprawia, że ​​jest on całkiem niezły; ^) – unutbu

+0

Zrobię tak szybko, jak będę miał potrzebną reputację 15, nie zapomnę;) – Ujoux

Powiązane problemy