2013-02-11 15 views
11

Mam aplikację Java, która ma wątki robocze do przetwarzania zadań. Pracownik produkuje obiektu wynik, powiedzieć coś w stylu:Przechodzenie obiektów kolekcji z jednego wątku do drugiego w wątku Java

class WorkerResult{ 
    private final Set<ResultItems> items; 
    public Worker(Set<ResultItems> pItems){ 
     items = pItems; 
    } 
} 

Gdy pracownik kończy, to jednak ta operacja:

... 
final Set<ResultItems> items = new SomeNonThreadSafeSetImplSet<ResultItems>(); 
for(Item producedItem : ...){ 
     items.add(item); 
} 
passToGatherThread(items); 

Zestaw items jest rodzajem "unit-of-work" tutaj . Metoda passToGatherThread przekazuje zestaw items do wątku gromadzenia, którego tylko jeden istnieje w środowisku wykonawczym.

Synchronizacja nie jest tu potrzebna, ponieważ warunki wyścigu nie mogą wystąpić, ponieważ tylko jeden wątek (Gather-thread) odczytuje zestaw items. AFAICS, Gather-thread może nie widzieć wszystkich elementów, ponieważ zestaw nie jest bezpieczny dla wątków, prawda?

Załóżmy, że nie mogę zsynchronizować passToGatherThread, powiedzmy, ponieważ jest to biblioteka innej firmy. Zasadniczo obawiam się, że wątek "gather" nie widzi wszystkich przedmiotów z powodu buforowania, optymalizacji VM itp. Pojawia się więc pytanie: jak przekazać elementy ustawione w sposób bezpieczny dla wątków, tak, że wątek Zbierz "widzi" właściwy zestaw przedmiotów?

+0

Nie jestem pewien, ale może [PipedStreams] (http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html) może ci pomóc? – oliholz

+1

Jaka jest definicja "passToGatherthread"? Myślę, że jest to kluczowe dla zrozumienia, czy podane poniżej odpowiedzi są poprawne. W jaki dokładnie sposób 'items' przekazywane są do wątku gather? –

+0

Czy naprawdę masz problemy z widocznością obiektów lub czy tylko podejrzewasz takie zachowanie? Twoje obawy wydają się dość naciągane w tych okolicznościach. – Dariusz

Odpowiedz

1

myślałem o (i omawiane) to pytanie dużo i mam wymyślić inną odpowiedź, która, mam nadzieję, będzie najlepszym rozwiązaniem.

Przekazywanie zsynchronizowanej kolekcji nie jest dobre pod względem wydajności, ponieważ każda kolejna operacja na tej kolekcji będzie zsynchronizowana - w przypadku wielu operacji może to być handicapem.

Do punktu: zróbmy pewne założenia (które nie zgadzam się z):

  • wspomniany passToGatherThread metoda jest rzeczywiście niebezpieczne, jednak mało prawdopodobne wydaje
  • kompilator może zmienić kolejność zdarzeń w kodzie tak że passToGatherThread nazywa się przed zebraniem zapełnione

Najprostszym, najczystszych i możliwie najbardziej efektywny sposób, aby upewnić się, że zbiór przekazany do metody zbierackich jest gotowa i pełna jest t O umieścić push zbiórki w zsynchronizowany bloku tak:

synchronized(items) { 
    passToGatherThread(items); 
} 

ten sposób możemy zapewnić synchronizację pamięci i poprawną sekwencję zdarzyć, zanim przed zebraniem jest przekazywana, w ten sposób upewniając się, że wszystkie obiekty są przekazywane prawidłowo.

+1

Myślę, że masz rację i ostatecznie to właśnie napotkałem ten problem. Wygląda trochę brzydko, prawda? Synchronizacja na nowo utworzonym zestawie, który dla większości osób wygląda po prostu głupio ... ;-) Dziękuję za odpowiedź. – xSNRG

1

Wygląda na to, że nie ma tu problemu z synchronizacją. Tworzysz nowy obiekt Set dla każdego passToGatherThread i robisz to po modyfikacji zestawu. Żadne przedmioty nie zostaną utracone.

