2012-04-23 21 views
15

Mam USECASE gdzie mampołączyć putIfAbsent i zastąpić ConcurrentMap

  • wstawić nową wartość, jeśli klucz nie istnieje w ConcurrentHashMap
  • zastąpić starą wartość na nową wartość, jeśli klucz istnieje już w ConcurrentHashMap, gdzie nowa wartość pochodzi od starej wartości (nie kosztowna operacja)

mam następujący kod do zaoferowania:

public void insertOrReplace(String key, String value) { 
     boolean updated = false; 
     do { 
      String oldValue = concurrentMap.get(key); 
      if (oldValue == null) { 
       oldValue = concurrentMap.putIfAbsent(key, value); 
       if (oldValue == null) { 
        updated = true; 
       } 
      } 
      if (oldValue != null) { 
       final String newValue = recalculateNewValue(oldValue, value); 
       updated = concurrentMap.replace(key, oldValue, newValue); 
      } 
     } while (!updated); 
    } 

Czy uważasz, że jest poprawny i bezpieczny dla wątków?

Czy istnieje prostszy sposób?

Odpowiedz

6

Możesz zrobić to trochę krócej z poniższym kodem, który jest równoważny twojemu. Testowałem go trochę stresując się, gdy tysiące wątków łączy się z nim jednocześnie: działa zgodnie z oczekiwaniami, wykonując szereg prób (pętli) (oczywiście, nigdy nie można udowodnić poprawności przy testowaniu w współbieżnym świecie).

+0

Rozumiem, dziękuję. To trochę bardziej kompaktowe. – Holger

+0

Co powiesz na pominięcie 'String's i umieszczenie wszystkiego bezpośrednio w' if'? – elect

+1

Jeśli inny wątek zmieni wartość między putIfAbsent i zastąpi cię niespójną wartością. – Uberto

2

Twoja metoda wydaje się bezpieczna dla wątków. Jeśli nie potrzebujesz korzyści płynących z ConcurrentHashMap, zastanów się zamiast tego używając zwykłego HashMap i zsynchronizuj cały dostęp do niego. Twoja metoda jest podobna do AtomicInteger.getAndSet (int), więc powinno być dobrze. Wątpię, czy jest łatwiejszy sposób na zrobienie tego, chyba że szukasz biblioteki wzywającej do wykonania pracy za ciebie.

+0

Chodziło o to, aby uniknąć blokowania/synchronizacji, ponieważ dostęp do niego będzie wydajność krytycznych. – Holger

2

Nie sądzę, że to prawda. Jak rozumiem, metoda merge() byłaby odpowiednim narzędziem do pracy. Obecnie mam ten sam problem i napisałem mały test, aby zobaczyć wyniki.

Ten test rozpoczyna się od 100 pracowników. Każda z nich zwiększa wartość na mapie 100 razy. Tak więc oczekiwany wynik byłby 10000.

Istnieją dwa rodzaje pracowników. Jeden, który używa algorytmu zamieniającego i na tym używa scalania. Test jest uruchamiany dwa razy przy różnych implementacjach.

import java.util.concurrent.ArrayBlockingQueue;                  
import java.util.concurrent.ConcurrentHashMap;                  
import java.util.concurrent.ConcurrentMap;                   
import java.util.concurrent.ExecutorService;                   
import java.util.concurrent.ThreadPoolExecutor;                  
import java.util.concurrent.TimeUnit;                    

public class ConcurrentMapTest                      
{                             

    private static ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();         

    private final class ReplaceWorker implements Runnable                
    {                             
     public void run()                        
     {                            
     for(int i = 0; i<100; i++)                     
     {                           
      Integer putIfAbsent = map.putIfAbsent("key", Integer.valueOf(1));          
      if(putIfAbsent == null)                     
       return;                        
      map.replace("key", putIfAbsent + 1);                  
     }                           
     }                            
    }                             

    private final class MergeWorker implements Runnable                
    {                             
     public void run()                        
     {                            
     for(int i = 0; i<100; i++)                     
     {                           
      map.merge("key", Integer.valueOf(1), (ov, nv) -> {              
       return ov + 1;                      
      });                          
     }                           
     }                            
    }                             

    public MergeWorker newMergeWorker()                    
    {                             
     return new MergeWorker();                      
    }                             

    public ReplaceWorker newReplaceWorker()                   
    {                             
     return new ReplaceWorker();                     
    }                             

