5

The bottom of this article opisuje, jak użycie GetOrAdd może spowodować (jeśli rozumiem to poprawnie) uszkodzenie/nieoczekiwane wyniki.Unikanie starych (logicznie uszkodzonych) danych podczas korzystania z "ConcurrentDictionary.GetOrAdd()", kod Repro dołączony

ciach/

ConcurrentDictionary przeznaczony jest dla wielowątkowych scenariuszy. Nie musisz używać blokad w swoim kodzie, aby dodawać lub usuwać elementy z kolekcji. Zawsze jest jednak możliwe, że jeden wątek pobiera wartość, a inny wątek natychmiast aktualizuje kolekcję o , nadając temu samemu kluczowi nową wartość.

Ponadto, mimo że wszystkie metody ConcurrentDictionary są bezpieczne dla wątków, nie wszystkie metody są atomowe, konkretnie GetOrAdd i AddOrUpdate. Użytkownik przekazany do tych metod to wywoływany poza wewnętrzną blokadą słownika. (Ma to na celu zapobiec nieznany kod blokuje wszystkie wątki). Dlatego jest możliwość ta sekwencja zdarzeń występują:

1) threadA wzywa GetOrAdd, nie znajduje żadnego elementu i tworzy nowy element do dodania przez wywoływanie delegata valueFactory.

2) threadB wzywa GetOrAdd jednocześnie jego delegat valueFactory jest powoływać i przybywa na zamek wewnętrznego przed threadA, a więc jego nową parę klucz-wartość jest dodawana do słownika.

3) przenosić użytkownik threadA zakończy się, a nić przybywa na zamek , ale teraz widzi, że element już istnieje

4) threadA wykonuje „Get” i zwraca dane, które wcześniej dodanej przez threadB.

Dlatego nie można zagwarantować, że dane zwrócone przez GetOrAdd są tymi samymi danymi, które zostały utworzone przez wartość wątku: wartośćFactory. Podobna sekwencja zdarzeń może wystąpić po wywołaniu AddOrUpdate .

Pytanie

Co to jest poprawny sposób zweryfikować dane i ponowić aktualizację? Ładne podejście byłoby metodą rozszerzenia, aby spróbować/ponowić operację na podstawie zawartości starej wartości.

W jaki sposób zostanie to wdrożone? Czy mogę polegać na wyniku (verify) jako poprawnym stanie-końcowym, czy też muszę ponowić i ponownie pobrać wartości, używając innej metody?

Kod

Poniższy kod ma sytuacji wyścigu podczas aktualizacji wartości. Pożądanym zachowaniem jest to, że AddOrUpdateWithoutRetrieving() będzie zwiększać różne wartości na różne sposoby (przy użyciu ++ lub Interlocked.Increment()).

Chcę również wykonać wiele operacji w terenie w jednym urządzeniu i ponowić aktualizację, jeśli poprzednia aktualizacja nie "wzięła" z powodu stanu wyścigu.

Uruchom kod, a zobaczysz, że każda wartość pojawia się w konsoli zaczyna się zwiększając o jeden, ale każda z wartości będzie dryfować, a niektóre będą kilka kolejnych iteracji.

namespace DictionaryHowTo 
{ 
    using System; 
    using System.Collections.Concurrent; 
    using System.Collections.Generic; 
    using System.Linq; 
    using System.Text; 
    using System.Threading; 
    using System.Threading.Tasks; 

    // The type of the Value to store in the dictionary: 
    class FilterConcurrentDuplicate 
    { 
     // Create a new concurrent dictionary. 
     readonly ConcurrentDictionary<int, TestData> eventLogCache = 
      new ConcurrentDictionary<int, TestData>(); 

     static void Main() 
     { 
      FilterConcurrentDuplicate c = new FilterConcurrentDuplicate(); 

      c.DoRace(null); 
     } 

