2013-03-10 10 views
5

Próbuję napisać niestandardowy zlew do flume-ng. Spojrzałem na istniejące zlewozmywaki i dokumentację i zakodowałem je. Jednak metoda 'process()', która powinna odbierać zdarzenia, zawsze kończy się wartością null. Robię wydarzenie Event = channel.take(); ale wydarzenie ma wartość zerową. W dziennikach widzę, że ta metoda jest wywoływana wielokrotnie, ponieważ zdarzenie nadal znajduje się w kanale.Zlew niestandardowy dla Flume-ng null event

Czy ktoś może wskazać mi właściwy kierunek?

Odpowiedz

5

To jest szkielet funkcja procesu ... Jeśli nie uda się wydarzeniem Państwo wycofać, zmień status na odczekania. Jeśli nie, musisz zatwierdzić i ustawić status na READY. Nieważne, zawsze zamykasz transakcję.

Status status = null; 
    Channel channel = getChannel(); 
    Transaction transaction = channel.getTransaction(); 
    transaction.begin(); 
    try { 
     Event event = channel.take(); 

     if (event != null && validEvent(event.getBody()) >= 0) { 
      # make some printing 
     } 
     transaction.commit(); 
     status = Status.READY; 
    } catch (Throwable ex) { 
     transaction.rollback(); 
     status = Status.BACKOFF; 
     logger.error("Failed to deliver event. Exception follows.", ex); 
     throw new EventDeliveryException("Failed to deliver event: " + ex); 
    } finally { 
     transaction.close(); 
    } 
    return status; 

Jestem pewien, że to zadziała :).

+0

Awesome.Dziękujemy.Jest nadal pomaga mi w 2016 roku .. – logicalgeek

+0

hej mam podobny problem tutaj: https://stackoverflow.com/questions/46479157/streaming-kafka- wiadomości-do-mysql-baza danych Czy masz jakiś pomysł na ten temat? –

4

Jest to zgodne z projektem. Biegacz zlewu będzie odpytywać umywalkę wydarzeniami null, aby mieć pewność, że zlew jest żywy i gotowy do zaakceptowania przyszłych wydarzeń. Gdy otrzymasz zdarzenie null, upewnij się, że zwróciłeś Status.BACKOFF, a procesor tonera poczeka chwilę, zanim spróbujesz ponownie.

+0

Dziwne, że [dokumentacja] (http://flume.apache.org/FlumeDeveloperGuide.html#sink) nic o tym nie mówi. – Dmitry

+0

Zgadzam się. Dokumentacja flume jest bardzo minimalna i powinna być nieco bardziej szczegółowa. – logicalgeek

+0

Jaki jest czas wstrzymania? I jak jest kontrolowany? Klasa AbstractSink nie implementuje metod takich jak źródła, np. publicznych długo getBackOffSleepIncrement() publicznych długo getMaxBackOffSleepInterval ( – bearrito