10

Korzystam z sterownika datastax java 3.1.0, aby połączyć się z kelnera i moją wersją kelnerki jest 2.0.10. Piszę asynchronicznie z konsekwencją QUORUM.Jak przepustnica zapisuje żądanie do kassandra podczas pracy z "executeAsync"?

private final ExecutorService executorService = Executors.newFixedThreadPool(10); 

    public void save(String process, int clientid, long deviceid) { 
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)"; 
    try { 
     BoundStatement bs = CacheStatement.getInstance().getStatement(sql); 
     bs.setConsistencyLevel(ConsistencyLevel.QUORUM); 
     bs.setString(0, process); 
     bs.setInt(1, clientid); 
     bs.setLong(2, deviceid); 

     ResultSetFuture future = session.executeAsync(bs); 
     Futures.addCallback(future, new FutureCallback<ResultSet>() { 
     @Override 
     public void onSuccess(ResultSet result) { 
      logger.logInfo("successfully written"); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      logger.logError("error= ", t); 
     } 
     }, executorService); 
    } catch (Exception ex) { 
     logger.logError("error= ", ex); 
    } 
    } 

Moja powyższa metoda zapisu zostanie wywołana z wielu wątków z bardzo dużą szybkością.

Pytanie:

chcę udusić wniosek do executeAsync metody, która pisze asynchronicznie do Cassandry. Jeśli piszę z dużą prędkością, niż może obsłużyć mój klaster Cassandra, to zacznie wyrzucać błędy i chcę, aby wszystkie moje zapisy trafiały z powodzeniem do kassandra bez żadnych strat.

Widziałem to post, gdzie rozwiązaniem jest użycie Semaphore ze stałą liczbą zezwoleń. Ale nie jestem pewien jak i jak najlepiej to zrealizować. Nigdy wcześniej nie używałem Semaphora. To jest logika. Czy ktoś może podać przykład z podstawą Semafor na moim kodzie lub jeśli istnieje lepszy sposób/opcja, to daj mi znać również.