     readonly ConcurrentDictionary<int, TestData> concurrentCache = 
      new ConcurrentDictionary<int, TestData>(); 
     void DoRace(string[] args) 
     { 
      int max = 1000; 

      // Add some key/value pairs from multiple threads. 
      Task[] tasks = new Task[3]; 

      tasks[0] = Task.Factory.StartNew(() => 
      { 

       System.Random RandNum = new System.Random(); 
       int MyRandomNumber = RandNum.Next(1, 500); 

       Thread.Sleep(MyRandomNumber); 
       AddOrUpdateWithoutRetrieving(); 

      }); 

      tasks[1] = Task.Factory.StartNew(() => 
      { 
       System.Random RandNum = new System.Random(); 
       int MyRandomNumber = RandNum.Next(1, 1000); 

       Thread.Sleep(MyRandomNumber); 

       AddOrUpdateWithoutRetrieving(); 

      }); 

      tasks[2] = Task.Factory.StartNew(() => 
      { 
       AddOrUpdateWithoutRetrieving(); 

      }); 
      // Output results so far. 
      Task.WaitAll(tasks); 

      AddOrUpdateWithoutRetrieving(); 

      Console.WriteLine("Press any key."); 
      Console.ReadKey(); 
     } 
     public class TestData : IEqualityComparer<TestData> 
     { 
      public string aStr1 { get; set; } 
      public Guid? aGud1 { get; set; } 
      public string aStr2 { get; set; } 
      public int aInt1 { get; set; } 
      public long? aLong1 { get; set; } 

      public DateTime aDate1 { get; set; } 
      public DateTime? aDate2 { get; set; } 

      //public int QueryCount { get; set; } 
      public int QueryCount = 0;// 

      public string zData { get; set; } 
      public bool Equals(TestData x, TestData y) 
      { 
       return x.aStr1 == y.aStr1 && 
        x.aStr2 == y.aStr2 && 
         x.aGud1 == y.aGud1 && 
         x.aStr2 == y.aStr2 && 
         x.aInt1 == y.aInt1 && 
         x.aLong1 == y.aLong1 && 
         x.aDate1 == y.aDate1 && 
         x.QueryCount == y.QueryCount ; 
      } 

      public int GetHashCode(TestData obj) 
      { 
       TestData ci = (TestData)obj; 
       // http://stackoverflow.com/a/263416/328397 
       return 
        new { 
         A = ci.aStr1, 
         Aa = ci.aStr2, 
         B = ci.aGud1, 
         C = ci.aStr2, 
         D = ci.aInt1, 
         E = ci.aLong1, 
         F = ci.QueryCount , 
         G = ci.aDate1}.GetHashCode(); 
      } 
     } 
     private void AddOrUpdateWithoutRetrieving() 
     { 
      // Sometime later. We receive new data from some source. 
      TestData ci = new TestData() 
      { 
       aStr1 = "Austin", 
       aGud1 = new Guid(), 
       aStr2 = "System", 
       aLong1 = 100, 
       aInt1 = 1000, 
       QueryCount = 0, 
       aDate1 = DateTime.MinValue 
      }; 

      TestData verify = concurrentCache.AddOrUpdate(123, ci, 
       (key, existingVal) => 
       { 
        existingVal.aStr2 = "test1" + existingVal.QueryCount; 
        existingVal.aDate1 = DateTime.MinValue; 
        Console.WriteLine 
        ("Thread:" + Thread.CurrentThread.ManagedThreadId + 
          " Query Count A:" + existingVal.QueryCount); 
        Interlocked.Increment(ref existingVal.QueryCount); 
        System.Random RandNum = new System.Random(); 
        int MyRandomNumber = RandNum.Next(1, 1000); 

        Thread.Sleep(MyRandomNumber); 
        existingVal.aInt1++; 
        existingVal.aDate1 = 
         existingVal.aDate1.AddSeconds 
         (existingVal.aInt1); 
        Console.WriteLine(
          "Thread:" + Thread.CurrentThread.ManagedThreadId + 
          " Query Count B:" + existingVal.QueryCount); 
        return existingVal; 
       }); 


      // After each run, every value here should be ++ the previous value 
      Console.WriteLine(
       "Thread:"+Thread.CurrentThread.ManagedThreadId + 
       ": Query Count returned:" + verify.QueryCount + 
       " eid:" + verify.aInt1 + " date:" + 
       verify.aDate1.Hour + " " + verify.aDate1.Second + 
       " NAME:" + verify.aStr2 
       ); 
     } 

    } 
} 

Wyjście

Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System 

Thread:12 Query Count A:0 
Thread:13 Query Count A:1 
Thread:12 Query Count B:2 
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11 

Thread:12 Query Count A:2 
Thread:13 Query Count B:3 
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12 

