2010-02-22 24 views
17

Mam wątek Java tak:Wartości zwracane z Java Threads

public class MyThread extends Thread { 
     MyService service; 
     String id; 
     public MyThread(String id) { 
      this.id = node; 
     } 
     public void run() { 
      User user = service.getUser(id) 
     } 
    } 

Mam około 300 identyfikatorów, a co kilka sekund - ja odpalić wątki do nawiązania połączenia dla każdego z id. Na przykład.

for(String id: ids) { 
    MyThread thread = new MyThread(id); 
    thread.start(); 
} 

Teraz chciałbym zebrać wyniki z poszczególnych wątków i zrobić wkładkę wsadowy do bazy danych, zamiast dokonywania 300 bazie wstawia co 2 sekundy.

Każdy pomysł, jak mogę to osiągnąć?

Odpowiedz

17

Jeśli chcesz zebrać wszystkie wyniki przed wykonaniem aktualizacji bazy danych, można użyć metody invokeAll. To zajmuje się księgowością, która byłaby wymagana, jeśli przesyłasz zadania pojedynczo, na przykład sugeruje daveb.

private static final ExecutorService workers = Executors.newCachedThreadPool(); 

... 

Collection<Callable<User>> tasks = new ArrayList<Callable<User>>(); 
for (final String id : ids) { 
    tasks.add(new Callable<User>() 
    { 

    public User call() 
     throws Exception 
    { 
     return svc.getUser(id); 
    } 

    }); 
} 
/* invokeAll blocks until all service requests complete, 
* or a max of 10 seconds. */ 
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS); 
for (Future<User> f : results) { 
    User user = f.get(); 
    /* Add user to batch update. */ 
    ... 
} 
/* Commit batch. */ 
... 
+0

W moim przypadku niektóre zgłoszenia serwisowe mogą nigdy nie wrócić lub zbyt długo czekać na ich zwrot. Tak więc "invokeAll" wraz z "waitAnditation (long timeout)" wygląda jak droga. Dlatego akceptuję tę odpowiedź. Chciałbym też móc przyjąć odpowiedź @ daveba. – Langali

+0

W takim przypadku można użyć przeciążonej wersji 'invokeAll', która pobiera parametr limitu czasu. Zaktualizuję moją odpowiedź, aby pokazać, jak. – erickson

+0

Dzięki. Jakoś to przeoczyłem. – Langali

34

Podejście kanoniczne polega na użyciu Callable i ExecutorService. submit ting a Callable na ExecutorService zwraca a (typesafe) Future, z którego można uzyskać wynik w postaci get.

class TaskAsCallable implements Callable<Result> { 
    @Override 
    public Result call() { 
     return a new Result() // this is where the work is done. 
    } 
} 

ExecutorService executor = Executors.newFixedThreadPool(300); 
Future<Result> task = executor.submit(new TaskAsCallable()); 
Result result = task.get(); // this blocks until result is ready 

W twoim przypadku, to prawdopodobnie chcesz użyć invokeAll która zwraca List z Futures lub utworzyć tę listę siebie w miarę dodawania zadań wykonawcy. Aby zebrać wyniki, po prostu zadzwoń pod numer get na każdym z nich.

+0

dobra wiadomość, +1 dla szczegółu. – fastcodejava

1

Musisz zapisać wynik w coś podobnego do singleton. To musi być odpowiednio zsynchronizowane.

EDYCJA : Wiem, że to nie najlepsza rada, ponieważ nie jest dobrym pomysłem poradzić sobie z surowym Threads. Ale biorąc pod uwagę pytanie, że to zadziała, prawda? Mogę nie zostać przegłosowanym, ale po co głosować?

1

Można utworzyć kolejkę lub listę, które można przekazać do utworzonych wątków, wątki dodać wynik do listy, która zostanie opróżniona przez konsumenta, który wykonuje wsadowy wsad.

1