W kontekście pisania programu DataLoader, można zrobić coś jak następuje:

  • zachować rzeczy proste wykorzystanie semafora lub inny konstrukt o stałej liczbie pozwoleń (które będą być twoją maksymalną liczbą zapytań o numer ). Ilekroć przejdziesz do wysłania zapytania przy użyciu executeAsync, uzyskać pozwolenie. Powinieneś naprawdę potrzebować tylko jednego wątku (ale może chcesz, aby wprowadził pulę rozmiaru rdzeni # cpu, która to robi), która nabywa zezwoleń z Semaforu i wykonuje kwerendy. Będzie po prostu blokować , dopóki nie będzie dostępnego pozwolenia.
  • Użyj Futures.addCallback dla przyszłości zwróconej z executeAsync. Callback powinien wywoływać Sempahore.release() w obu przypadkach onSuccess i onFailure. Zwolnienie zezwolenia powinno zezwalać na wątek w kroku 1, aby kontynuować i przesłać kolejne żądanie.

także Widziałem kilka innych post gdzie rozmawialiśmy o użyciu RingBuffer lub Guava RateLimitter więc który z nich jest lepszy i powinien być używany? Poniżej znajdują się opcje można myślę:

  • Korzystanie Semaphore
  • za pomocą pierścienia Buffer
  • Korzystanie Guava Oceń Limiter

Czy ktoś może mi pomóc z przykładem jak możemy udusić żądanie lub uzyskać ciśnienie wsteczne dla Kasandra i upewnić się, że wszystkie zapisy trafiają z powodzeniem do kassandra?

Odpowiedz

4

Nie jest to autorytatywna odpowiedź, ale może byłoby pomocne. Najpierw powinieneś rozważyć, co zrobiłbyś, gdy zapytanie nie może być wykonane od razu. Bez względu na to, jaką stawkę limitu wybrałeś, jeśli otrzymasz prośbę o wyższą stawkę, niż możesz napisać do Cassandry, w końcu proces zostanie zatkany oczekującymi żądaniami.W tym momencie będziesz musiał powiedzieć swoim klientom, aby wstrzymali swoje prośby przez jakiś czas ("odepchnij"). Na przykład. jeśli przychodzą przez HTTP, wówczas status odpowiedzi byłby 429 "Zbyt wiele żądań". Jeśli generujesz żądania w tym samym procesie, zdecyduj, który najdłuższy limit czasu jest akceptowalny. To powiedziawszy, jeśli Cassandra nie może nadążyć, to czas na skalowanie (lub dostrajanie).

Być może przed wdrożeniem limitów stawek warto eksperymentować i dodawać sztuczne opóźnienia w wątkach przed wywołaniem metody save (przy użyciu Thread.sleep (...)) i sprawdzić, czy rozwiązuje ona Twój problem, czy coś innego jest potrzebne.

Błąd zwrotu kwerendy to przeciwciśnienie z Cassandry. Ale możesz wybrać lub wdrożyć RetryPolicy, aby określić, kiedy ponawiać próby niepowodzenia kwerend.

Możesz również obejrzeć: connection pool options (a zwłaszcza Monitoring and tuning the pool). Można ustawić numer asynchronicznego requests per connection. Jednak dokumentacja mówi, że dla Cassandry 2.x tego parametru kapsle do 128 i nie należy go zmienić (ja z nią eksperymentować choć :)

Wykonanie z Semaphore wygląda

/* Share it among all threads or associate with a thread for per-thread limits 
    Number of permits is to be tuned depending on acceptable load. 
*/ 
final Semaphore queryPermits = new Semaphore(20); 


public void save(String process, int clientid, long deviceid) { 
    .... 
    queryPermits.acquire(); // Blocks until a permit is available 

    ResultSetFuture future = session.executeAsync(bs); 
    Futures.addCallback(future, new FutureCallback<ResultSet>() { 
    @Override 
    public void onSuccess(ResultSet result) { 
     queryPermits.release(); 
     logger.logInfo("successfully written"); 
    } 
    @Override 
    public void onFailure(Throwable t) { 
     queryPermits.release(); // Permit should be released in all cases. 
     logger.logError("error= ", t); 
    } 
    }, executorService); 
    .... 
} 

(W prawdziwym kodzie Stworzyłem wywołanie zwrotne opakowania, które zwalniałoby zezwolenia, a następnie wywoływało metody opakowane)

RateLimiter Guava jest podobny do semafora, ale pozwala na tymczasowe impulsy po niepełnym wykorzystaniu i limity żądań oparte na taktowaniu (nie całkowita liczba aktywnych zapytań).

Mimo to, wnioski z różnych powodów mogą się nie powieść, więc prawdopodobnie lepiej będzie mieć plan, jak je ponowić (w przypadku sporadycznych błędów).

Może to nie być odpowiednie w twoim przypadku, ale spróbowałbym użyć kolejki lub bufora do kolejkowania żądań (np. java.util.concurrent.ArrayBlockingQueue). "Pełny bufor" oznaczałby, że klienci powinni poczekać lub zrezygnować z żądania. Bufor będzie również używany do ponownej rejestracji nieudanych żądań. Jednak bardziej sprawiedliwe żądania nieudane prawdopodobnie powinny zostać umieszczone na wierzchu kolejki, aby były najpierw ponawiane. Również należy jakoś poradzić sobie z sytuacją, gdy kolejka jest pełna i pojawiają się nowe żądania zakończone niepowodzeniem w tym samym czasie. Pracownik o jednym wątku wybierałby kolejki żądań i wysyłał je do Cassandry. Ponieważ nie powinno to wiele robić, jest mało prawdopodobne, aby stał się wąskim gardłem. Ten pracownik może również zastosować własne limity stawek, np. na podstawie czasu z com.google.common.util.concurrent.RateLimiter.

Jeśli chcesz uniknąć utraty wiadomości w jak największym stopniu, możesz umieścić brokera wiadomości z uporem (np. Kafka) przed Cassandrą. W ten sposób przychodzące wiadomości mogą przetrwać nawet długie przerwy Cassandry. Ale wydaje mi się, że w twoim przypadku to przesada.

+0

sądzisz można podać mi przykład dla kolejki lub bufor przykład mi dałeś? Myślę, że to mi najbardziej odpowiada w moim scenariuszu. – john

1

Po prostu za pomocą kolejki blokującej powinna to zrobić dobrze. Kontrakty futures są gwintowane, a ich oddzwonienie (sukces i porażka) będzie działało jako konsument, a gdziekolwiek nazwiesz metodę składowania, będziesz działał jako producent.

Jeszcze lepszy będzie sposób, wrzucisz kompletne żądanie do kolejki, i usuniesz go kolejno, po każdym usunięciu kolizji.

private final ExecutorService executorService = Executors.newFixedThreadPool(10); 

public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) { 
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)"; 
    try { 
     BoundStatement bs = CacheStatement.getInstance().getStatement(sql); 
     bs.setConsistencyLevel(ConsistencyLevel.QUORUM); 
     bs.setString(0, process); 
     bs.setInt(1, clientid); 
     bs.setLong(2, deviceid); 

     ResultSetFuture future = session.executeAsync(bs); 
     Futures.addCallback(future, new FutureCallback<ResultSet>() { 
     @Override 
     public void onSuccess(ResultSet result) { 
      logger.logInfo("successfully written"); 
      queue.take(); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      logger.logError("error= ", t); 
      queue.take(); 
     } 
     }, executorService); 
    } catch (Exception ex) { 
     logger.logError("error= ", ex); 
    } 
} 

public void invokeSaveInLoop(){ 
    Object dummyObj = new Object(); 
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);; 
    for(int i=0; i< 1000; i++){ 
     save("process", clientid, deviceid, queue); 
     queue.put(dummyObj); 
    } 
} 

Jeśli chcesz iść dalej i sprawdzić obciążenie klastra połowie drogi

public static String getCurrentState(){  
StringBuilder response = new StringBuilder(); 
      response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n"); 
      final LoadBalancingPolicy loadBalancingPolicy = 
        cluster.getConfiguration().getPolicies().getLoadBalancingPolicy(); 
      final PoolingOptions poolingOptions = 
        cluster.getConfiguration().getPoolingOptions(); 
      Session.State state = session.getState(); 
      for (Host host : state.getConnectedHosts()) { 
       HostDistance distance = loadBalancingPolicy.distance(host); 
       int connections = state.getOpenConnections(host); 
       int inFlightQueries = state.getInFlightQueries(host); 
       response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n", 
           host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries, 
           connections * 
             poolingOptions.getMaxRequestsPerConnection(distance))) 
         .append("<br>\n"); 
      } 
      return response.toString(); 
} 
Powiązane problemy