2012-05-24 18 views
6

Próbuję zaimplementować ConcurrentDictionary, zawijając go w BlockingCollection, ale nie wydaje się być skuteczne.Jak zawinąć ConcurrentDictionary w BlockingCollection?

Rozumiem, że jedna zmienna deklaracje współpracy z BlockingCollection takich jak ConcurrentBag<T>, ConcurrentQueue<T> itp

Tak więc, aby stworzyć ConcurrentBag zawinięte w BlockingCollection chciałbym zadeklarować i wystąpień tak:

BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>()); 

ale jak to zrobić dla ConcurrentDictionary? Potrzebuję funkcji blokowania BlockingCollection zarówno po stronie producenta, jak i konsumenta.

+0

Słownik (i ConcurrentDictionary też) nie zachowuje kolejności przedmiotów. Czy możesz opisać swój scenariusz producent-konsument? – Dennis

+0

@Dennis, jestem tego świadomy. Producent przechowuje KeyValuePairs w concurrentDictionary, a zadanie konsumenta inkrementuje int i usuwa KeyValuePair, jeśli int pasuje do odpowiedniego klucza. Robię to, ponieważ zadania robocze zapełniają concurrentDictionary wartościami, ale w dowolnej kolejności, zadanie klienta zapewnia, że ​​odebrane wartości są przekazywane/przetwarzane we właściwej kolejności. Czy ConcurrentDictionary może być opakowany w BlockingCollection? –

+0

Jakie rozwiązanie wymyśliłeś? Próbuję znaleźć dobre rozwiązanie podobnego problemu, gdy producent nie produkuje przedmiotów w kolejności wymaganej przez konsumenta. (stary post wiem, ale warto spróbować) – Kim

Odpowiedz

1

Musisz napisać własną klasę adaptera - coś takiego:

public class ConcurrentDictionaryWrapper<TKey,TValue> : IProducerConsumerCollection<KeyValuePair<TKey,TValue>> 
{ 
    private ConcurrentDictionary<TKey, TValue> dictionary; 

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() 
    { 
     return dictionary.GetEnumerator(); 
    } 

    IEnumerator IEnumerable.GetEnumerator() 
    { 
     return GetEnumerator(); 
    } 

    public void CopyTo(Array array, int index) 
    { 
     throw new NotImplementedException(); 
    } 

    public int Count 
    { 
     get { return dictionary.Count; } 
    } 

    public object SyncRoot 
    { 
     get { return this; } 
    } 

    public bool IsSynchronized 
    { 
     get { return true; } 
    } 

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index) 
    { 
     throw new NotImplementedException(); 
    } 

    public bool TryAdd(KeyValuePair<TKey, TValue> item) 
    { 
     return dictionary.TryAdd(item.Key, item.Value); 
    } 

    public bool TryTake(out KeyValuePair<TKey, TValue> item) 
    { 
     item = dictionary.FirstOrDefault(); 
     TValue value; 
     return dictionary.TryRemove(item.Key, out value); 
    } 

    public KeyValuePair<TKey, TValue>[] ToArray() 
    { 
     throw new NotImplementedException(); 
    } 
} 
+1

Dzięki za sugestię kodu. Ale moim głównym celem w użyciu BlockingCollection była możliwość oznaczenia kolekcji jako Adding Completed i sprawdzenia jej statusu, a także tego, czy jest ona kompletna i pusta, podobnie jak w BlockingCollection. Jestem świadomy, że mogę łatwo dodać taką funkcjonalność, ale szukam sugestii, jak zrobić to bezpośrednio poprzez BlockingCollection. Do tej pory nie widzę powodu, dla którego nie mógłby bezpośrednio przejść przez kolekcję Blocking. Może to zajmuje tylko IProducerConsumerCollection ? –

4

Może trzeba współbieżne słownika blockingCollection

 ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>(); 
     int maxBoxes = 5; 

     CancellationTokenSource cancelationTokenSource = new CancellationTokenSource(); 
     CancellationToken cancelationToken = cancelationTokenSource.Token; 

     Random rnd = new Random(); 
     // Producer 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // put the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       box.Add("some message " + index, cancelationToken); 
       Console.WriteLine("Produced a letter to put in box " + index); 

       // Wait simulating a heavy production item. 
       Thread.Sleep(1000); 
      } 
     }); 

     // Consumer 1 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // get the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       var message = box.Take(cancelationToken); 
       Console.WriteLine("Consumed 1: " + message); 

       // consume a item cost less than produce it: 
       Thread.Sleep(50); 
      } 
     }); 

     // Consumer 2 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // get the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       var message = box.Take(cancelationToken); 
       Console.WriteLine("Consumed 2: " + message); 

       // consume a item cost less than produce it: 
       Thread.Sleep(50); 
      } 
     }); 

     Console.ReadLine(); 
     cancelationTokenSource.Cancel(); 

W ten sposób konsument, który spodziewałem się czegoś w skrzynce 5 poczeka, aż twórca umieści list w skrzynce 5.

Powiązane problemy