2013-10-05 11 views
6

Planuję użyć sterownika Java Datastax do pisania do Cassandry .. Byłem głównie zainteresowany Batch Writes i Asycnhronous funkcji sterownika Datastax java, ale nie jestem w stanie uzyskać żadnych tutoriale, które może mi wyjaśnić, jak włączyć te funkcje w moim poniżej kod, który wykorzystuje sterownik Datastax Java ..Jak używać funkcji Asynchronous/Batch pisze z Datastax Java driver

/** 
* Performs an upsert of the specified attributes for the specified id. 
*/ 
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) { 

    try { 

     // make a sql here using the above input parameters. 

     String sql = sqlPart1.toString()+sqlPart2.toString(); 

     DatastaxConnection.getInstance(); 
     PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql); 
     prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);   

     BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()])); 

     DatastaxConnection.getSession().execute(query); 

    } catch (InvalidQueryException e) { 
     LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e); 
    } catch (Exception e) { 
     LOG.error("Exception in DatastaxClient::upsertAttributes "+e); 
    } 
} 

w poniższym kodzie, tworzę połączenie do Cassandry węzły przy użyciu sterownika Datastax Java.

/** 
* Creating Cassandra connection using Datastax Java driver 
* 
*/ 
private DatastaxConnection() { 

    try{ 
     builder = Cluster.builder(); 
     builder.addContactPoint("some_nodes"); 

     builder.poolingOptions().setCoreConnectionsPerHost(
       HostDistance.LOCAL, 
       builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL)); 

     cluster = builder 
       .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) 
       .withReconnectionPolicy(new ConstantReconnectionPolicy(100L)) 
       .build(); 

     StringBuilder s = new StringBuilder(); 
     Set<Host> allHosts = cluster.getMetadata().getAllHosts(); 
     for (Host h : allHosts) { 
      s.append("["); 
      s.append(h.getDatacenter()); 
      s.append(h.getRack()); 
      s.append(h.getAddress()); 
      s.append("]"); 
     } 
     System.out.println("Cassandra Cluster: " + s.toString()); 

     session = cluster.connect("testdatastaxks"); 

    } catch (NoHostAvailableException e) { 
     e.printStackTrace(); 
     throw new RuntimeException(e); 
    } catch (Exception e) { 

    } 
} 

Czy ktoś może mi pomóc w jaki sposób dodać zapisy partii lub asynchroniczne funkcje do mojego powyższego kodu .. dzięki za pomoc ..

Używam Cassandrę 1.2.9

Odpowiedz

8

Dla Asynch to tak proste, jak przy użyciu executeAsync funkcję:

... 
DatastaxConnection.getSession().executeAsync(query); 

dla partii, trzeba zbudować kwerendę (używam strun ponieważ kompilator wie, w jaki sposób zoptymalizować łańcuch Conca tenation naprawdę dobrze):

String cql = "BEGIN BATCH " 
     cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); "; 
     cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); "; 
     cql += "APPLY BATCH; " 

DatastaxConnection.getInstance(); 
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql); 
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);   

// this is where you need to be careful 
// bind expects a comma separated list of values for all the params (?) above 
// so for the above batch we need to supply 4 params:      
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2"); 

DatastaxConnection.getSession().execute(query); 

Na marginesie, myślę, że wiązanie rachunku może wyglądać mniej więcej tak, zakładając, że zmiana atrybutów do listy map gdzie każda mapa reprezentuje aktualizacji/insert wewnątrz partii :

BoundStatement query = prepStatement.bind(userId, 
              attributesList.get(0).values().toArray(new Object[attributes.size()]), 
              userId_2, 
              attributesList.get(1).values().toArray(new Object[attributes.size()])); 
+0

Czy jest jakiś sposób, aby to zrobić z nazwanymi parametrami? – Highstead

+1

@Highstead Jaki język programowania? Powyższe jest java więc ([rodzaj nie] (http://java.dzone.com/articles/named-parameters-java)) –

+0

Skoncentrowałem się na python, ale założyłem, że istnieje sposób, aby zrobić to w jednym w drugim byłby sposób na zrobienie tego. Stary sterownik cql obsługuje go, ale został wycofany. Tak więc chciałem zastąpić funkcjonalność. – Highstead

5

dla przykładu przewidzianego w odpowiedzi Lyuben, wyznaczając pewne cechy partii jak Type.COUNTER (jeśli trzeba zaktualizować liczniki) za pomocą strun nie będzie działać. Zamiast tego możesz uporządkować przygotowane instrukcje wsadowo w następujący sposób:

final String insertQuery = "INSERT INTO test.prepared (id, col_1) VALUES (?,?);"; 
final PreparedStatement prepared = session.prepare(insertQuery); 

final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED); 
batch.add(prepared.bind(userId1, "something")); 
batch.add(prepared.bind(userId2, "another")); 
batch.add(prepared.bind(userId3, "thing")); 

session.executeAsync(batch); 
+1

Podoba mi się to lepiej niż zaakceptowana odpowiedź. Tutaj zawartość partii może być dynamiczna (w przeciwieństwie do ustalonej CQL i liczby argumentów w zaakceptowanej odpowiedzi) – 0cd

Powiązane problemy