The bottom of this article opisuje, jak użycie GetOrAdd może spowodować (jeśli rozumiem to poprawnie) uszkodzenie/nieoczekiwane wyniki.Unikanie starych (logicznie uszkodzonych) danych podczas korzystania z "ConcurrentDictionary.GetOrAdd()", kod Repro dołączony
ciach/
ConcurrentDictionary przeznaczony jest dla wielowątkowych scenariuszy. Nie musisz używać blokad w swoim kodzie, aby dodawać lub usuwać elementy z kolekcji. Zawsze jest jednak możliwe, że jeden wątek pobiera wartość, a inny wątek natychmiast aktualizuje kolekcję o , nadając temu samemu kluczowi nową wartość.
Ponadto, mimo że wszystkie metody ConcurrentDictionary są bezpieczne dla wątków, nie wszystkie metody są atomowe, konkretnie GetOrAdd i AddOrUpdate. Użytkownik przekazany do tych metod to wywoływany poza wewnętrzną blokadą słownika. (Ma to na celu zapobiec nieznany kod blokuje wszystkie wątki). Dlatego jest możliwość ta sekwencja zdarzeń występują:
1) threadA wzywa GetOrAdd, nie znajduje żadnego elementu i tworzy nowy element do dodania przez wywoływanie delegata valueFactory.
2) threadB wzywa GetOrAdd jednocześnie jego delegat valueFactory jest powoływać i przybywa na zamek wewnętrznego przed threadA, a więc jego nową parę klucz-wartość jest dodawana do słownika.
3) przenosić użytkownik threadA zakończy się, a nić przybywa na zamek , ale teraz widzi, że element już istnieje
4) threadA wykonuje „Get” i zwraca dane, które wcześniej dodanej przez threadB.
Dlatego nie można zagwarantować, że dane zwrócone przez GetOrAdd są tymi samymi danymi, które zostały utworzone przez wartość wątku: wartośćFactory. Podobna sekwencja zdarzeń może wystąpić po wywołaniu AddOrUpdate .
Pytanie
Co to jest poprawny sposób zweryfikować dane i ponowić aktualizację? Ładne podejście byłoby metodą rozszerzenia, aby spróbować/ponowić operację na podstawie zawartości starej wartości.
W jaki sposób zostanie to wdrożone? Czy mogę polegać na wyniku (verify
) jako poprawnym stanie-końcowym, czy też muszę ponowić i ponownie pobrać wartości, używając innej metody?
Kod
Poniższy kod ma sytuacji wyścigu podczas aktualizacji wartości. Pożądanym zachowaniem jest to, że AddOrUpdateWithoutRetrieving() będzie zwiększać różne wartości na różne sposoby (przy użyciu ++
lub Interlocked.Increment()
).
Chcę również wykonać wiele operacji w terenie w jednym urządzeniu i ponowić aktualizację, jeśli poprzednia aktualizacja nie "wzięła" z powodu stanu wyścigu.
Uruchom kod, a zobaczysz, że każda wartość pojawia się w konsoli zaczyna się zwiększając o jeden, ale każda z wartości będzie dryfować, a niektóre będą kilka kolejnych iteracji.
namespace DictionaryHowTo
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
// The type of the Value to store in the dictionary:
class FilterConcurrentDuplicate
{
// Create a new concurrent dictionary.
readonly ConcurrentDictionary<int, TestData> eventLogCache =
new ConcurrentDictionary<int, TestData>();
static void Main()
{
FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();
c.DoRace(null);
}
readonly ConcurrentDictionary<int, TestData> concurrentCache =
new ConcurrentDictionary<int, TestData>();
void DoRace(string[] args)
{
int max = 1000;
// Add some key/value pairs from multiple threads.
Task[] tasks = new Task[3];
tasks[0] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 500);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[1] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[2] = Task.Factory.StartNew(() =>
{
AddOrUpdateWithoutRetrieving();
});
// Output results so far.
Task.WaitAll(tasks);
AddOrUpdateWithoutRetrieving();
Console.WriteLine("Press any key.");
Console.ReadKey();
}
public class TestData : IEqualityComparer<TestData>
{
public string aStr1 { get; set; }
public Guid? aGud1 { get; set; }
public string aStr2 { get; set; }
public int aInt1 { get; set; }
public long? aLong1 { get; set; }
public DateTime aDate1 { get; set; }
public DateTime? aDate2 { get; set; }
//public int QueryCount { get; set; }
public int QueryCount = 0;//
public string zData { get; set; }
public bool Equals(TestData x, TestData y)
{
return x.aStr1 == y.aStr1 &&
x.aStr2 == y.aStr2 &&
x.aGud1 == y.aGud1 &&
x.aStr2 == y.aStr2 &&
x.aInt1 == y.aInt1 &&
x.aLong1 == y.aLong1 &&
x.aDate1 == y.aDate1 &&
x.QueryCount == y.QueryCount ;
}
public int GetHashCode(TestData obj)
{
TestData ci = (TestData)obj;
// http://stackoverflow.com/a/263416/328397
return
new {
A = ci.aStr1,
Aa = ci.aStr2,
B = ci.aGud1,
C = ci.aStr2,
D = ci.aInt1,
E = ci.aLong1,
F = ci.QueryCount ,
G = ci.aDate1}.GetHashCode();
}
}
private void AddOrUpdateWithoutRetrieving()
{
// Sometime later. We receive new data from some source.
TestData ci = new TestData()
{
aStr1 = "Austin",
aGud1 = new Guid(),
aStr2 = "System",
aLong1 = 100,
aInt1 = 1000,
QueryCount = 0,
aDate1 = DateTime.MinValue
};
TestData verify = concurrentCache.AddOrUpdate(123, ci,
(key, existingVal) =>
{
existingVal.aStr2 = "test1" + existingVal.QueryCount;
existingVal.aDate1 = DateTime.MinValue;
Console.WriteLine
("Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count A:" + existingVal.QueryCount);
Interlocked.Increment(ref existingVal.QueryCount);
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
existingVal.aInt1++;
existingVal.aDate1 =
existingVal.aDate1.AddSeconds
(existingVal.aInt1);
Console.WriteLine(
"Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count B:" + existingVal.QueryCount);
return existingVal;
});
// After each run, every value here should be ++ the previous value
Console.WriteLine(
"Thread:"+Thread.CurrentThread.ManagedThreadId +
": Query Count returned:" + verify.QueryCount +
" eid:" + verify.aInt1 + " date:" +
verify.aDate1.Hour + " " + verify.aDate1.Second +
" NAME:" + verify.aStr2
);
}
}
}
Wyjście
Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System
Thread:12 Query Count A:0
Thread:13 Query Count A:1
Thread:12 Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11
Thread:12 Query Count A:2
Thread:13 Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12
Thread:13 Query Count A:3
Thread:11 Query Count A:4
Thread:11 Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14
Thread:11 Query Count A:5
Thread:13 Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15
....
Thread:11 Query Count A:658
Thread:11 Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658
Thread:11 Query Count A:659
Thread:11 Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659
Thread:11 Query Count A:660
Thread:11 Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660
Thread:11 Query Count A:661
Thread:11 Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661
W tym kodem "eid" powinna być zawsze liczyć ponad 1000 zapytań, ale w ciągu iteracji różnica waha od 1 do 7 między tymi dwoma. Ta niespójność może spowodować awarię niektórych aplikacji lub zgłosić nieprawidłowe dane.
'ConcurrentDictionary' jest bezpieczny tylko dla wątków w odniesieniu do własnych niezmienników. to znaczy, że nie spowoduje uszkodzenia własnych danych. Jeśli masz inne niezmienniki, to nie możesz o nich wiedzieć wcześniej ani oczekiwać, że zrekompensujesz je. Musisz spójnie zdefiniować, czym jest twój niezmiennik i chronić go jako transakcję z jakąś synchronizacją wątków. –