2012-02-16 12 views
13

Próbuję zbudować moje indeksy w Lucene z wieloma wątkami. Zacząłem więc kodowanie i napisałem następujący kod. Najpierw znajduję pliki i dla każdego pliku tworzę wątek, aby go zaindeksować. Potem dołączam do wątków i optymalizuję indeksy. Działa, ale nie jestem pewien ... czy mogę mu zaufać na dużą skalę? Czy jest jakiś sposób, aby to poprawić?Poprawy indeksowania wielu wątków z lucene

import java.io.File; 
import java.io.FileFilter; 
import java.io.FileReader; 
import java.io.IOException; 
import java.io.File; 
import java.io.FileReader; 
import java.io.BufferedReader; 
import org.apache.lucene.index.IndexWriter; 
import org.apache.lucene.document.Field; 
import org.apache.lucene.document.Document; 
import org.apache.lucene.store.RAMDirectory; 
import org.apache.lucene.analysis.standard.StandardAnalyzer; 
import org.apache.lucene.analysis.StopAnalyzer; 
import org.apache.lucene.index.IndexReader; 
import org.apache.lucene.store.Directory; 
import org.apache.lucene.store.FSDirectory; 
import org.apache.lucene.util.Version; 
import org.apache.lucene.index.TermFreqVector; 

public class mIndexer extends Thread { 

    private File ifile; 
    private static IndexWriter writer; 

    public mIndexer(File f) { 
    ifile = f.getAbsoluteFile(); 
    } 

    public static void main(String args[]) throws Exception { 
    System.out.println("here..."); 

    String indexDir; 
     String dataDir; 
    if (args.length != 2) { 
     dataDir = new String("/home/omid/Ranking/docs/"); 
     indexDir = new String("/home/omid/Ranking/indexes/"); 
    } 
    else { 
     dataDir = args[0]; 
     indexDir = args[1]; 
    } 

    long start = System.currentTimeMillis(); 

    Directory dir = FSDirectory.open(new File(indexDir)); 
    writer = new IndexWriter(dir, 
    new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")), 
    true, 
    IndexWriter.MaxFieldLength.UNLIMITED); 
    int numIndexed = 0; 
    try { 
     numIndexed = index(dataDir, new TextFilesFilter()); 
    } finally { 
     long end = System.currentTimeMillis(); 
     System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds"); 
     writer.optimize(); 
     System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds"); 
     writer.close(); 
    } 
    System.out.println("Enjoy your day/night"); 
    } 

    public static int index(String dataDir, FileFilter filter) throws Exception { 
    File[] dires = new File(dataDir).listFiles(); 
    for (File d: dires) { 
     if (d.isDirectory()) { 
     File[] files = new File(d.getAbsolutePath()).listFiles(); 
     for (File f: files) { 
      if (!f.isDirectory() && 
      !f.isHidden() && 
      f.exists() && 
      f.canRead() && 
      (filter == null || filter.accept(f))) { 
       Thread t = new mIndexer(f); 
       t.start(); 
       t.join(); 
      } 
     } 
     } 
    } 
    return writer.numDocs(); 
    } 

    private static class TextFilesFilter implements FileFilter { 
    public boolean accept(File path) { 
     return path.getName().toLowerCase().endsWith(".txt"); 
    } 
    } 

    protected Document getDocument() throws Exception { 
    Document doc = new Document(); 
    if (ifile.exists()) { 
     doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES)); 
     doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED)); 
     String cat = "WIR"; 
     cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length()-ifile.getName().length()-1); 
     cat = cat.substring(cat.lastIndexOf('/')+1, cat.length()); 
     //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES)); 
     //System.out.println(cat.subSequence(0, cat.length())); 
    } 
    return doc; 
    } 

    public void run() { 
    try { 
     System.out.println("Indexing " + ifile.getAbsolutePath()); 
     Document doc = getDocument(); 
     writer.addDocument(doc); 
    } catch (Exception e) { 
     System.out.println(e.toString()); 
    } 

    } 
} 

Każde hep jest rozpatrywane.

Odpowiedz

