2013-03-05 14 views
6

Potrzebuję pobrać plik przez ftp, zmienić go i przesłać z powrotem. Używam seler, aby to zrobić, ale jestem z systemem do problemów przy próbie użycia łańcuchowym, gdzie otrzymuję:Kolejność zadań selekcji sekwencyjnie

TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)

Również mogę używać łańcuchów i mieć pewność, że kroki będą sekwencyjne? jeśli nie, jaka jest alternatywa?

res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async() 
print res.get() 

Zadania:

@task() 
def download_ftp_image(ftp_server, username , password , filename, directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     if not os.path.exists(directory): 
      os.makedirs(directory) 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     else: 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     ftp.quit() 
    except error_perm, resp: 
     raise download_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

@task() 
def upload_ftp_image(ftp_server, username , password , file , directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     new_file= file.replace(directory, "") 
     directory = directory.replace("tmp","") 
     try: 
      ftp.storbinary("STOR " + directory + new_file , open(file, "rb")) 
     except: 
      ftp.mkd(directory) 
      ftp.storbinary("STOR " + directory + new_file, open(file, "rb")) 
     ftp.quit() 
    except error_perm, resp: 
     raise upload_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

i czy jest to dobre czy złe praktyki dla mojego konkretnego przypadku? :

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
result.get() 
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
#result.get() 

Odpowiedz

13

łańcuch jest zawsze przeszły poprzedni wynik jako pierwszy argument. Z chains documentation:

The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.

Twój upload_ftp_image zadanie nie akceptuje ten dodatkowy argument, a zatem nie jest on.

Masz dobry przypadek użycia do łączenia; drugim zadaniem jest gwarantowane nazywane po pierwsze zadanie jest zakończone (w przeciwnym razie wynik nie może być przekazany na wszelki wypadek).

Wystarczy dodać argument do wyniku z poprzedniego zadania:

def upload_ftp_image(download_result, ftp_server, username , password , file , directory): 

można zrobić pewne zastosowanie tej wartości wynikowej; może sprawić, że metoda pobierania zwróci ścieżkę pobranego pliku, aby metoda przesyłania wiedziała, co przesłać?

+0

jak powinienem to zrobić? – psychok7

+0

@ psychok7: Rozszerzyłem trochę. –

+0

Wygląda na to, że działa :) Dziękuję za pomoc – psychok7

17

Inną opcją, jeśli nie chcesz, aby wartość powrotu poprzedniego zadania była używana jako argument, jest użycie "niezmienności".

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

Zamiast definiowania podzadań jak:

download_ftp_image.s(...) and upload_ftp_image.s(...) 

je zdefiniować jako:

download_ftp_image.si(...) and upload_ftp_image.si(...) 

I można teraz użyć zadań ze zwykłymi liczbą argumentów w łańcuchu .

Powiązane problemy