2013-07-19 10 views
6

Poniższy kod nie wydają się biec równolegle, a nie jestem pewien dokładnie dlaczego:Can not Get wieloprocesorowe do uruchomienia procesów jednocześnie

def run_normalizers(config, debug, num_threads, name=None): 

    def _run(): 
     print('Started process for normalizer') 
     sqla_engine = init_sqla_from_config(config) 
     image_vfs = create_s3vfs_from_config(config, config.AWS_S3_IMAGE_BUCKET) 
     storage_vfs = create_s3vfs_from_config(config, config.AWS_S3_STORAGE_BUCKET) 

     pp = PipedPiper(config, image_vfs, storage_vfs, debug=debug) 

     if name: 
      pp.run_pipeline_normalizers(name) 
     else: 
      pp.run_all_normalizers() 
     print('Normalizer process complete') 

    threads = [] 
    for i in range(num_threads): 
     threads.append(multiprocessing.Process(target=_run)) 
    [t.start() for t in threads] 
    [t.join() for t in threads] 


run_normalizers(...) 

Zmienna config jest tylko słownik zdefiniowane z zewnątrz _run() funkcja. Wydaje się, że wszystkie procesy są tworzone - ale nie jest to szybsze niż w przypadku pojedynczego procesu. Zasadniczo, co się dzieje w funkcjach run_**_normalizers() czyta się z tabeli kolejki w bazie danych (SQLAlchemy), następnie wykonuje kilka żądań HTTP, a następnie uruchamia "potok" normalizatorów w celu modyfikacji danych, a następnie zapisania ich z powrotem w bazie danych. Pochodzę z obszaru JVM, gdzie wątki są "ciężkie" i często używane do równoległości - jestem trochę zdezorientowany tym, ponieważ uważałem, że moduł wieloprocesowy powinien ominąć ograniczenia GIL Pythona.

+0

Moduł przetwarzania wieloprocesorowego wykorzystuje procesy, a nie wątki. W związku z tym GIL nie ma na nią wpływu. –

+0

Przetestowałem Twój kod i podstawowa technika jest w porządku. Nie jestem pewien co do współużytkowanego 'config', jeśli słownik' config' jest często używany, co teoretycznie może spowalniać działanie. Możliwe, że procesor nie jest tutaj twoim wąskim gardłem. –

+0

Uruchomiłem go tylko na mojej stacji roboczej, 8 rdzeniach z 16 GB pamięci RAM. Przy 1 lub 1, 8 lub 16 procesach nic się nie zmienia - a zasoby systemowe są w porządku. –

Odpowiedz

3

naprawiłem mój problem z wieloprocesorem - i faktycznie zmieniłem wątki. Nie jestem pewien, co tak naprawdę naprawiło to myślenie - po prostu ponownie zaprojektowałem wszystko i zrobiłem robotnikom i zadaniom, a co nie i rzeczy lecą teraz. Oto podstawowe informacje o tym, co zrobiłem:

import abc 
from Queue import Empty, Queue 
from threading import Thread 

class AbstractTask(object): 
    """ 
     The base task 
    """ 
    __metaclass__ = abc.ABCMeta 

    @abc.abstractmethod 
    def run_task(self): 
     pass 

class TaskRunner(object): 

    def __init__(self, queue_size, num_threads=1, stop_on_exception=False): 
     super(TaskRunner, self).__init__() 
     self.queue    = Queue(queue_size) 
     self.execute_tasks  = True 
     self.stop_on_exception = stop_on_exception 

     # create a worker 
     def _worker(): 
      while self.execute_tasks: 

       # get a task 
       task = None 
       try: 
        task = self.queue.get(False, 1) 
       except Empty: 
        continue 

       # execute the task 
       failed = True 
       try: 
        task.run_task() 
        failed = False 
       finally: 
        if failed and self.stop_on_exception: 
         print('Stopping due to exception') 
         self.execute_tasks = False 
        self.queue.task_done() 

     # start threads 
     for i in range(0, int(num_threads)): 
      t = Thread(target=_worker) 
      t.daemon = True 
      t.start() 


    def add_task(self, task, block=True, timeout=None): 
     """ 
      Adds a task 
     """ 
     if not self.execute_tasks: 
      raise Exception('TaskRunner is not accepting tasks') 
     self.queue.put(task, block, timeout) 


    def wait_for_tasks(self): 
     """ 
      Waits for tasks to complete 
     """ 
     if not self.execute_tasks: 
      raise Exception('TaskRunner is not accepting tasks') 
     self.queue.join() 

wszystko zrobić, to stworzyć TaskRunner i dodać do niego zadania (tysiące z nich), a następnie wait_for_tasks zadzwonić(). więc, oczywiście w re-architekturze, którą zrobiłem, "naprawiłem" jakiś inny problem, który miałem. Dziwne jednak.

1

Jeśli nadal szuka rozwiązania wieloprocesorowe, najpierw może chcieć sprawdzić, jak korzystać z basenu pracowników, to nie będzie musiał zarządzać procesami NUM_THREADS na własną rękę: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

I dla problemu spowolnienia próbowałeś przekazać obiekt config jako argument do funkcji _run? Nie wiem, czy/jak to by zmieniło wewnętrznie, ale jest to domysły, że może coś zmienić.

Powiązane problemy