13

Jeśli chcesz parallelize indeksowanie, są dwie rzeczy, które możesz zrobić:

  • parallelizing wywołania addDocument,
  • zwiększenie maksymalnej liczby wątków swojej seryjnej harmonogramu.

Jesteś na właściwej ścieżce, aby zrównoleglić połączenia do addDocuments, ale utworzenie jednego wątku na dokument nie będzie skalowane, ponieważ liczba dokumentów potrzebnych do indeksowania będzie rosnąć. Powinieneś raczej użyć stałego rozmiaru ThreadPoolExecutor. Ponieważ to zadanie wymaga głównie dużej mocy obliczeniowej (w zależności od analizatora i sposobu pobierania danych), ustawienie liczby procesorów komputera jako maksymalnej liczby wątków może być dobrym początkiem.

Jeśli chodzi o program do scalania, można zwiększyć maksymalną liczbę wątków, które mogą być używane z setMaxThreadCount method of ConcurrentMergeScheduler. Należy pamiętać, że dyski są znacznie lepsze w sekwencyjnych odczytach/zapisach niż losowe zapisywanie/zapisywanie, ponieważ ustawienie zbyt dużej maksymalnej liczby wątków w harmonogramie scalania raczej spowalnia indeksowanie w dół niż przyspieszenie.

Ale zanim spróbujesz zrównoleglić proces indeksowania, prawdopodobnie powinieneś spróbować znaleźć miejsce, w którym znajduje się wąskie gardło. Jeśli twój dysk jest zbyt wolny, wąskim gardłem będzie prawdopodobnie proces kasowania i scalania, w wyniku czego równoległe wywołania addDocument (które zasadniczo polegają na analizie dokumentu i buforowaniu wyniku analizy w pamięci) nie poprawią szybkości indeksowania w ogóle.

Kilka uwag niepożądane:

  • Istnieją pewne trwają prace w wersji rozwojowej Lucene w celu poprawy równoległość indeksowania (część płukania zwłaszcza ta blog entry wyjaśnia jak to działa).

  • Lucene ma fajną stronę wiki pod numerem how to improve indexing speed, w której znajdziesz inne sposoby na poprawienie szybkości indeksowania.

+0

Naprawdę doceniam twoje pomoc. Twój komentarz na temat liczby wątków był naprawdę użyteczny. Nie wspominałem o tym wcześniej ... – orezvani

5

Myślę, że bardziej nowoczesny sposób to zrobić, należy użyć ThreadPoolExecutor i przesłać Runnable, który wykonuje indeksowanie. Możesz poczekać, aż wszystkie wątki zakończą się przy użyciu .awaitTermination lub CountdownLatch.

Nie jestem wielkim fanem, że twoja główna klasa rozszerza wątek, po prostu stwórz możliwą do uruchomienia klasę wewnętrzną, która przenosi jego depenacje do konstruktora. Dzięki temu Twój kod staje się bardziej czytelny, ponieważ praca wykonywana przez wątki jest wyraźnie oddzielona od kodu konfiguracji aplikacji.

Kilka uwag na temat stylu, nie jestem wielkim fanem tego, że twoja główna klasa rzuca wyjątek, zazwyczaj oznacza to, że nie masz jasnego pojęcia o różnych sprawdzonych przypadkach wyjątku, których kod, którego używasz, może rzucać . Zwykle nie należy robić tego, jeśli nie masz konkretnego powodu.

+0

Z góry dziękuję. Właściwie zaimplementowałem Runnable, co było fajnym pomysłem i wykorzystałem ThreadPoolExecutor, który rozwiązał prawdziwy błąd w programie wspomnianym przez jpountz. – orezvani

+0

Wadą 'awaitTermination' jest to, że nie czeka na zakończenie wszystkich wątków, ale zakończy po n jednostkach czasu. :-(Pętla jest niezbędna –

+0

zgadzam się z tym, to się okaże, że IndexWriter nie zamyka się poprawnie, a writer_lock nadal będzie istniał nawet w indeksie Katalog nie jest manipulowany przez autora indeksu – JasonHuang