2015-01-12 14 views
7

Muszę przechowywać około 250 wartości liczbowych na sekundę na klienta, czyli około 900 000 numerów na godzinę. Prawdopodobnie nie będzie to całodniowe nagranie (prawdopodobnie od 5 do 10 godzin dziennie), ale podzielę dane na podstawie identyfikatora klienta i dnia dokonania odczytu. Maksymalna długość rzędu wynosi około 22-23 M, co jest nadal możliwe do zarządzania. Neverteless, mój program wygląda następująco:Klaster Cassandra ze słabą wydajnością płytki i stabilnością płytki

CREATE TABLE measurement (
    clientid text, 
    date text, 
    event_time timestamp, 
    value int, 
    PRIMARY KEY ((clientid,date), event_time) 
); 

KEYSPACE ma współczynnik replikacji 2, tylko do testowania, znicz jest GossipingPropertyFileSnitch i NetworkTopologyStrategy. Wiem, że współczynnik replikacji 3 to wyższy standard produkcji.

Następnie stworzyłem niewielki klaster na serwerach firmowych, trzy wirtualne wirtualne maszyny z 2 procesorami x 2 rdzeniami i 16 GB pamięci RAM i dużą ilością miejsca. Jestem z nimi w gigabitowej sieci LAN. Klaster działa, bazując na nodetool.

Oto kod używam przetestować konfigurację:

 Cluster cluster = Cluster.builder() 
       .addContactPoint("192.168.1.100") 
       .addContactPoint("192.168.1.102") 
       .build(); 
     Session session = cluster.connect(); 
     DateTime time = DateTime.now(); 
     BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue(50, true); 

    try { 

     ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts 

     String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)"; 
     PreparedStatement preparedStatement = session.prepare(insertQuery); 
     BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also 

     //generating the entries 
     for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements 
      time = time.plus(4); //4ms between each entry 
      BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important 
      batch.add(bound); 

      //The batch statement must have 65535 statements at most 
      if (batch.size() >= 65534) { 
       queryQueue.put(batch); 
       batch = new BatchStatement(); 
      } 
     } 
     queryQueue.put(batch); //the last batch, perhaps shorter than 65535 

     //storing the data 
     System.out.println("Starting storing"); 
     while (!queryQueue.isEmpty()) { 
      pool.execute(() -> { 
       try { 

        long threadId = Thread.currentThread().getId(); 
        System.out.println("Started: " + threadId); 
        BatchStatement statement = queryQueue.take(); 
        long start2 = System.currentTimeMillis(); 
        session.execute(statement); 
        System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2)); 
       } catch (Exception ex) { 
        System.out.println(ex.toString()); 
       } 
      }); 

     } 
     pool.shutdown(); 
     pool.awaitTermination(120,TimeUnit.SECONDS); 


    } catch (Exception ex) { 
     System.out.println(ex.toString()); 
    } finally { 
     session.close(); 
     cluster.close(); 
    } 

wymyśliłem kodu czytając posty tutaj i na innych blogów i stron internetowych. Jak zrozumiałem, ważne jest, aby klient używał wielu wątków, dlatego to zrobiłem. Próbowałem również używać operacji asynchronicznych.

Wynik końcowy jest taki, bez względu na to, z którego podejścia korzystam, jedna partia wykonuje się w ciągu 5-6 sekund, chociaż może to zająć nawet 10. Bierze to samo, jeśli wprowadzę tylko jedną partię (czyli tylko ~ 65 tys. Kolumn) lub jeśli używam głupiego pojedynczego wątku. Szczerze mówiąc, spodziewałem się trochę więcej. Zwłaszcza, że ​​uzyskuję mniej lub bardziej podobną wydajność na moim laptopie z lokalną instancją.

Drugi, może ważniejszy problem, to wyjątki, z którymi mam do czynienia w nieprzewidywalny sposób. Są dwa:

com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra Timeout podczas kwerendy pisać w konsystencji jeden (1 replika musieli ale tylko 0 uznał zapis)

i

