2016-11-17 36 views
28

Uruchomiłem webserver Airflow i zaplanowałem kilka dagów. Widzę dagów na GUI WWW.Przepływ powietrza: jak usunąć DAG?

Jak mogę usunąć określoną DAG z uruchamiania i pokazywać w GUI WWW? Czy jest w tym celu polecenie CLI Airflow?

Rozejrzałem się, ale nie mogłem znaleźć odpowiedzi na prosty sposób usunięcia DAG po załadowaniu i zaplanowaniu.

+0

Nie ma CLI dla tego produktu. Ale istnieje żądanie wycofania, jeśli chcesz spróbować je przywrócić: https://github.com/apache/incubator-airflow/pull/1344 – TheF1rstPancake

Odpowiedz

2

Nie ma nic wbudowanego w przepływ powietrza, który to robi dla Ciebie. Aby usunąć DAG, usuń go z repozytorium i usuń wpisy bazy danych w tabeli metastore przepływu powietrza - dag.

+0

Musiałem też ponownie uruchomić komputer, na którym jest harmonogram i serwer WWW biegnie, aby dokończyć porządkowanie. Po prostu ponowne uruchomienie serwera i harmonogramu były niewystarczające. –

7

Właśnie napisałem skrypt, który usuwa wszystko związane z konkretnym dagiem, ale jest to tylko dla MySQL. Możesz napisać inną metodę łączenia, jeśli używasz PostgreSQL. Pierwotnie polecenia publikowane przez Lance na https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 Po prostu umieszczam to w skrypcie. Mam nadzieję że to pomoże. Format: python script.py dag_id

import sys 
import MySQLdb 

dag_input = sys.argv[1] 

query = {'delete from xcom where dag_id = "' + dag_input + '"', 
     'delete from task_instance where dag_id = "' + dag_input + '"', 
     'delete from sla_miss where dag_id = "' + dag_input + '"', 
     'delete from log where dag_id = "' + dag_input + '"', 
     'delete from job where dag_id = "' + dag_input + '"', 
     'delete from dag_run where dag_id = "' + dag_input + '"', 
     'delete from dag where dag_id = "' + dag_input + '"' } 

def connect(query): 
     db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database") 
     cur = db.cursor() 
     cur.execute(query) 
     db.commit() 
     db.close() 
     return 

for value in query: 
     print value 
     connect(value) 
10

Nie wiem, dlaczego Apache Airflow nie mają oczywisty i łatwy sposób, aby usunąć DAG

Zapisano https://issues.apache.org/jira/browse/AIRFLOW-1002

+2

PR dla tego jest otwarty, ale nie został jeszcze scalony. Link dla zainteresowanych - https://github.com/apache/incubator-airflow/pull/2199. –

14

To mój dostosowany kod z użyciem PostgresHook domyślny identyfikator połączenia.

import sys 
from airflow.hooks.postgres_hook import PostgresHook 

dag_input = sys.argv[1] 
hook=PostgresHook(postgres_conn_id= "airflow_db") 

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]: 
    sql="delete from {} where dag_id='{}'".format(t, dag_input) 
    hook.run(sql, True) 
+2

Myślę, że możesz również dodać 'task_fail' i' dag_stats' do tej listy tabel – marengaz

4

Napisałem skrypt, który usuwa wszystkie metadane związane z konkretnym dagem dla domyślnej DB SQLite. Jest to oparte na odpowiedzi Jezusa powyżej, ale dostosowane z Postgres do SQLite. Użytkownicy powinni ustawić ../airflow.db wszędzie tam, gdzie script.py jest przechowywany względem domyślnego pliku airflow.db (zazwyczaj ~/airflow). Aby wykonać, użyj python script.py dag_id.

import sqlite3 
import sys 

conn = sqlite3.connect('../airflow.db') 
c = conn.cursor() 

dag_input = sys.argv[1] 

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]: 
    query = "delete from {} where dag_id='{}'".format(t, dag_input) 
    c.execute(query) 

conn.commit() 
conn.close() 
+0

to działa i jest dobrym rozwiązaniem przynajmniej do momentu połączenia PR –

1

Można wyczyścić zbiór instancji zadania, jak gdyby nigdy nie prowadził z:

airflow clear dag_id -s 2017-1-23 -e 2017-8-31 

a następnie usunąć plik z folderu dag DAG

+1

Może to spowodować, że niektóre nieczyste dane w tabelach 'dag' – Chengzhi

Powiązane problemy