2015-02-26 14 views
10

Wdrażam mapę IBackingMap dla mojej topologii Trident do przechowywania krotek do ElasticSearch (wiem, że istnieje kilka implementacji integracji Trident/ElasticSearch już istniejących w GitHub, ale zdecydowałem się na wdrożenie niestandardowego, który lepiej odpowiada mojemu zadaniu).Jak zamknąć połączenie z bazą danych otwarte przez implementację IBackingMap w ramach topologii Storm Trident?

Więc moja realizacja jest klasycznym jeden z fabryki:

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> { 

    // omitting here some other cool stuff... 
    private final Client client; 

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) { 

     return new StateFactory() { 

      @Override 
      public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { 

       ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName); 
       CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE); 
       MapState ms = OpaqueMap.build(cm); 
       return new SnapshottableMap(ms, new Values(GLOBAL_KEY)); 
      } 
     }; 
    } 

    public ElasticSearchBackingMap(String host, int port, String clusterName) { 

     Settings settings = ImmutableSettings.settingsBuilder() 
       .put("cluster.name", clusterName).build(); 

     // TODO add a possibility to close the client 
     client = new TransportClient(settings) 
       .addTransportAddress(new InetSocketTransportAddress(host, port)); 
    } 

    // the actual implementation is left out 
} 

Widzisz robi host/port/nazwę klastra jako params wejściowych i tworzy klienta ElasticSearch jako członek klasy, ale nigdy ZAMYKA KLIENT.

Następnie jest on używany od wewnątrz topologii w dość znany sposób:

tridentTopology.newStream("spout", spout) 
      // ...some processing steps here... 
      .groupBy(aggregationFields) 
      .persistentAggregate(
        ElasticSearchBackingMap.getFactoryFor(
          ElasticSearchConfig.ES_HOST, 
          ElasticSearchConfig.ES_PORT, 
          ElasticSearchConfig.ES_CLUSTER_NAME 
        ), 
        new Fields(FieldNames.OUTCOME), 
        new BatchAggregator(), 
        new Fields(FieldNames.AGGREGATED)); 

Ta topologia jest owinięta w niektórych public static void main, pakowany w słoik i wysłany do burzowe do wykonania.

Pytanie, czy powinienem się martwić zamknięciem połączenia ElasticSearch, czy też jest to własna firma Storm? Jeśli nie zrobi tego Storm, jak i kiedy w cyklu życia topologii powinienem to zrobić?

Z góry dziękuję!

+0

TransportClient powinien być singleton dla każdego pracownika burzowego. [lista użytkowników] (http://elasticsearch-users.115913.n3.nabble.com/What-is-your-est-praktice-to-access-a-cluster-by-a-Java-client-td4015311. html). Właściwie myślę, że nie musisz zamykać klienta java, ponieważ topologia burzy nigdy nie powinna się zatrzymać. – fhussonnois

+1

Hak może być: utworzyć singleton dla każdego pracownika, np. podczas tworzenia pierwszego stanu i zamykania tego singletonu w metodzie oczyszczania agregatora - widzę w swoim kodzie "BatchAggregator". Ale chciałbym również zobaczyć lepsze rozwiązanie ... – dedek

+0

Zobacz także tę prośbę o funkcję: https://issues.apache.org/jira/browse/STORM-49 – dedek

Odpowiedz

3

OK, odpowiadając na moje własne pytanie.

Po pierwsze, jeszcze raz dzięki @dedek za sugestie i odnowienie biletu w Jira Storma.

Wreszcie, ponieważ nie ma oficjalnego sposobu, aby to zrobić, zdecydowałem się przejść do metody oczyszczania() filtra Trident. Tak dalece zostały zweryfikowane następujące (dla Burzy v 0.9.4.):

Z LocalCluster

  • porządki() jest wywoływana podczas zamykania klastra
  • porządki() nie dostać wywoływana po zabiciu topologii, to nie powinno być tragedia, najprawdopodobniej nie użyje LocalCluster dla prawdziwych wdrożeń w każdym razie

z prawdziwą klastra

  • to jest wywoływana gdy topologia zostaje zabity, a także wtedy, gdy pracownik został zatrzymany przy użyciu pkill -TERM -u burzę -f „backtype.storm.daemon.worker”
  • nie dostać nazywany jeżeli pracownik zostaje zabity z kill -9 lub kiedy się zawiesi lub - niestety - gdy pracownik umiera z powodu wyjątku

ogólnej, która daje mniej lub bardziej przyzwoity gwarancję czyszczenia(), aby uzyskać nazywane , pod warunkiem, że będziesz ostrożny przy obsłudze wyjątków (staram się dodawać "thundercatche" do każdego z moich Tridentów prymitywy i tak).

Mój kod:

public class CloseFilter implements Filter { 

    private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class); 

    private final Closeable[] closeables; 

    public CloseFilter(Closeable... closeables) { 
     this.closeables = closeables; 
    } 

    @Override 
    public boolean isKeep(TridentTuple tuple) { 
     return true; 
    } 

    @Override 
    public void prepare(Map conf, TridentOperationContext context) { 

    } 

    @Override 
    public void cleanup() { 
     for (Closeable c : closeables) { 
      try { 
       c.close(); 
      } catch (Exception e) { 
       LOG.warn("Failed to close an instance of {}", c.getClass(), e); 
      } 
     } 
    } 
} 

Jednak byłoby miło, gdyby niektóre haki dzień dla połączeń zamykających stać się częścią API.

Powiązane problemy