2015-08-31 10 views
10

Używam Selera do obsługi planowania zadań w aplikacji Django, którą rozwijam. Pracuję z bazą danych Django tylko do testowania.Zezwalaj na wykonanie zadania, jeśli nie zostało jeszcze zaplanowane przy użyciu selera.

Próbowałem tylko kilku rzeczy, aby obsłużyć wykonanie zadania tylko wtedy, gdy nie jest ono już zaplanowane lub jest w toku, jak proponowane w tym article, ale nic nie działa do tej pory.

coś takiego:

task.py

@task() 
def add(x, y): 
    return x + y 

A potem, gdy nazywają go dwa razy jak w następujący sposób:

import myapp.tasks.add 

myapp.tasks.add.apply_async((2,2), task_id=1, countdown=15) 
myapp.tasks.add.apply_async((2,2), task_id=2, countdown=15) 

Powinno zezwalaj na jedną instancję opartą na countdown=15. Jak mogę osiągnąć to, że drugie połączenie nigdy go nie uruchomi, jeśli jest jeszcze inne działanie lub oczekiwanie?

Odpowiedz

5

Jednym z problemów z zaakceptowaną odpowiedzią jest to, że jest powolny. Sprawdzanie, czy zadanie jest już uruchomione, polega na wykonaniu wywołania do brokera, a następnie iterowaniu za pośrednictwem uruchomionych i aktywnych zadań. Jeśli chcesz szybko umieścić w kolejce zadanie, to nie zadziała. Obecne rozwiązanie ma również mały wyścigowy stan, w którym 2 procesy mogą sprawdzać, czy zadanie zostało umieszczone w kolejce na tym samym poziomie (sprawdzić, czy nie jest), co spowodowałoby 2 dodatkowe zadania.

Lepszym rozwiązaniem byłoby to, co nazywam zadeklarowanymi zadaniami. Zasadniczo zwiększasz licznik za każdym razem, gdy kolejkujesz zadanie. Gdy zadanie zacznie się zmniejszać. Użyj redis, a potem wszystko jest atomowe.

np.

kolejce zadania:

conn = get_redis() 
conn.incr(key) 
task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

Następnie w zadaniu, masz 2 opcje, czy chcesz, aby wykonać zadanie 15 sekund po pierwszym został kolejce (przepustnica) lub wykonać go po 15 sekundach ostatni był w kolejce (debounce). Oznacza to, że jeśli nadal będziemy próbować uruchomić to samo zadanie, przedłużamy licznik czasu, czy też po prostu czekamy 15 na pierwszy i zignorujemy pozostałe zadania, które były w kolejce.

Łatwy do wspierania zarówno tutaj jest nieczułości gdzie czekamy aż zadania przestaje coraz kolejce:

conn = get_redis() 
counter = conn.decr(key) 
if counter > 0: 
    # task is queued 
    return 
# continue on to rest of task 

wersja przepustnicy:

counter = conn.getset(key, '0') 
if counter == '0': 
    # we already ran so ignore all the tasks that were queued since 
    return 
# continue on to task 

Inną zaletą tego rozwiązania nad akceptowany jest fakt, że Klucz jest całkowicie pod Twoją kontrolą. Jeśli chcesz, aby to samo zadanie było wykonywane, ale tylko raz dla różnych identyfikatorów/obiektów, na przykład, włącz to do swojego klucza.

Aktualizacja

myślał o tym jeszcze można zrobić wersję przepustnicy nawet łatwiejszy, bez konieczności stania w kolejce zadań.

przepustnicy v2 (przy kolejce zadania)

conn = get_redis() 
counter = conn.incr(key) 
if counter == 1: 
    # queue up the task only the first time 
    task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

Następnie w zadaniu ustawić licznik z powrotem do 0.

Nawet nie trzeba używać licznik, jeśli miał zestaw, do którego można dodać klucz do zestawu. Jeśli wrócisz 1, klucz nie był w zestawie i powinieneś umieścić go w kolejce. Jeśli odzyskasz 0, klucz jest już w zestawie, więc nie kolejkuj zadania.

+0

Tak, zgadzam się, nawet jak powiedziałeś, że proces iteracji nad wszystkimi uruchomionymi zadaniami jest kosztowny. Nice –

+0

skąd 'get_redis' pochodzi? –

+0

To jest moja własna metoda, która zwraca połączenie redis. – dalore

2

Spójrz, zanim skoczysz! Możesz sprawdzić, czy istnieją jakieś zadania uruchomione/czekające przed kolejkowaniem zadań.

Teraz, jeśli ustawisz w kolejce trzy zadania dodatkowe, pierwsza zostanie umieszczona w kolejce do wykonania, pozostanie nie będzie czekała w kolejce.

for i in range(3): 
    if not is_running_waiting('add'): 
     add.apply_async((2,2), countdown=15) 
Powiązane problemy