Najprostszym podejściem jest przekazanie obiektu do każdego wątku (jeden obiekt na wątek), który będzie zawierał wynik później. Główny wątek powinien zachować odniesienie do każdego obiektu wynikowego. Po połączeniu wszystkich wątków możesz użyć wyników.

1
public class TopClass { 
    List<User> users = new ArrayList<User>(); 
    void addUser(User user) { 
     synchronized(users) { 
      users.add(user); 
     } 
    } 
    void store() throws SQLException { 
     //storing code goes here 
    } 
    class MyThread extends Thread { 
      MyService service; 
      String id; 
      public MyThread(String id) { 
       this.id = node; 
      } 
      public void run() { 
       User user = service.getUser(id) 
       addUser(user); 
      } 
     } 
} 
1

Można utworzyć klasę, która rozszerza obserwowalne. Wtedy twój wątek może wywołać metodę w klasie Observable, która powiadomi wszystkie klasy zarejestrowane w tym obserwatorze, wywołując Observable.notifyObservers (Object).

Klasa obserwująca zaimplementuje Obserwatora i zarejestruje się w Obserwowalnym. Następnie należy zaimplementować metodę aktualizacji (Observable, Object), która zostanie wywołana, gdy wywołane zostanie Observerable.notifyObservers (Object).

4

Zapisz wynik w swoim obiekcie. Po jej zakończeniu upuść się w zsynchronizowaną kolekcję (synchronizowana kolejka przychodzi na myśl).

Gdy chcesz zebrać swoje wyniki do przesłania, chwyć wszystko z kolejki i odczytaj wyniki z obiektów.Każdy obiekt może nawet wiedzieć, jak "opublikować" własne wyniki w bazie danych, w ten sposób można przesłać różne klasy i wszystkie obsługiwane z tą samą małą, elegancką pętlą.

W JDK jest wiele narzędzi do pomocy, ale jest to naprawdę łatwe, gdy zaczniesz myśleć o swoim wątku jako prawdziwym obiekcie, a nie tylko o bzdurach wokół metody "uruchomienia". Kiedy zaczniesz myśleć o obiektach w ten sposób programowanie staje się znacznie prostsze i bardziej satysfakcjonujące.

+0

+1. Mój wstępny pomysł był podobny, ale Java 5 sprawiła, że ​​stało się to o wiele prostsze! – Langali

+0

+1. To jest piękne. –

+0

Zrobiłem to dla systemu, który pingował zakres adresów klasy B. To było bardzo eleganckie i skróciło czas z kilku godzin do kilku minut. (Cóż, zmieniono także z Ping na SNMP get, Java naprawdę nie obsługuje Ping) –

2

W języku Java 8 jest lepszy sposób na wykonanie tego przy użyciu CompletableFuture. Powiedzieć, że mamy klasę uzyskać identyfikator z bazy danych, dla uproszczenia możemy po prostu wrócić numer jak poniżej,

static class GenerateNumber implements Supplier<Integer>{ 

    private final int number; 

    GenerateNumber(int number){ 
     this.number = number; 
    } 
    @Override 
    public Integer get() { 
     try { 
      TimeUnit.SECONDS.sleep(1); 
     }catch (InterruptedException e){ 
      e.printStackTrace(); 
     } 
     return this.number; 
    } 
} 

Teraz możemy dodać wynik do równoczesnego odbioru raz wyniki każdego przyszłości jest gotowy.

Collection<Integer> results = new ConcurrentLinkedQueue<>(); 
int tasks = 10; 
CompletableFuture<?>[] allFutures = new CompletableFuture[tasks]; 
for (int i = 0; i < tasks; i++) { 
    int temp = i; 
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()-> new GenerateNumber(temp).get(), executor); 
    allFutures[i] = future.thenAccept(results::add); 
} 

Teraz możemy dodać oddzwanianie, gdy wszystkie kontrakty są gotowe,

CompletableFuture.allOf(allFutures).thenAccept(c->{ 
    System.out.println(results); // do something with result 
}); 
Powiązane problemy