Korzystam z sterownika datastax java 3.1.0, aby połączyć się z kelnera i moją wersją kelnerki jest 2.0.10. Piszę asynchronicznie z konsekwencją QUORUM.Jak przepustnica zapisuje żądanie do kassandra podczas pracy z "executeAsync"?
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
Moja powyższa metoda zapisu zostanie wywołana z wielu wątków z bardzo dużą szybkością.
Pytanie:
chcę udusić wniosek do executeAsync
metody, która pisze asynchronicznie do Cassandry. Jeśli piszę z dużą prędkością, niż może obsłużyć mój klaster Cassandra, to zacznie wyrzucać błędy i chcę, aby wszystkie moje zapisy trafiały z powodzeniem do kassandra bez żadnych strat.
Widziałem to post, gdzie rozwiązaniem jest użycie Semaphore
ze stałą liczbą zezwoleń. Ale nie jestem pewien jak i jak najlepiej to zrealizować. Nigdy wcześniej nie używałem Semaphora. To jest logika. Czy ktoś może podać przykład z podstawą Semafor na moim kodzie lub jeśli istnieje lepszy sposób/opcja, to daj mi znać również.
W kontekście pisania programu DataLoader, można zrobić coś jak następuje:
- zachować rzeczy proste wykorzystanie semafora lub inny konstrukt o stałej liczbie pozwoleń (które będą być twoją maksymalną liczbą zapytań o numer ). Ilekroć przejdziesz do wysłania zapytania przy użyciu executeAsync, uzyskać pozwolenie. Powinieneś naprawdę potrzebować tylko jednego wątku (ale może chcesz, aby wprowadził pulę rozmiaru rdzeni # cpu, która to robi), która nabywa zezwoleń z Semaforu i wykonuje kwerendy. Będzie po prostu blokować , dopóki nie będzie dostępnego pozwolenia.
- Użyj Futures.addCallback dla przyszłości zwróconej z executeAsync. Callback powinien wywoływać Sempahore.release() w obu przypadkach onSuccess i onFailure. Zwolnienie zezwolenia powinno zezwalać na wątek w kroku 1, aby kontynuować i przesłać kolejne żądanie.
także Widziałem kilka innych post gdzie rozmawialiśmy o użyciu RingBuffer
lub Guava RateLimitter
więc który z nich jest lepszy i powinien być używany? Poniżej znajdują się opcje można myślę:
- Korzystanie Semaphore
- za pomocą pierścienia Buffer
- Korzystanie Guava Oceń Limiter
Czy ktoś może mi pomóc z przykładem jak możemy udusić żądanie lub uzyskać ciśnienie wsteczne dla Kasandra i upewnić się, że wszystkie zapisy trafiają z powodzeniem do kassandra?
sądzisz można podać mi przykład dla kolejki lub bufor przykład mi dałeś? Myślę, że to mi najbardziej odpowiada w moim scenariuszu. – john