Thread:13 Query Count A:3 
Thread:11 Query Count A:4 
Thread:11 Query Count B:5 
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14 

Thread:11 Query Count A:5 
Thread:13 Query Count B:6 
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15 

....

Thread:11 Query Count A:658 
Thread:11 Query Count B:659 
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658 

Thread:11 Query Count A:659 
Thread:11 Query Count B:660 
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659 

Thread:11 Query Count A:660 
Thread:11 Query Count B:661 
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660 

Thread:11 Query Count A:661 
Thread:11 Query Count B:662 
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661 

W tym kodem "eid" powinna być zawsze liczyć ponad 1000 zapytań, ale w ciągu iteracji różnica waha od 1 do 7 między tymi dwoma. Ta niespójność może spowodować awarię niektórych aplikacji lub zgłosić nieprawidłowe dane.

+0

'ConcurrentDictionary' jest bezpieczny tylko dla wątków w odniesieniu do własnych niezmienników. to znaczy, że nie spowoduje uszkodzenia własnych danych. Jeśli masz inne niezmienniki, to nie możesz o nich wiedzieć wcześniej ani oczekiwać, że zrekompensujesz je. Musisz spójnie zdefiniować, czym jest twój niezmiennik i chronić go jako transakcję z jakąś synchronizacją wątków. –

Odpowiedz

4

Argumentacja ta opiera się na błędnym zrozumieniu uwagach na dole artykułu „Jak Dodawanie i usuwanie elementów z ConcurrentDictionaryhttp://msdn.microsoft.com/en-us/library/dd997369.aspx i na podstawowym błędem współbieżności - równoczesna modyfikacja non-atomowy obiekt współdzielony.

Najpierw wyjaśnijmy, co naprawdę łączy się z tym artykułem. Użyję AddOrUpdate jako przykładu, ale rozumowanie GetOrAdd jest równoważne.

Powiedz, że wywołujesz AddOrUpdate z kilku wątków i określasz ten sam klucz. Załóżmy, że wpis z tym kluczem już istnieje. Każdy wątek pojawi się, zauważ, że istnieje już wpis z określonym kluczem i że aktualizacja części AddOrUpdate jest istotna. W ten sposób żaden wątek nie zablokuje słownika. Zamiast tego użyje niektórych powiązanych ze sobą instrukcji, by sprawdzić atomowo, czy istnieje klucz dostępu, czy nie.

Nasze kilka wątków zauważyło, że klucz istnieje i trzeba wywołać funkcję updateValueFactory. Ten delegat jest przekazywany do AddOrUpdate; pobiera referencje do istniejącego klucza i wartości i zwraca wartość aktualizacji. Teraz wszystkie wątki będą wywoływać fabrykę jednocześnie. Wszystkie one zakończą się w nieznanej wcześniej kolejności, a każdy wątek spróbuje użyć operacji atomowej (używając instrukcji blokujących), aby zastąpić istniejącą wartość wartością, którą właśnie obliczył. Nie ma sposobu, aby wiedzieć, który wątek "wygra". Wątek, który wygrywa, będzie przechowywał obliczoną wartość. Inni zauważą, że wartość w słowniku nie jest już wartością, która została przekazana do ich właściwości updateValueFactory jako argument. W odpowiedzi na tę realizację, porzucą operację i wyrzucą właśnie wyliczoną wartość.Właśnie to chcesz osiągnąć.

Następnie pozwala wyjaśnić, dlaczego masz dziwne wartości podczas uruchamiania próbki kodu wymienionych tutaj:

Przypomnijmy, że delegat updateValueFactory przekazane AddOrUpdate zaczyna Referencje istniejący klucz i wartość i zwraca wartość aktualizacji. Próbka kodu w swojej metodzie AddOrUpdateWithoutRetrieving() rozpoczyna wykonywanie operacji bezpośrednio na tym odwołaniu. Zamiast tworzyć nową wartość zastępczą i modyfikować THAT, modyfikuje ona wartości członków instancji istniejącegoVal - obiektu, który jest już w słowniku - i po prostu zwraca to odwołanie. I robi to nie atomowo - czyta niektóre wartości, aktualizuje niektóre wartości, czyta więcej, aktualizacje więcej. Oczywiście, widzieliśmy powyżej, że dzieje się to jednocześnie na kilku wątkach - wszystkie modyfikują obiekt SAME. Nic dziwnego, że wynikiem jest to, że w tym samym czasie (gdy przykład kodu wywołuje WriteLine), obiekt zawiera wartości instancji elementu, które pochodzą z różnych wątków.

