2016-08-31 6 views
6

Użyłem parallelStream, aby uzyskać najdłuższy ciąg w tablicy, kod jest następujący, za każdym razem, gdy działa, otrzymuję inny wynik. AtomicReference zakłada, że ​​jest bezpieczny dla wątków, nawet jeśli jest używany w środowisku parallelStream? Ale dlaczego tak się dzieje?Czy wątek AtomicReference java jest bezpieczny podczas używania w ramach parallelStream?

public static void main(String[] args) { 
    AtomicReference<String> longest = new AtomicReference<>(); 
    LongAccumulator accumulator = new LongAccumulator(Math::max, 0); 
    List<String> words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform"); 
    words.parallelStream().forEach(
      next -> longest.updateAndGet(
        current -> { 
         String result = next.length() > accumulator.intValue() ? next : current; 
         accumulator.accumulate(next.length()); 
         return result; 
        } 
       ) 
); 
    System.out.println(longest.get()); 
} 

po raz pierwszy wydrukowano "gratulacje", a czasem wydrukowano "platformę".

+3

Czy jest możliwość, aby użyć 'words.parallelStream() max (Comparator.comparingInt (String :: długość));' zamiast..? Próbowałem tego i zawsze zwracałem to samo słowo. Jednak nie jestem w 100% pewien, czy zostanie wykonany równolegle. – Clayn

+1

@Clayn: to jest problem ze współbieżnością - nigdy nie możesz być tego pewien. Ale mogę was zapewnić, że wasz wariant jest poprawny. – Holger

+1

@ Dziękujemy dzięki za twierdzenie. Pierwszą rzeczą, która przyszła mi do głowy, było: jestem pewien, że można to zrobić bardziej elegancko, używając Stream API – Clayn

Odpowiedz

8

Jesteś powołując LongAccumulator.intValue() który jest udokumentowany jako:

Zwraca current value jako int po zwężającej prymitywnego nawrócenia.

i po Linke do metody get() będziemy się:

Zwraca bieżącą wartość. Zwrócona wartość to NOT snapshot atomowy; wywołanie w przypadku braku równoczesnych aktualizacji zwraca dokładny wynik, ale współbieżne aktualizacje występujące podczas obliczania wartości mogą nie zostać włączone.

Więc gdy operacja AtomicReference.updateAndGet jest bezpieczne wątek, Twój jednoczesne wywołanie LongAccumulator.intValue() i LongAccumulator.accumulate nie jest. A LongAccumulator jest przeznaczony do wykonywania równoczesnych operacji accumulate, a następnie pobierania wyniku po zakończeniu wszystkich operacji akumulacji. Zauważ, że nawet jeśli get() zwrócił poprawną migawkę, fakt, że wywołanie intValue() i następujących po nim accumulate() są dwiema różnymi, a więc nieatomowymi operacjami, uczyniło operację wciąż podatną na wyścigi danych.

W większości przypadków, jeśli próbujesz manipulować strukturami danych w forEach, używasz niewłaściwego narzędzia do pracy, czyniąc kod niepotrzebnie skomplikowanym i podatnym na błędy. Jako Clayn hinted in a comment, words.parallelStream().max(Comparator.comparingInt(String::l‌​ength)) wykona pracę zwięźle i poprawnie.

3

Właściwie wspominałem o problemie, który napisał @Holger, byłem trochę spóźniony, ale piszę w każdym razie jako dowód odpowiedzi @ Holgera. Możesz użyć akumulatora AtomicReference;

Oto przykładowy kod:

public static void main(String[] args) 
    {    
      AtomicReference<String> longest = new AtomicReference<>(); 
      List<String> words = Arrays.asList("him", "he", "thanks", "strings", "congratulations", "platform"); 

      words.parallelStream().forEach(next -> {     
       longest.accumulateAndGet(next, (a,b) -> 
         a != null && a.length() > b.length() ? a : b 
        );     
      }); 

      System.out.println(longest.get()); 
    } 
Powiązane problemy