2013-06-04 12 views
5

Chcę użyć elastycznego interfejsu API massicsearch przy użyciu java i zastanawiając się, jak ustawić rozmiar partii.elasticsearch java wielkość wsadu zbiorczego

Obecnie używam go jako:

BulkRequestBuilder bulkRequest = getClient().prepareBulk(); 
while(hasMore) { 
    bulkRequest.add(getClient().prepareIndex(indexName, indexType, artist.getDocId()).setSource(json)); 
    hasMore = checkHasMore(); 
} 
BulkResponse bResp = bulkRequest.execute().actionGet(); 
//To check failures 
log.info("Has failures? {}", bResp.hasFailures()); 

jakiś pomysł jak mogę ustawić rozmiar luzem/batch?

+1

Proszę zaznaczyć odpowiedź prawidłowe ..... –

Odpowiedz

21

To zależy głównie od wielkości dokumentów, dostępnych zasobów na kliencie i typu klienta (klienta transportu lub klienta węzła).

Klient węzła jest świadomy odłamków w klastrze i wysyła dokumenty bezpośrednio do węzłów, w których znajdują się odłamki, w których powinny być indeksowane. Z drugiej strony klient transportu jest normalnym klientem, który przesyła swoje żądania do listy węzłów w sposób cykliczny. Żądanie masowe zostanie wysłane do jednego węzła, który stałby się twoją bramą podczas indeksowania.

Ponieważ używasz interfejsu Java API, sugeruję, abyś rzucił okiem na BulkProcessor, co znacznie ułatwia i elastycznie indeksuje. Można zdefiniować maksymalną liczbę akcji, maksymalny rozmiar i maksymalny odstęp czasu od ostatniego wykonania masowego. W razie potrzeby zrobi to automatycznie automatycznie. Możesz także ustawić maksymalną liczbę równoczesnych żądań zbiorczych.

Po utworzeniu BulkProcessor takiego:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { 
    @Override 
    public void beforeBulk(long executionId, BulkRequest request) { 
     logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions()); 
    } 

    @Override 
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 
     logger.info("Executed bulk composed of {} actions", request.numberOfActions()); 
    } 

    @Override 
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 
     logger.warn("Error executing bulk", failure); 
    } 
    }).setBulkActions(bulkSize).setConcurrentRequests(maxConcurrentBulk).build(); 

Po prostu trzeba dodać swoje wnioski do niego:

bulkProcessor.add(indexRequest); 

i zamknąć go na końcu, aby wypłukać wszelkie ewentualne wnioski, które mogą mieć jeszcze nie zostały wykonane:

bulkProcessor.close(); 

Aby w końcu odpowiedzieć na pytanie: e fajną rzeczą w przypadku BulkProcessor jest również to, że ma rozsądne wartości domyślne: 5 MB rozmiaru, 1000 akcji, 1 równoczesne żądanie, brak czasu płukania (co może być przydatne do ustawienia).

0

Musisz zliczyć swój twórca zgłoszeń zbiorczych, gdy osiągnie limit wielkości partii, a następnie zindeksuj je i wypłucz starsze wersje zbiorcze. tutaj jest przykład kodu

Settings settings = ImmutableSettings.settingsBuilder() 
    .put("cluster.name", "MyClusterName").build(); 

TransportClient client = new TransportClient(settings); 
String hostname = "myhost ip"; 
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port)); 

BulkRequestBuilder bulkBuilder = client.prepareBulk(); 
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path")))); 
long bulkBuilderLength = 0; 
String readLine = ""; 
String index = "my_index_name"; 
String type = "my_type_name"; 
String id = ""; 

while((readLine = br.readLine()) != null){ 
    id = somefunction(readLine); 
    String json = new ObjectMapper().writeValueAsString(readLine); 
    bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json)); 
    bulkBuilderLength++; 
    if(bulkBuilderLength % 1000== 0){ 
     logger.info("##### " + bulkBuilderLength + " data indexed."); 
     BulkResponse bulkRes = bulkBuilder.execute().actionGet(); 
     if(bulkRes.hasFailures()){ 
     logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); 
     } 
     bulkBuilder = client.prepareBulk(); 
    } 
} 

br.close(); 

if(bulkBuilder.numberOfActions() > 0){ 
    logger.info("##### " + bulkBuilderLength + " data indexed."); 
    BulkResponse bulkRes = bulkBuilder.execute().actionGet(); 
    if(bulkRes.hasFailures()){ 
     logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); 
    } 
    bulkBuilder = client.prepareBulk(); 
} 

nadzieję, że to pomaga dzięki

Powiązane problemy