Zestaw (i większość kolekcji Java) może być dostępny jednocześnie z wieloma wątkami pod warunkiem, że nie dokonano żadnych modyfikacji kolekcji. Po to jest Collections.unmodifiableCollection.

Ponieważ wspomniana metoda passToGatherThread służy jako komunikacja z innym wątkiem, musi używać jakiejś synchronizacji - a każda synchronizacja zapewnia spójność pamięci między wątkami.

Również - proszę zauważyć, że wszystkie zapisy do obiektów w przekazywanych kolekcjach są przed przekazywane do innego wątku. Nawet jeśli pamięć jest kopiowana do lokalnej pamięci podręcznej wątku, ma taką samą niezmienioną wartość jak w drugim wątku.

+1

AFAIK JVM może cacheować dane, gdy tylko jest to możliwe, w rejestrach rdzenia procesora. Podczas zapisywania danych bez "przepłukiwania" (za pomocą zsynchronizowanych/ulotnych/itp.) Inne wątki mogą widzieć "zużyte" wartości lub nawet wcale, ponieważ semantyka "zdarzyła się przed" jest ważna tylko w wątku generującym dane. – xSNRG

+0

@xSNRG dodał akapit. Pod warunkiem, że metoda pass jest ważna, nie powinno się martwić. – Dariusz

+0

Ponadto - gdyby to nie zadziałało, piekło wielu aplikacji nie działałoby. – Dariusz

1

Można po prostu użyć jednej z bezpiecznych dla wątków implementacji Set, którą Java udostępnia dla Twojego WorkerResult. Patrz na przykład:

Inną opcją jest użycie Collections.synchronizedSet().

+0

Dlaczego jest to potrzebne? Co jest złego w korzystaniu z implementacji "Set" bez wątków? –

+0

Myślę, że PO obawia się buforowania. Wątki mogą buforować dane inne niż "niestabilne", a jeśli zapisy nie zostaną zsynchronizowane, nigdy nie zobaczą aktualizacji. – joergl

+1

Tak, Joergl ma absolutną rację. Zobacz mój komentarz poniżej odpowiedzi Dariusza Wawera. – xSNRG

0

Pracownik realizuje wymagalne i zwraca WorkerResult:

class Worker implements Callable<WorkerResult> { 
    private WorkerInput in; 

    public Worker(WorkerInput in) { 
     this.in = in; 
    } 

    public WorkerResult call() { 
     // do work here 
    } 
} 

Następnie używamy ExecutorService zarządzać puli wątków, i zbierać wyniki poprzez używając przyszłości.

public class PooledWorkerController { 

    private static final int MAX_THREAD_POOL = 3; 
    private final ExecutorService pool = 
     Executors.newFixedThreadPool(MAX_THREAD_POOL); 

    public Set<ResultItems> process(List<WorkerInput> inputs) 
      throws InterruptedException, ExecutionException{   
     List<Future<WorkerResult>> submitted = new ArrayList<>(); 
     for (WorkerInput in : inputs) { 
      Future<WorkerResult> future = pool.submit(new Worker(in)); 
      submitted.add(future); 
     } 
     Set<ResultItems> results = new HashSet<>(); 
     for (Future<WorkerResult> future : submitted) { 
      results.addAll(future.get().getItems()); 
     } 
     return results; 
    } 
} 
+0

Tworzysz wątek przy każdym uruchomieniu zadania. -1 za to. – Dariusz

+0

W swoim poście wprowadzono niewłaściwą praktykę tworzenia wątku za każdym razem, gdy wywoływana jest paczka zadań. Tworzenie wątków jest bardzo kosztowne i zawsze, gdy to możliwe, wątki powinny być tworzone raz i ponownie wykorzystane. Twój kod byłby IMO dużo lepszy, gdybyś miał jedną zainicjowaną statycznie pulę wątków, która byłaby używana w 'process'. Tworzenie nowej puli wątków dla każdego wywołania "process" nie zmienia się zbytnio. – Dariusz

Powiązane problemy