2015-06-03 16 views
7

w Pythonie (2.7) Staram się tworzyć procesy (z multiprzetwarzania) w zadaniu selera (seler 3.1.17) ale daje błąd:seler: demoniczny procesy nie mogą mieć dzieci

daemonic processes are not allowed to have children 

Googling to, znalazłem, że najnowsze wersje bilard naprawić "błąd", ale mam najnowszą wersję (3.3.0.20) i błąd nadal się dzieje. Próbowałem również wdrożyć this workaround w moim zadaniu selera, ale daje ten sam błąd.

Czy ktoś wie, jak to zrobić? Każda pomoc jest mile widziana, Patricka

EDIT: fragmentów kodu

Zadanie:

from __future__ import absolute_import 
from celery import shared_task 
from embedder.models import Embedder 

@shared_task 
def embedder_update_task(embedder_id): 
    embedder = Embedder.objects.get(pk=embedder_id) 
    embedder.test() 

Sztuczna funkcja testu (from here):

def sleepawhile(t): 
    print("Sleeping %i seconds..." % t) 
    time.sleep(t) 
    return t  

def work(num_procs): 
    print("Creating %i (daemon) workers and jobs in child." % num_procs) 
    pool = mp.Pool(num_procs) 

    result = pool.map(sleepawhile, 
     [randint(1, 5) for x in range(num_procs)]) 

    # The following is not really needed, since the (daemon) workers of the 
    # child's pool are killed when the child is terminated, but it's good 
    # practice to cleanup after ourselves anyway. 
    pool.close() 
    pool.join() 
    return result 

def test(self): 
    print("Creating 5 (non-daemon) workers and jobs in main process.") 
    pool = MyPool(5) 

    result = pool.map(work, [randint(1, 5) for x in range(5)]) 

    pool.close() 
    pool.join() 
    print(result) 

My prawdziwy funkcja:

import mulitprocessing as mp 

def test(self): 
    self.init() 
    for saveindex in range(self.start_index,self.start_index+self.nsaves): 
     self.create_storage(saveindex) 
     # process creation: 
     procs = [mp.Process(name="Process-"+str(i),target=getattr(self,self.training_method),args=(saveindex,)) for i in range(self.nproc)] 
     for p in procs: p.start() 
     for p in procs: p.join() 
    print "End of task" 

funkcji init definiuje tablicę wieloprocesorowe i obiekt, który podziela tą pamięć tak, że wszystkie moje procesy mogą aktualizować tę samą tablicę w tym samym czasie:

mp_arr = mp.Array(c.c_double, np.random.rand(1000000)) # example 
self.V = numpy.frombuffer(mp_arr.get_obj()) #all the processes can update V 

Błąd generowany, gdy zadanie jest nazywany :

[2015-06-04 09:47:46,659: INFO/MainProcess] Received task: embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda] 
[2015-06-04 09:47:47,674: WARNING/Worker-5] Creating 5 (non-daemon) workers and jobs in main process. 
[2015-06-04 09:47:47,789: ERROR/MainProcess] Task embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda]  raised unexpected: AssertionError('daemonic processes are not allowed to have children',) 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task 
    R = retval = fun(*args, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__ 
    return self.run(*args, **kwargs) 
    File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/tasks.py", line 21, in embedder_update_task 
    embedder.test() 
    File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/models.py", line 475, in test 
    pool = MyPool(5) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 159, in __init__ 
self._repopulate_pool() 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool 
    w.start() 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 124, in start 
'daemonic processes are not allowed to have children' 
AssertionError: daemonic processes are not allowed to have children 
+0

zaktualizuj swoje pytanie z fragment kodu, który powoduje wyjątek i ful l wyjątek. – scytale

+0

Gotowe. Dziękuję Ci. – Patrick

+0

Dodano mój prawdziwy kod (w przeciwieństwie do * sztucznego *). Dziękuję scytale za pomoc, jest to bardzo cenne. – Patrick

Odpowiedz

5

billiard i multiprocessing są różne biblioteki - billiard jest projekt seler własny widelec z multiprocessing. Będziesz musiał zaimportować billiard i użyć go zamiast multiprocessing

Jednak lepszą odpowiedzią jest prawdopodobnie to, że powinieneś zreorganizować swój kod, aby odradzać więcej zadań z Celery, zamiast korzystać z dwóch różnych sposobów dystrybucji swojej pracy.

Można to zrobić za pomocą seler canvas

from celery import group 

@app.task 
def sleepawhile(t): 
    print("Sleeping %i seconds..." % t) 
    time.sleep(t) 
    return t  

def work(num_procs): 
    return group(sleepawhile.s(randint(1, 5)) for x in range(num_procs)]) 

def test(self): 
    my_group = group(work(randint(1, 5)) for x in range(5)) 
    result = my_group.apply_async() 
    result.get() 

Mam próbowali dokonać działającą wersję kodu, który używa prymitywów płótna zamiast wieloprocesorowych. Ponieważ jednak twój przykład był dość sztuczny, nie jest łatwo wymyślić coś, co ma sens.

Aktualizacja:

Oto tłumaczenie z prawdziwego kodu, który korzysta z płótna Celery:

tasks.py:

@shared_task 
run_training_method(saveindex, embedder_id): 
    embedder = Embedder.objects.get(pk=embedder_id) 
    embedder.training_method(saveindex) 

models.py:

from tasks import run_training_method 
from celery import group 

class Embedder(Model): 

    def embedder_update_task(self): 
     my_group = [] 

     for saveindex in range(self.start_index, self.start_index + self.nsaves): 
      self.create_storage(saveindex) 
      group.extend([run_training_method.subtask((saveindex, self.id)) 
         for i in range(self.nproc)]) 

     result = group(my_group).apply_async() 
+0

Dziękuję, Scytale, spróbuj! Czy wiesz, czy istnieje odpowiednik wieloprocesorowego.barray w bilard (cf my init() metoda), tak aby wszystkie zadania mogą współużytkować tę samą zmienną pamięci? – Patrick

+0

Nie, nie ma - pracownicy selerzy nie mogą zakładać, że mogą coś udostępniać, ponieważ mogą być uruchomieni na różnych hostach. Jeśli masz zamiar uruchamiać tylko na 1 komputerze, to może lepszym rozwiązaniem będzie używanie tylko wieloprocesowości, ponieważ masz wygodę współdzielonej pamięci. Jeśli potrzebujesz pracować na wielu maszynach, to możesz używać tylko selera i przetwarzać dane w stylu mapreduce na swoje dane. – scytale

+0

Dziękuję za całą pomoc! Patrick – Patrick

Powiązane problemy