2017-05-31 40 views
6

Mam subdag w przepływie powietrza z długim etapem (zazwyczaj około 2 godzin, choć zależy to od tego, która jednostka jest uruchamiana). Zgodnie z 1.7.1.3, ten krok konsekwentnie powodowałby, że SubDAG zatrzymałby się w stanie "uruchomionym", gdy wszystkie kroki w ramach powiodłyby się. Moglibyśmy obejść to, ponieważ nie mieliśmy kroków po SubDAG, ręcznie oznaczając SubDagOperator jako pomyślny (a nie działający) w bazie danych.Przepływ powietrza - zadanie długotrwałe w subdag oznaczone jako nieudane po godzinie

Testujemy przepływu powietrza 1.8.1 teraz, modernizacji, wykonując następujące czynności:

  1. Shuting dół naszą planującego i pracowników
  2. Via pip, odinstalowanie przepływ powietrza i instalacji Apache przepływu powietrza (w wersji 1.8.1)
  3. Runing powietrza upgradedb
  4. Uruchamianie programu planującego nawiewu i pracowników

z o systemie nie naruszony, ten sam DAG traci teraz w 100% czas po tym, jak długo trwające zadanie osiągnie 1 godzinę (chociaż dziwnie, nie dokładnie 3600 sekund później - może to być w dowolnym miejscu od 30 do 90 sekund po tyknięciu godzin) z komunikatem "Wykonawca zakończył zadanie zadania instancji (nie powiodło się), mimo że zadanie mówi, że działa. Czy zadanie zostało zabite na zewnątrz? ". Jednak samo zadanie nadal działa na robotniku, nie ma już problemu, ale istnieje nieporozumienie między planistą błędnie myślącym, że zadanie nie powiodło się (zobacz this line z jobs.py) w oparciu o bazę danych, pomimo faktycznego zadania

Potwierdziłem, że w jakiś sposób stan "nie powiodło się" w tabeli task_instance bazy danych przepływu powietrza, dlatego chciałbym wiedzieć, co może być ustawienie stanu zadania nie powiodło się, gdy zadanie sam jest nadal działa

Oto dag próbki, które wywołuje problem.

from datetime import datetime 
from airflow.models import DAG 
from airflow.operators.bash_operator import BashOperator 
from airflow.operators.subdag_operator import SubDagOperator 

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)} 

def define_sub(dag, step_name, sleeptime): 
    op = BashOperator(
     task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag 
    ) 
    return dag 

def gen_sub_dag(parent_name, step_name, sleeptime): 
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS) 
    define_sub(sub, step_name, sleeptime) 
    return sub 

long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None) 

long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent 
) 
+0

Dzisiaj, wpadłem na ten sam problem, Subdag z jednym długim zadaniem, po nieco ponad godzinie otrzymałem komunikat o błędzie. Co ciekawe, program planujący próbował ponownie uruchomić zadanie, które nie powiodło się z powodu zablokowanego przepływu zasobów z powietrza. Pierwotne zadanie było kontynuowane i zakończyło się poprawnie, a przepływ powietrza oznaczał, że subdag nie powiodło się, zanim zadanie się zakończyło. –

+0

Jakiego executora używasz. Czy to Celery + Redis? –

Odpowiedz

0

Jeśli rzeczywiście biegasz z Selerem i Redisem, spójrz na visibility timeout setting dla Selera i zwiększ go ponad oczekiwany czas zakończenia zadania.

Chociaż konfigurujemy Seler do zadań-ack-late, nadal ma problemy z znikającymi zadaniami. Uważamy to za a bug w selerze.

Powiązane problemy