2012-05-25 11 views
11

Ostatnio pracowałem z Amazon Web Services (AWS) i zauważyłem, że nie ma zbyt wiele dokumentacji na ten temat, więc dodałem swoje rozwiązanie.Jak mogę poczekać na zakończenie elastycznego przepływu pracy MapReduce w aplikacji Java?

Piszę aplikację przy użyciu Amazon Elastic MapReduce (Amazon EMR). Po zakończeniu obliczeń musiałem wykonać pewne prace nad utworzonymi przez nie plikami, więc musiałem wiedzieć, kiedy przepływ pracy zakończył pracę.

W ten sposób można sprawdzić, czy przepływ zadanie zakończone:

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials); 

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest() 
    .withJobFlowStates("COMPLETED"); 

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows(); 
JobFlowDetail detail = jobs.get(0); 

detail.getJobFlowId(); //the id of one of the completed jobs 

Można również poszukać konkretnego id pracy w DescribeJobFlowsRequest a następnie sprawdzić, czy to zadanie zostało zakończone z niepowodzeniem.

Mam nadzieję, że pomoże innym.

+5

Złożenie własnego rozwiązania problemu natychmiast jest bardzo mile widziane tutaj jednak pożądane podejście jest podzielić to na pytanie i odpowiedź nadal, patrz [Jest OK, aby zadawać i odpowiadać na własne pytania] (http : //blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) - pomaga to odpowiednio sortować/kategoryzować rzeczy, np. zrobić miejsce na naprawdę bez odpowiedzi pytania zastosowanie, dzięki! –

+0

Dzięki, zauważę to jako punkt odniesienia w przyszłości. – siditom

+0

Powinieneś również uwzględnić pozostałe ukończone stany. Niektórzy ludzie czytający to mogą zapętlać się na zawsze, jeśli zainicjują 'jobAttributes' jak podano. 'DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowRequest(). WithJobFlowStates (" COMPLETED "," TERMINATED "," FAILED ");' –

Odpowiedz

1

Po zakończeniu przepływu zadań klaster się zatrzymuje, a partycja HDFS zostaje utracona. , aby zapobiec utracie danych, skonfiguruj ostatni krok przepływu pracy, aby przechowywać wyniki w usłudze Amazon S3.

Jeśli JobFlowInstancesDetail: parametr KeepJobFlowAliveWhenNoSteps jest ustawiony na TRUE, przepływ pracy będzie przejście do stanu CZEKANIA zamiast zamykania raz kroki zostały zakończone.

Maksymalnie 256 kroków jest dozwolone w każdym przepływie pracy.

Jeśli Twoja praca jest czasochłonna, zalecamy okresowe przechowywanie wyników.

Krótko mówiąc: nie ma sposobu, aby wiedzieć, kiedy to się stanie. Zamiast tego należy zapisać dane w ramach pracy.

1

Użyj opcji --wait-for-steps podczas tworzenia przepływu pracy.

./elastic-mapreduce --create \ 
... 
--wait-for-steps \ 
... 
3

Wpadłem również na ten problem, a oto rozwiązanie, na które teraz wpadłem. Nie jest doskonały, ale mam nadzieję, że będzie pomocny. Dla porównania używam Java 1.7 i AWS Java SDK w wersji 1.9.13.

Zauważ, że ten kod zakłada, że ​​czekasz na klastrzewypowiedzieć, a nie kroki ściśle mówiąc; jeśli twój klaster zostanie zakończony po wykonaniu wszystkich kroków, to jest w porządku, ale jeśli używasz klastrów, które pozostają żywe po ukończeniu kroku, to nie pomoże ci zbytnio.

Należy również zauważyć, że ten kod monitoruje i rejestruje zmiany stanu klastra, a ponadto diagnozuje, czy klaster został zakończony z błędami i zgłasza wyjątek, jeśli tak się stało.

private void yourMainMethod() { 
    RunJobFlowRequest request = ...; 

    try { 
     RunJobFlowResult submission = emr.runJobFlow(request); 
     String jobFlowId = submission.getJobFlowId(); 
     log.info("Submitted EMR job as job flow id {}", jobFlowId); 

     DescribeClusterResult result = 
      waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS); 
     diagnoseClusterResult(result, jobFlowId); 
    } finally { 
     emr.shutdown(); 
    } 
} 

private DescribeClusterResult waitForCompletion(
      AmazonElasticMapReduceClient emr, String jobFlowId, 
      long sleepTime, TimeUnit timeUnit) 
     throws InterruptedException { 
    String state = "STARTING"; 
    while (true) { 
     DescribeClusterResult result = emr.describeCluster(
       new DescribeClusterRequest().withClusterId(jobFlowId) 
     ); 
     ClusterStatus status = result.getCluster().getStatus(); 
     String newState = status.getState(); 
     if (!state.equals(newState)) { 
      log.info("Cluster id {} switched from {} to {}. Reason: {}.", 
        jobFlowId, state, newState, status.getStateChangeReason()); 
      state = newState; 
     } 

     switch (state) { 
      case "TERMINATED": 
      case "TERMINATED_WITH_ERRORS": 
      case "WAITING": 
       return result; 
     } 

     timeUnit.sleep(sleepTime); 
    } 
} 

private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) { 
    ClusterStatus status = result.getCluster().getStatus(); 
    ClusterStateChangeReason reason = status.getStateChangeReason(); 
    ClusterStateChangeReasonCode code = 
     ClusterStateChangeReasonCode.fromValue(reason.getCode()); 
    switch (code) { 
    case ALL_STEPS_COMPLETED: 
     log.info("Completed EMR job {}", jobFlowId); 
     break; 
    default: 
     failEMR(jobFlowId, status); 
    } 
} 

private static void failEMR(String jobFlowId, ClusterStatus status) { 
    String msg = "EMR cluster run %s terminated with errors. ClusterStatus = %s"; 
    throw new RuntimeException(String.format(msg, jobFlowId, status)); 
} 
Powiązane problemy