Wdrażam mapę IBackingMap dla mojej topologii Trident do przechowywania krotek do ElasticSearch (wiem, że istnieje kilka implementacji integracji Trident/ElasticSearch już istniejących w GitHub, ale zdecydowałem się na wdrożenie niestandardowego, który lepiej odpowiada mojemu zadaniu).Jak zamknąć połączenie z bazą danych otwarte przez implementację IBackingMap w ramach topologii Storm Trident?
Więc moja realizacja jest klasycznym jeden z fabryki:
public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {
// omitting here some other cool stuff...
private final Client client;
public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {
return new StateFactory() {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
MapState ms = OpaqueMap.build(cm);
return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
}
};
}
public ElasticSearchBackingMap(String host, int port, String clusterName) {
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName).build();
// TODO add a possibility to close the client
client = new TransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(host, port));
}
// the actual implementation is left out
}
Widzisz robi host/port/nazwę klastra jako params wejściowych i tworzy klienta ElasticSearch jako członek klasy, ale nigdy ZAMYKA KLIENT.
Następnie jest on używany od wewnątrz topologii w dość znany sposób:
tridentTopology.newStream("spout", spout)
// ...some processing steps here...
.groupBy(aggregationFields)
.persistentAggregate(
ElasticSearchBackingMap.getFactoryFor(
ElasticSearchConfig.ES_HOST,
ElasticSearchConfig.ES_PORT,
ElasticSearchConfig.ES_CLUSTER_NAME
),
new Fields(FieldNames.OUTCOME),
new BatchAggregator(),
new Fields(FieldNames.AGGREGATED));
Ta topologia jest owinięta w niektórych public static void main, pakowany w słoik i wysłany do burzowe do wykonania.
Pytanie, czy powinienem się martwić zamknięciem połączenia ElasticSearch, czy też jest to własna firma Storm? Jeśli nie zrobi tego Storm, jak i kiedy w cyklu życia topologii powinienem to zrobić?
Z góry dziękuję!
TransportClient powinien być singleton dla każdego pracownika burzowego. [lista użytkowników] (http://elasticsearch-users.115913.n3.nabble.com/What-is-your-est-praktice-to-access-a-cluster-by-a-Java-client-td4015311. html). Właściwie myślę, że nie musisz zamykać klienta java, ponieważ topologia burzy nigdy nie powinna się zatrzymać. – fhussonnois
Hak może być: utworzyć singleton dla każdego pracownika, np. podczas tworzenia pierwszego stanu i zamykania tego singletonu w metodzie oczyszczania agregatora - widzę w swoim kodzie "BatchAggregator". Ale chciałbym również zobaczyć lepsze rozwiązanie ... – dedek
Zobacz także tę prośbę o funkcję: https://issues.apache.org/jira/browse/STORM-49 – dedek