2016-06-14 10 views
5

Podczas próby użycia funkcji szablonów przepływu powietrza (przez Jinja2) za pomocą narzędzia PostgresOperator, nie udało mi się uzyskać renderowania. Jest całkiem możliwe, że robię coś nie tak, ale jestem dość zagubiony, co może być problem. Oto przykład, aby odtworzyć błąd TemplateNotFound Byłem dostaniem:TemplateNotFound przy użyciu PostgresOperatora Airflow z szablonami Jinja i SQL

airflow.cfg

airflow_home = /home/gregreda/airflow 
dags_folder = /home/gregreda/airflow/dags 

istotne DAG i zmienne

default_args = { 
    'owner': 'gregreda', 
    'start_date': datetime(2016, 6, 1), 
    'schedule_interval': None, 
    'depends_on_past': False, 
    'retries': 3, 
    'retry_delay': timedelta(minutes=5) 
} 

this_dag_path = '/home/gregreda/airflow/dags/example_csv_to_redshift' 
dag = DAG(
    dag_id='example_csv_to_redshift', 
    schedule_interval=None, 
    default_args=default_args 
) 

/example_csv_to_redshift/csv_to_redshift.py

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table', 
    sql=this_dag_path + '/copy_to_redshift.sql', 
    params=dict(
     AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'), 
     AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY') 
    ), 
    postgres_conn_id='postgres_redshift', 
    autocommit=False, 
    dag=dag 
) 

/example_csv_to_redshift/copy_to_redshift.sql

COPY public.table_foobar FROM 's3://mybucket/test-data/import/foobar.csv' 
CREDENTIALS 'aws_access_key_id={{ AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ AWS_SECRET_ACCESS_KEY }}' 
CSV 
NULL as 'null' 
IGNOREHEADER as 1; 

Wywołanie airflow render example_csv_to_redshift load_table 2016-06-14 zgłasza wyjątek poniżej. Uwaga Zauważam, że korzystam z tego problemu także dla innych DAG, dlatego widzisz wspomnianą ścieżkę z example_redshift_query_to_csv.

[2016-06-14 21:24:57,484] {__init__.py:36} INFO - Using executor SequentialExecutor 
[2016-06-14 21:24:57,565] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt 
[2016-06-14 21:24:57,596] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt 
[2016-06-14 21:24:57,763] {models.py:154} INFO - Filling up the DagBag from /home/gregreda/airflow/dags 
[2016-06-14 21:24:57,828] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files 
    setattr(self, attr, env.loader.get_source(env, content)[0]) 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source 
    raise TemplateNotFound(template) 
TemplateNotFound: /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql 
[2016-06-14 21:24:57,834] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files 
    setattr(self, attr, env.loader.get_source(env, content)[0]) 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source 
    raise TemplateNotFound(template) 
TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql 
Traceback (most recent call last): 
    File "/usr/local/bin/airflow", line 15, in <module> 
    args.func(args) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 359, in render 
    ti.render_templates() 
    File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1409, in render_templates 
    rendered_content = rt(attr, content, jinja_context) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2017, in render_template 
    return jinja_env.get_template(content).render(**context) 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 812, in get_template 
    return self._load_template(name, self.make_globals(globals)) 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 774, in _load_template 
    cache_key = self.loader.get_source(self, name)[1] 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source 
    raise TemplateNotFound(template) 
jinja2.exceptions.TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql 

Wszelkie pomysły na naprawę są doceniane.

Odpowiedz

3

Standard PEBCAK error.

Wystąpił problem z podaniem ścieżki do szablonu SQL w zadanym zadaniu przepływu powietrza, które musi być względne.

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table', 
    sql='/copy_to_redshift.sql', 
    params=dict(
     AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'), 
     AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY') 
    ), 
    postgres_conn_id='postgres_redshift', 
    autocommit=False, 
    dag=dag 
) 

Dodatkowo szablon SQL musiała zostać nieco zmienione (uwaga params. ... ten czas):

COPY public.pitches FROM 's3://mybucket/test-data/import/heyward.csv' 
CREDENTIALS 'aws_access_key_id={{ params.AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ params.AWS_SECRET_ACCESS_KEY }}' 
CSV 
NULL as 'null' 
IGNOREHEADER as 1; 
+1

Udało mi się nauczyć, jak korzystać z Variables, PostgresOperator i referencyjnych plików .sql z tego. Dzięki! – trench

0

Dla nieco większą kontrolę, init swoją DAG z template_searchpath param, a potem po prostu użyć nazwa pliku w operatorze.

:param template_searchpath: This list of folders (non relative) 
    defines where jinja will look for your templates. Order matters. 
    Note that jinja/airflow includes the path of your DAG file by 
    default 
:type template_searchpath: string or list of stings