2017-12-24 163 views
5

W systemie Airflow napotykam problem, który muszę przekazać job_flow_id do jednego z moich kroków emr. Jestem w stanie pobrać job_flow_id od operatora, ale kiedy mam zamiar utworzyć kroki do przesłania do klastra, wartość task_instance jest nieprawidłowa. Mam następujący kod:Przepływ powietrza - Instancja zadania w sterowniku EMR

def issue_step(name, args): 
    return [ 
     { 
      "Name": name, 
      "ActionOnFailure": "CONTINUE", 
      "HadoopJarStep": { 
       "Jar": "s3://....", 
       "Args": args 
      } 
     } 
    ] 

dag = DAG('example', 
      description='My dag', 
      schedule_interval='0 8 * * 6', 
      dagrun_timeout=timedelta(days=2)) 

try: 

    create_emr = EmrCreateJobFlowOperator(
     task_id='create_job_flow', 
     aws_conn_id='aws_default',   
     dag=dag 
    ) 

    load_data_steps = issue_step('load', ['arg1', 'arg2']) 

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id') 
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
     "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id 

    load_data = EmrAddStepsOperator(
     task_id='load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others 
     aws_conn_id='aws_default', 
     steps=load_data_steps, 
     dag=dag 
    ) 

    check_load_data = EmrStepSensor(
     task_id='watch_load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    cluster_remover = EmrTerminateJobFlowOperator(
     task_id='remove_cluster', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    create_emr_recommendations >> load_data 
    load_data >> check_load_data 
    check_load_data >> cluster_remover 

except AirflowException as ae: 
    print ae.message 

Problem polega na tym, że kiedy sprawdzić EMR, zamiast widząc --cluster-id j-1234 w kroku load_data widzę --cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}", co powoduje mój krok na niepowodzenie.

Jak mogę uzyskać rzeczywistą wartość wewnątrz funkcji krokowej?

Dzięki i wesołych świąt

+0

czy próbowałeś dodać wartość bez cudzysłowów? load_data_steps [0] ["HadoopJarStep"] ["Args"]. append ( {{task_instance.xcom_pull ('create_job_flow', key = 'return_value')}}) –

+0

gdzie otrzymam '' 'task_instance''' obiekt z? Nadal uczę się go używać. – davideberdin

Odpowiedz

3

I okazało się, że nie ma PR na repozytorium przepływu powietrza o this. Problem polega na tym, że w modelu EmrAddStepsOperator nie ma szablonów. Aby rozwiązać ten problem, zrobiłem co następuje:

  • Utworzono operator zwyczaj, który dziedziczy z EmrAddStepsOperator
  • dodanych w tym operatora jako Plugin
  • Called nowo operator w moim DAG złożyć

Tutaj kod niestandardowego operatora i wtyczki w pliku custom_emr_add_step_operator.py (patrz drzewo poniżej)

from __future__ import division, absolute_import, print_function 

from airflow.plugins_manager import AirflowPlugin 
from airflow.utils import apply_defaults 

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator 


class CustomEmrAddStepsOperator(EmrAddStepsOperator): 
    template_fields = ['job_flow_id', 'steps'] # override with steps to solve the issue above 

    @apply_defaults 
    def __init__(
      self, 
      *args, **kwargs): 
     super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs) 

    def execute(self, context): 
     super(CustomEmrAddStepsOperator, self).execute(context=context) 


# Defining the plugin class 
class CustomPlugin(AirflowPlugin): 
    name = "custom_plugin" 
    operators = [CustomEmrAddStepsOperator] 

W moim pliku DAG Zadzwoniłem do wtyczki w ten sposób

from airflow.operators import CustomEmrAddStepsOperator 

Struktura mojego projektu i wtyczek wygląda następująco:

├── config 
│   └── airflow.cfg 
├── dags 
│   ├── __init__.py 
│   └── my_dag.py 
├── plugins 
│   ├── __init__.py 
│   └── operators 
│    ├── __init__.py 
│    └── custom_emr_add_step_operator.py 
└── requirements.txt 

Jeśli używasz IDE takich jak pycharm, to będzie narzekać, ponieważ mówi, że nie może znaleźć modułu. Ale kiedy uruchomisz przepływ powietrza, ten problem nie pojawi się. Pamiętaj też, aby upewnić się, że w swoim airflow.cfg wskazujesz właściwy folder plugins, aby Airflow mógł odczytać twoją nowo utworzoną wtyczkę.

Powiązane problemy