2016-08-27 17 views
7

Eksperymentuję z przepływem powietrza dla rurociągów danych. Niestety nie udało mi się jak dotąd obsłużyć tego operatora bigquery. Szukałem rozwiązania najlepiej jak potrafię, ale nadal utknąłem. Używam sekwencyjnego executora działającego lokalnie.konfiguracja przepływu powietrza z operatorem bigquery

Oto mój kod:

from airflow import DAG 
from airflow.contrib.operators.bigquery_operator import BigQueryOperator 
from datetime import datetime, timedelta 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2015, 6, 1), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
    # 'queue': 'bash_queue', 
    # 'pool': 'backfill', 
    # 'priority_weight': 10, 
    # 'end_date': datetime(2016, 1, 1), 
} 

dag = DAG(dag_id='bigQueryPipeline', default_args=default_args, schedule_interval=timedelta(1)) 

t1 = BigQueryOperator(
task_id='bigquery_test', 
bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]', 
destination_dataset_table=False, 
bigquery_conn_id='bigquery_default', 
delegate_to=False, 
udf_config=False, 
dag=dag, 
)` 

Komunikat o błędzie:

[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project' 
Traceback (most recent call last): 
    File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module> 
    args.func(args) 
    File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test 
    ti.run(force=True, ignore_dependencies=True, test_mode=True) 
    File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper 
    result = func(*args, **kwargs) 
    File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run 
    result = task_copy.execute(context=context) 
    File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute 
    conn = hook.get_conn() 
    File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn 
    project = connection_extras['project'] 
+0

Jean-Christophe Rodrigue, wymyśliłeś rozwiązanie? Utknąłem z tą samą wiadomością. Nie wiem, co to jest bigquery_conn_id, ponieważ * bigquery_default * nie działa dla mnie. – BGA

Odpowiedz

6

Zajęło mi trochę czasu, by w końcu znaleźć, ponieważ nie jest udokumentowane bardzo wyraźnie. W interfejsie przepływu powietrza przejdź do opcji Administrator -> Połączenie. Ten identyfikator połączenia jest tym, do którego odwołują się parametry bigquery_connection_id. Musisz dodać w polu "dodatki" obiekt json, który definiuje ak, v para "projektu": ""

Musisz również dodać klucze dla "service_account" i "key_path", jeśli nie masz jawnie autoryzowanego konta na pudełku, w którym korzystasz z Przepływu powietrza. (Gcloud auth)

+0

Operator BigQuery jest uszkodzony w bieżącej wersji, skonfigurowałem go z wszystkimi niezbędnymi "dodatkami" i nie można się połączyć. Powinny to mieć w następnym wydaniu, ale nie mam pojęcia, kiedy to wychodzi. – Ted

+0

Używam wersji v1.7.1.3 i działa dla mnie dobrze. Miałem problemy, ponieważ Google uaktualnił klienta oauth2, aby nie zawierał już SignedJwtAssertionCredentials, które naprawiłem, obniżając moją wersję Oauth. Nowa wersja przełącza się na użycie ServiceAccountCredentials. –

+0

Mam kilka ostrzeżeń o przestarzałości, gdy użyłem tego rozwiązania ... coś o nie używaniu '** kwargs' – Mike

1

Jeśli trzeba to zrobić programowo, używam tego jako punkt_wejścia w naszej stosu, aby utworzyć połączenie, jeśli jeszcze nie istnieje:

from airflow.models import Connection 
from airflow.settings import Session 

session = Session() 
gcp_conn = Connection(
    conn_id='bigquery', 
    conn_type='google_cloud_platform', 
    extra='{"extra__google_cloud_platform__project":"<YOUR PROJECT HERE>"}') 
if not session.query(Connection).filter(
     Connection.conn_id == gcp_conn.conn_id).first(): 
    session.add(gcp_conn) 
    session.commit() 
+1

Gdzie to by się podziało? – Mike

0

Ostatnio naprawiłem podobny problem określając zarówno bigquery_conn_id i google_cloud_storage_conn_id takiego:

t1 = BigQueryOperator(
    task_id='bigquery_test', 
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]', 
    destination_dataset_table=False, 
    bigquery_conn_id='bigquery_default',    <-- Need these both 
    google_cloud_storage_conn_id='bigquery_default', <-- becasue of inheritance 
    delegate_to=False, 
    udf_config=False, 
    dag=dag, 
) 

Zobacz więcej w tej odpowiedzi: https://stackoverflow.com/a/45664830/634627

0

i sol po prostu zmieniając parametr bigquery_conn_id='bigquery_default' na bigquery_conn_id='bigquery', po uruchomieniu kodu wklejonego tutaj przez @clifton w osobnym skrypcie pythona.

Powiązane problemy