    public static void main(String[] args)                   
    {                             
     map.put("key", 1);                        
     ConcurrentMapTest test = new ConcurrentMapTest();                
     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQu 
     for(int i = 0; i<100; i++)                      
     {                            
     threadPool.submit(test.newMergeWorker());                 
     }                            
     awaitTermination(threadPool);                     
     System.out.println(test.map.get("key"));                  

     map.put("key", 1);                        
     threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));  
     for(int i = 0; i<100; i++)                      
     {                            
     threadPool.submit(test.newReplaceWorker());                 
     }                            
     awaitTermination(threadPool);                     
     System.out.println(test.map.get("key"));                  
    }                             

    private static void awaitTermination(ExecutorService threadPool)             
    {                             
     try                           
     {                            
     threadPool.shutdown();                      
     boolean awaitTermination = threadPool.awaitTermination(1, TimeUnit.SECONDS);        
     System.out.println("terminted successfull: " + awaitTermination);           
     }                            
     catch (InterruptedException e)                     
     {                            
     // TODO Auto-generated catch block                   
     e.printStackTrace();                      
     }                            
    }                             
}                       
 
result: 
terminted successfull: true 
10000 
terminted successfull: true 
1743 

Problemem jest to, że istnieje luka pomiędzy GET i umieścić w swoim przypadku, więc przy jednoczesnym accsess do wyników map uzyskać nadpisane. Przy scaleniu jest to operacja atomowa, chociaż dokumentacja nie mówi nic na ten temat.

+2

Może warto zauważyć, że scalanie wymaga Java 8. – assylias

+0

To jedyne poprawne rozwiązanie. Przed wersją Javy 8 musisz założyć własne blokady – Uberto

1

Możesz użyć MutableMapIterable.updateValueWith(K key, Function0<? extends V> factory, Function2<? super V,? super P,? extends V> function, P parameter) z Eclipse Collections.

Argument factory tworzy wartość początkową, jeśli żadna nie jest na mapie. Argument function jest stosowany do wartości mapy wraz z dodatkowym parametrem, aby wymyślić nową wartość mapy. Ten parameter jest przekazywany jako ostatni argument do updateValueWith(). Funkcja jest wywoływana nawet w przypadku, gdy klucza nie ma na mapie. Tak więc początkowa wartość to naprawdę function zastosowana do wyjścia factory i parameter. The function nie może mutować wartości; powinien zwrócić nową wartość. W twoim przykładzie wartościami mapy są ciągi, które są niezmienne, więc nic nam nie jest.

W ConcurrentMaps takich jak org.eclipse.collections.impl.map.mutable.ConcurrentHashMap, implementacja updateValueWith() jest również wątkowa i atomowa. Ważne jest, aby function nie zmutował wartości mapy lub nie byłby bezpieczny dla wątków. Zamiast tego powinien zwrócić nowe wartości. W twoim przykładzie wartościami mapy są ciągi, które są niezmienne, więc nic nam nie jest.

Jeśli twoja metoda recalculateNewValue() po prostu wykonuje konkatenację ciągów, oto jak możesz użyć updateValueWith().

Function0<String> factory =() -> "initial "; 
Function2<String, String, String> recalculateNewValue = String::concat; 

MutableMap<String, String> map = new ConcurrentHashMap<>(); 
map.updateValueWith("test", factory, recalculateNewValue, "append1 "); 
Assert.assertEquals("initial append1 ", map.get("test")); 
map.updateValueWith("test", factory, recalculateNewValue, "append2"); 
Assert.assertEquals("initial append1 append2", map.get("test")); 

Można użyć Java 8 na ConcurrentMap.compute(K key, BiFunction remappingFunction) aby osiągnąć to samo, ale ma kilka wad.

ConcurrentMap<String, String> map = new ConcurrentHashMap<>(); 
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append1 "); 
Assert.assertEquals("initial append1 ", map.get("test")); 
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append2"); 
Assert.assertEquals("initial append1 append2", map.get("test")); 
  • Nie ma oddzielne fabryki obsłużyć przypadku kluczy nieobecnych więc ciało lambda ma do czynienia z wartościami i wartości początkowych.
  • Interfejs API nie umożliwia ponownego używania lambd. Każde połączenie z updateValueWith() ma te same lambdy, ale każde połączenie z compute() tworzy nowe śmieci na stercie.

Uwaga: Jestem committer dla Eclipse Kolekcje

Powiązane problemy