I przyjął moje wykonanie równoległego/konsumenta w oparciu o kod w this questionParallel.ForEach utknęły w martwym punkcie, gdy zintegrowany z BlockingCollection
class ParallelConsumer<T> : IDisposable
{
private readonly int _maxParallel;
private readonly Action<T> _action;
private readonly TaskFactory _factory = new TaskFactory();
private CancellationTokenSource _tokenSource;
private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
private Task _task;
public ParallelConsumer(int maxParallel, Action<T> action)
{
_maxParallel = maxParallel;
_action = action;
}
public void Start()
{
try
{
_tokenSource = new CancellationTokenSource();
_task = _factory.StartNew(
() =>
{
Parallel.ForEach(
_entries.GetConsumingEnumerable(),
new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
(item, loopState) =>
{
Log("Taking" + item);
if (!_tokenSource.IsCancellationRequested)
{
_action(item);
Log("Finished" + item);
}
else
{
Log("Not Taking" + item);
_entries.CompleteAdding();
loopState.Stop();
}
});
},
_tokenSource.Token);
}
catch (OperationCanceledException oce)
{
System.Diagnostics.Debug.WriteLine(oce);
}
}
private void Log(string message)
{
Console.WriteLine(message);
}
public void Stop()
{
Dispose();
}
public void Enqueue(T entry)
{
Log("Enqueuing" + entry);
_entries.Add(entry);
}
public void Dispose()
{
if (_task == null)
{
return;
}
_tokenSource.Cancel();
while (!_task.IsCanceled)
{
}
_task.Dispose();
_tokenSource.Dispose();
_task = null;
}
}
A oto kod testowy
class Program
{
static void Main(string[] args)
{
TestRepeatedEnqueue(100, 1);
}
private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
{
bool[] flags = new bool[itemCount];
var consumer = new ParallelConsumer<int>(parallelCount,
(i) =>
{
flags[i] = true;
}
);
consumer.Start();
for (int i = 0; i < itemCount; i++)
{
consumer.Enqueue(i);
}
Thread.Sleep(1000);
Debug.Assert(flags.All(b => b == true));
}
}
Test zawsze kończy się niepowodzeniem - to zawsze zatrzymywało się na około 93 pozycji ze 100 testowanych. Każdy pomysł, która część mojego kodu spowodowała ten problem i jak to naprawić?
Dzięki. To rozwiązało mój problem. W każdym razie, kiedy testuję dalej, kod w moim OP nie zawiedzie, gdy numer pozycji jest członkiem tej sekwencji, [A200672] (http://oeis.org/A200672) np. 1, 2, 3, 5, 7, 9, 13, 17, 21, 29, 37, 45, 61, 77, 93, ... Jakiś pomysł dlaczego? po prostu ciekawy. – user69715
@ user69715 To dziwne zachowanie, które znalazłem, gdy próbowałem zrobić coś podobnego. Przypuszczam, że jest to po prostu dziwna interakcja między Parallel.ForEach() i podstawową BlockingCollection, ale naprawdę nie mogę tego wyjaśnić. –