2017-04-19 36 views
10

z dokumentacji przepływu powietrza:Airflow: wzór do uruchomienia nawiewu subdag raz

SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything 

Rozumiem subdagoperator jest faktycznie realizowany jako BackfillJob i dlatego musimy zapewnić schedule_interval dla operatora. Czy istnieje jednak sposób na uzyskanie semantycznego odpowiednika schedule_interval="@once" dla subdagu? Obawiam się, że jeśli użyję zestawu schedule_interval="@daily" dla subdagu, że podagar może działać więcej niż jeden raz, jeśli subdag trwa dłużej niż dzień, aby uruchomić.

def subdag_factory(parent_dag_name, child_dag_name, args): 
    subdag = DAG(
     dag_id="{parent_dag_name}.{child_dag_name}".format(
      parent_dag_name=parent_dag_name, child_dag_name=child_dag_name 
     ), 
     schedule_interval="@daily", # <--- this bit here 
     default_args=args 
    ) 

    ... do more stuff to the subdag here 
    return subdag 

TLDR: jak udawać się „tylko uruchomić ten subdag raz na spuście dag dominującej”

Odpowiedz

2

Spróbuj zewnętrznego wzorca wyzwalania z harmonogramem = None dla subdag. W takim przypadku będzie on uruchamiany tylko wtedy, gdy wyzwalacz zostanie wygenerowany przez rodzica dag

+0

Aby uzyskać wyjaśnienie, sugerujesz użycie [TriggerDagRunOperator] (https://airflow.incubator.apache.org/code.html?highlight=trigger%20dagrun#airflow.operators.TriggerDagRunOperator) w celu wywołania dag bez harmonogram? Kluczem do subdagu jest * blokowanie * semantyki, operator dagrun wyzwalacza tylko wyzwala dagrun, a następnie przechodzi dalej i nie czeka, aż dagrun zostanie ukończony. Dodatkowo, nie uzyskujesz przejrzystości w interfejsie przepływu powietrza, w którym uruchomiony był subdag, po prostu wiesz, że uruchomiono losowy dagrun. – gnicholas

4

Uważam, że [email protected] działa dobrze dla moich subdags. Być może moja wersja jest nieaktualna, ale miałem więcej problemów z moimi subdagami , które zawiodły nawet wtedy, gdy wszystkie zadania powiodły się (lub zostały pominięte) niż odwrotnie.

Rzeczywista przykładowy kod działa dość szczęśliwie żyć na moim komputerze teraz:

subdag_name = ".".join((parent_name,child_name)) 
logging.info(parent_name) 
logging.info(subdag_name) 
dag_subdag = DAG(
    dag_id=subdag_name, 
    default_args=dargs, 
    schedule_interval="@once", 
) 

W rzeczywistości, pierwotnie zbudowany prawie wszystkie moje DAG jak pliki uwielbiony cfg dla moich subdags. Nie jestem pewien, jak dobry jest pomysł, po kilku próbach i błędach, ale interwał harmonogramu nigdy nie był dla mnie blokerem.

Używam stosunkowo niedawnej wersji 1.8 z kilkoma dostosowaniami. Podążałem za przykładem sugestii Dag, aby przechowywać moje subdags w folderze wewnątrz folderu dags, aby nie pojawiały się w DagBag.

+0

Używam przepływu powietrza 1.7.1.3 i 1.8 nie jest opcją ATM, ponieważ ta wersja przypadkowo uszkodziła wtyczki niestandardowego executora. Przyjrzę się 1.8, aby sprawdzić, czy uruchamianie subdagów z harmonogramem '' @cece' 'jest możliwe, ale byłbym zaskoczony, gdyby to było prawdziwe, ponieważ dokumentacja mówi, że tak nie jest. – gnicholas

+0

Powodzenia? Mój kod nadal ucieka szczęśliwie. Próbowałem sprawdzić kanoniczny sposób, aby zrobić to dla ciebie w 1.7. Najbliższa rzecz, jaką udało mi się znaleźć (zakładając, że '@ once' nie jest wykonalna) jest ustawiona na' execution_timeout', ponieważ rzeczywiste zadanie subdag jest krótsze niż częstotliwość wykonania ustawiona w samym subagiku. W ten sposób przekroczycie limit czasu, zanim subdag będzie mógł uruchomić więcej zadań. Wiem, że to spekulacja, ale nie mogłem łatwo znaleźć przepływu powietrza w naszym widelcu, który jest tak stary jak ten, na którym się znajdujesz. – apathyman

+1

Chciałbym usłyszeć od autorów, dlaczego działa, gdy docs wyraźnie powiedzieć, że nie powinno. – qwwqwwq