Eksperymentuję z przepływem powietrza dla rurociągów danych. Niestety nie udało mi się jak dotąd obsłużyć tego operatora bigquery. Szukałem rozwiązania najlepiej jak potrafię, ale nadal utknąłem. Używam sekwencyjnego executora działającego lokalnie.konfiguracja przepływu powietrza z operatorem bigquery
Oto mój kod:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(dag_id='bigQueryPipeline', default_args=default_args, schedule_interval=timedelta(1))
t1 = BigQueryOperator(
task_id='bigquery_test',
bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
destination_dataset_table=False,
bigquery_conn_id='bigquery_default',
delegate_to=False,
udf_config=False,
dag=dag,
)`
Komunikat o błędzie:
[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
Traceback (most recent call last):
File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
args.func(args)
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
ti.run(force=True, ignore_dependencies=True, test_mode=True)
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
result = task_copy.execute(context=context)
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
conn = hook.get_conn()
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
project = connection_extras['project']
Jean-Christophe Rodrigue, wymyśliłeś rozwiązanie? Utknąłem z tą samą wiadomością. Nie wiem, co to jest bigquery_conn_id, ponieważ * bigquery_default * nie działa dla mnie. – BGA