Słownik nie ma nic wspólnego z tym - kod po prostu modyfikuje obiekt, który jest współużytkowany między wątkami nieatomowymi. Jest to jeden z najczęstszych błędów współbieżności. Dwa najczęstsze obejścia zależą od scenariusza. Użyj blokady współdzielonej, aby dokonać całkowitej modyfikacji obiektu, lub najpierw rozprowadź atomowo cały obiekt, a następnie zmodyfikuj kopię lokalną.

Dla tych ostatnich, warto dodać to do klasy testdata:

private Object _copyLock = null; 

private Object GetLock() { 

    if (_copyLock != null) 
     return _copyLock; 

    Object newLock = new Object(); 
    Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null); 
    return (prevLock == null) ? newLock : prevLock; 
} 

public TestData Copy() { 

    lock (GetLock()) { 
     TestData copy = new TestData(); 
     copy.aStr1 = this.aStr1; 
     copy.aStr2 = this.aStr2; 
     copy.aLong1 = this.aLong1; 
     copy.aInt1 = this.aInt1; 
     copy.QueryCount = this.QueryCount; 
     copy.aDate1 = this.aDate1; 
     copy.aDate2 = this.aDate2; 
     copy.zData = this.zData; 

     return copy; 
    } 
} 

Następnie modyfikować fabrykę w następujący sposób:

TestData verify = concurrentCache.AddOrUpdate(123, ci, 
    (key, existingVal) => 
    { 
     TestData newVal = existingVal.Copy(); 
     newVal.aStr2 = "test1" + newVal.QueryCount; 
     newVal.aDate1 = DateTime.MinValue; 
     Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count A:" + newVal.QueryCount); 
     Interlocked.Increment(ref newVal.QueryCount); 
     System.Random RandNum = new System.Random(); 
     int MyRandomNumber = RandNum.Next(1, 1000); 

     Thread.Sleep(MyRandomNumber); 
     newVal.aInt1++; 
     newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1); 
     Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count B:" + newVal.QueryCount); 
     return newVal; 
    }); 

Mam nadzieję, że to pomaga.

3

Prawdopodobnie prawidłowym sposobem jest nie dbać o to, czy zwrócona wartość nie jest tą, która jest tworzona przez valueFactory. Jeśli nie jest to dopuszczalne, musisz użyć blokady.

+0

Zobacz załączony kod ... Myślę, że metoda rozszerzenia C# jest potrzebna, aby spróbować ... ponów próbę aktualizacji, aż stanie się spójna. To lub zawierają spinlock. Brak opieki nie jest odpowiedzią IMO – LamonteCristo

+0

Dopóki valueFactory nie ma skutków ubocznych (co jest najprawdopodobniej dobrym pomysłem), zazwyczaj nie powinno mieć to znaczenia. – erikkallen

2

Nie ma żadnej ogólnej ochrony, która zawsze działa. Ale wspólnym rozwiązaniem jest zwrócenie Lazy<T> zamiast T. W ten sposób tworzenie niepotrzebnych leniwych nie szkodzi, ponieważ nigdy się nie zacznie. Tylko jedna Lazy sprawi, że będzie to końcowa wartość odpowiadająca kluczowi. Tylko jedna konkretna instancja Lazy zostanie kiedykolwiek zwrócona.

+0

Czy wiesz, jak mogę to wdrożyć w metodzie rozszerzenia, która rozwiązuje problem? – LamonteCristo

+0

http://blogs.msdn.com/b/pfxteam/archive/2010/04/23/10001621.aspx – usr

1

Możesz użyć this implementation z GetOrAdd od samego człowieka. Należy pamiętać, że nawet tutaj fabryka może zostać wywołana bez dodania jej wyniku do słownika. Ale dostrzegłbyś, co się stało.

+0

Tak, funkcja rozszerzenia, która otacza GetOrAdd jest tym, czego szukam. Mam problemy z tworzeniem przeciążenia, które automatycznie ponawia próbę GetOrAdd, jeśli wystąpi uszkodzenie. – LamonteCristo

Powiązane problemy