2015-03-03 11 views
5

Chcę ograniczyć liczbę elementów opublikowanych w potoku przepływu danych. Liczba elementów zależy od środowiska produkcyjnego. Obiekty te zużywają dużą ilość pamięci (obrazów), więc chciałbym opublikować je, gdy ostatni blok rurociągu wykonał swoją pracę.Przepływ danych w TPL: Jak dławić cały rurociąg?

Próbowałem użyć SemaphoreSlim do dławienia producenta i zwolnienia go w ostatnim bloku potoku. Działa, ale jeśli w trakcie procesu zostanie zgłoszony wyjątek, program czeka na zawsze, a wyjątek nie zostanie przechwycony.

Oto przykład, który wygląda jak nasz kod. Jak mogę to zrobić?

static void Main(string[] args) 
{ 
    SemaphoreSlim semaphore = new SemaphoreSlim(1, 2); 

    var downloadString = new TransformBlock<string, string>(uri => 
    { 
     Console.WriteLine("Downloading '{0}'...", uri); 
     return new WebClient().DownloadString(uri); 
    }); 

    var createWordList = new TransformBlock<string, string[]>(text => 
    { 
     Console.WriteLine("Creating word list..."); 

     char[] tokens = text.ToArray(); 
     for (int i = 0; i < tokens.Length; i++) 
     { 
      if (!char.IsLetter(tokens[i])) 
       tokens[i] = ' '; 
     } 
     text = new string(tokens); 

     return text.Split(new char[] { ' ' }, 
      StringSplitOptions.RemoveEmptyEntries); 
    }); 

    var filterWordList = new TransformBlock<string[], string[]>(words => 
    { 
     Console.WriteLine("Filtering word list..."); 
     throw new InvalidOperationException("ouch !"); // explicit for test 
     return words.Where(word => word.Length > 3).OrderBy(word => word) 
      .Distinct().ToArray(); 
    }); 

    var findPalindromes = new TransformBlock<string[], string[]>(words => 
    { 
     Console.WriteLine("Finding palindromes..."); 

     var palindromes = new ConcurrentQueue<string>(); 

     Parallel.ForEach(words, word => 
     { 
      string reverse = new string(word.Reverse().ToArray()); 

      if (Array.BinarySearch<string>(words, reverse) >= 0 && 
       word != reverse) 
      { 
       palindromes.Enqueue(word); 
      } 
     }); 

     return palindromes.ToArray(); 
    }); 

    var printPalindrome = new ActionBlock<string[]>(palindromes => 
    { 
     try 
     { 
      foreach (string palindrome in palindromes) 
      { 
       Console.WriteLine("Found palindrome {0}/{1}", 
        palindrome, new string(palindrome.Reverse().ToArray())); 
      } 
     } 
     finally 
     { 
      semaphore.Release(); 
     } 
    }); 

    downloadString.LinkTo(createWordList); 
    createWordList.LinkTo(filterWordList); 
    filterWordList.LinkTo(findPalindromes); 
    findPalindromes.LinkTo(printPalindrome); 


    downloadString.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)createWordList).Fault(t.Exception); 
     else createWordList.Complete(); 
    }); 
    createWordList.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)filterWordList).Fault(t.Exception); 
     else filterWordList.Complete(); 
    }); 
    filterWordList.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)findPalindromes).Fault(t.Exception); // enter here when an exception throws 
     else findPalindromes.Complete(); 
    }); 
    findPalindromes.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not catched 
     else printPalindrome.Complete(); 
    }); 

    try 
    { 
     for (int i = 0; i < 10; i++) 
     { 
      Console.WriteLine(i); 

      downloadString.Post("http://www.google.com"); 
      semaphore.Wait(); // waits here when an exception throws 
     } 

     downloadString.Complete(); 

     printPalindrome.Completion.Wait(); 
    } 
    catch (AggregateException agg) 
    { 
     Console.WriteLine("An error has occured : " + agg); 
    } 
    Console.WriteLine("Done"); 
    Console.ReadKey(); 
} 

Odpowiedz

2

Powinieneś po prostu poczekać jednocześnie na semafor i zadanie zakończenia. W ten sposób, jeśli blok kończy się przedwcześnie (albo przez wyjątek, albo anulowanie), wtedy wyjątek zostanie ponownie zgłoszony, a jeśli nie, poczekasz na semafor, aż pojawi się miejsce, by opublikować więcej.

można zrobić z Task.WhenAny i SemaphoreSlim.WaitAsync:

for (int i = 0; i < 10; i++) 
{ 
    Console.WriteLine(i); 
    downloadString.Post("http://www.google.com"); 

    if (printPalindrome.Completion.IsCompleted) 
    { 
     break; 
    } 

    Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait(); 
} 

Uwaga: Używanie Task.Wait jest jedynym właściwym w tym przypadku, jak to Main. Zwykle powinno to być metoda async i powinieneś/powinnaś wykonać zadanie zwrócone przez await z Task.WhenAny.

+0

Dzięki, ta część działa świetnie. Jednak pętla nadal produkuje elementy do dwóch pierwszych bloków, które nie są oznaczone jako błędne. Jeśli modifiy tej części kodu: 'findPalindromes.Completion.ContinueWith (T => {if (t.IsFaulted) { ((IDataflowBlock) printPalindrome) .Fault (t.Exception) ((IDataflowBlock) downloadString) .Fault (t.Exception); // zaznacz, że pierwszy blok popełnił błąd, } else printPalindrome.Complete(); }); 'działa. Ale nie jestem pewien, czy jest to lepszy sposób na zrobienie tego. – n3bula

+0

Czy ta trasa nie będzie po prostu powodować, że kod działa synchronicznie, skoro tylko główny wątek ma czekać? – moarboilerplate

+0

@ n3bula Możesz po prostu sprawdzić, czy ukończono zadanie ukończenia. Spójrz na moją aktualizację. – i3arnon

0

W ten sposób radzę sobie z dławieniem lub tylko 10 elementów w bloku źródłowym w tym samym czasie. Mógłbyś to zmodyfikować, aby mieć 1. Upewnij się, że również blokujesz wszystkie inne bloki w potoku, w przeciwnym razie możesz otrzymać blok źródłowy z 1, a następny blok z dużo więcej.

var sourceBlock = new BufferBlock<string>(
    new ExecutionDataflowBlockOptions() { 
     SingleProducerConstrained = true, 
     BoundedCapacity = 10 }); 

Następnie producent robi to:

sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken); 

Jeśli używasz asynchronicznie/Oczekujcie, tylko czekają na wezwanie SendAsync.