com.datastax.driver.core.exceptions.NoHostAvailableException: Wszystkie gospodarza (y) próbuje dla zapytania nie powiodło się (próbuje: /192.168.1.102:9042 (com.datastax.dri ver.core.TransportException: [/192.168.1.102:9042] Połączenie zostało zamknięte), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Połączenie ma zamknięte) /192.168.1.101:9042 (com.datastax.driver.core.TransportException [/192.168.1.101:9042] połączenie zostało zamknięte))

w dolnym wierszu, jestem robienie czegoś złego? Czy powinienem zreorganizować sposób ładowania danych lub zmienić schemat. Próbowałem zmniejszyć długość wiersza (więc mam 12-godzinne wiersze), ale to nie miało dużej różnicy.

============================== Aktualizacja:

byłem niegrzeczny i zapomniał wkleić przykładowe kodu, którego użyłem po udzieleniu odpowiedzi na pytanie. Działa całkiem dobrze, jednak kontynuuję moje badania z KairosDB i binarnym transferem z Astyanax.Wygląda na to, że mogę uzyskać znacznie lepszą wydajność z nimi w porównaniu z CQL, chociaż KairosDB może mieć pewne problemy, gdy jest w przeciążeniu (ale pracuję nad nim) i Astyanax jest trochę gadatliwy do wykorzystania na mój gust. Mimo to, oto kod, może gdzieś się mylę.

Numer szczeliny semafora nie ma wpływu na wydajność przy przechodzeniu powyżej 5000, czyli prawie na stałym poziomie.

String insertQuery = "insert into keyspace.measurement  (userid,time_by_hour,time,value) values (?, ?, ?, ?)"; 
     PreparedStatement preparedStatement =  session.prepare(insertQuery); 
     Semaphore semaphore = new Semaphore(15000); 

    System.out.println("Starting " + Thread.currentThread().getId()); 
    DateTime time = DateTime.parse("2015-01-05T12:00:00"); 
    //generating the entries 
    long start = System.currentTimeMillis(); 

    for (int i = 0; i < 900000; i++) { 

     BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important 
     semaphore.acquire(); 
     ResultSetFuture resultSetFuture = session.executeAsync(statement); 
     Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() { 
      @Override 
      public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) { 

       semaphore.release(); 
      } 

      @Override 
      public void onFailure(Throwable throwable) { 
       System.out.println("Error: " + throwable.toString()); 
       semaphore.release(); 
      } 
     }); 
     time = time.plus(4); //4ms between each entry 
    } 

Odpowiedz

4

Jakie są Twoje wyniki przy użyciu niezalogowanego dozowania? Czy na pewno chcesz w ogóle używać instrukcji wsadowych? https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e

+0

Nie drastycznie różne. Jestem prawie pewien, że chcę użyć partii, ponieważ pracowałem już nad podobnymi rzeczami w innych projektach, a zdania jeden po drugim były wolniejsze. W każdym razie nie ma sensu, aby była szybsza. –

+1

Spod ma rację. Partie w Cassandrze nie są optymalizacją wydajności. Zalogowane partie powinny być używane tylko wtedy, gdy wymagana jest atomowość i istnieje kara za wydajność w celu osiągnięcia zapisu atomowego. Nawet niezalogowane partie są często wolniejsze niż proste asynchroniczne zapytania, w istocie wymuszają niepotrzebną koordynację (chyba, że ​​kluczujesz grupowo i używasz tokena - może jesteś tutaj). Zwykle zalecam proste zapisywanie asynchroniczne. Oto kolejny artykuł na poparcie tego poglądu: http: //lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/ – phact

+1

Odnośnie twoich limitów czasu, stanie się to, gdy zaczniesz przytłaczają twoje węzły c * zbyt wieloma zapisami. Łatwo to zrobić z asynchronicznymi zapytaniami, ponieważ twój program generuje zapis tak szybko, jak tylko może nieprzerwanie. Po usunięciu twoich partii (w szczególności logowania) powinieneś zauważyć poprawę, ale być może będziesz musiał zmniejszyć przepustowość swoich zapisów, a nawet zwiększyć czasy oczekiwania, jeśli zezwala na to Twoja umowa SLA. – phact

Powiązane problemy