2014-12-27 8 views
5

Szukam pomocy, jak użyć słowa kluczowego yield, aby zwrócić IEnumberable w blokach równoległych lub bloku zadań. Poniżej znajduje się kod pseudojak używać yield, aby zwrócić kolekcję Item w bloku równoległym lub Task

public IEnumerable<List<T>> ReadFile() 
{ 
    foreach (string filepath in lstOfFiles) 
    { 
     var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read); 
     foreach (var item in ReadStream(stream)) 
      yield return item; //where item is of type List<string> 
    } 
} 

Chcę przekształcić powyższy kod do bloku równoległego jak poniżej

lstOfFiles.AsParallel() 
      .ForAll(filepath => 
{ 
    var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read); 
    foreach (var item in ReadStream(Stream)) 
     yield return item; 
}); 

ale kompilator zgłasza błąd że wydajność nie może być stosowany w blokach równoległych lub anonimowego delegata. Próbowałem również z bloku zadań, wydajność nie jest dozwolona w zadania anonimowy delegat

Każdy sugeruje mi prosty i najlepszy sposób na uzyskanie zwrotu danych w równoległych blokach lub zadaniu.

Przeczytałem, że RX 2.0 lub TPL są dobre do użycia w powyższym scenariuszu. Mam wątpliwości, czy użyć biblioteki RX lub TPL do asynchronicznego zwrotu wydajności wartości. Czy ktoś może mi zasugerować, która jest lepsza albo Rx, albo TPL.

Jeśli korzystam z Rx, czy konieczne jest utworzenie subskrypcji i konwersji równoległego bloku AsObservable.

Odpowiedz

0

Wygląda na to, że chcesz użyć SelectMany. Nie można używać yield w sposób anonimowy, ale można się do tego przełamać nowej metody, tak jak poniżej:

IEnumerable<Item> items = lstOfFiles.AsParallel() 
    .SelectMany((filepath) => ReadItems(filepath)); 

IEnumerable<Item> ReadItems(string filePath) 
{ 
    using(var Stream = new FileStream(filePath, FileMode.Open, FileAccess.Read)) 
    { 
     foreach (var item in ReadStream(Stream)) 
      yield return item; 
    } 
} 
+0

Cześć Nelson, za udzielenie pomocy, wygląda na to, że błąd kompilacji kompilatora nie może zostać wywnioskowany z użycia. Wyjaśnienie argumentu jawnie. – user145610

+0

Chcesz wyjaśnić sprawę? –

1

Aby korzystać Rx, będziesz musiał użyć IObservable<T> zamiast IEnumerable<T>.

public IObservable<T> ReadFiles() 
{ 
    return from filepath in lstOfFiles.ToObservable() 
     from item in Observable.Using(() => File.OpenRead(filepath), ReadStream) 
     select item; 
} 

Za każdym razem, że zadzwonisz Subscribe na obserwowalne zwrócony przez ReadFiles będzie iteracyjne nad wszystkie struny w lstOfFiles i równolegle *, czytać każdy strumień pliku.

Po kolei zapytanie otwiera każdy strumień plików i przekazuje go do ReadStream, który jest odpowiedzialny za generowanie asynchronicznej sekwencji elementów dla danego strumienia.

ReadFiles kwerenda, która używa operatora SelectMany napisany w składni zapytania zrozumieniem, łączy każdy „element”, który jest generowany przez wszystkie ReadStream obserwabli w jednym zaobserwowania sekwencji, z poszanowaniem asynchrony źródła.

Powinieneś zdecydowanie rozważyć napisanie async iterator dla swojej metody ReadStream, tak jak tutaj pokazałem; w przeciwnym razie, jeśli musisz zwrócić wartość IEnumerable<T>, musisz ją przekonwertować, stosując operator ToObservable(scheduler) za pomocą programu do planowania współbieżności, który może być mniej wydajny.

public IObservable<Item> ReadStream(Stream stream) 
{ 
    return Observable.Create<Item>(async (observer, cancel) => 
    { 
    // Here's one example of reading a stream with fixed item lengths. 

    var buffer = new byte[itemLength]; // TODO: Define itemLength 
    var remainder = itemLength; 
    int read; 

    do 
    { 
     read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel) 
         .ConfigureAwait(false); 

     remainder -= read; 

     if (read == 0) 
     { 
     if (remainder < itemLength) 
     { 
      throw new InvalidOperationException("End of stream unexpected."); 
     } 
     else 
     { 
      break; 
     } 
     } 
     else if (remainder == 0) 
     { 
     observer.OnNext(ReadItem(buffer)); // TODO: Define ReadItem 

     remainder = itemLength; 
     } 
    } 
    while (true); 
    }); 
} 

* Rx nie wprowadza tutaj żadnej współbieżności. Paralelizacja jest po prostu wynikiem asynchronicznej natury bazowego API, więc jest bardzo wydajna. Odczytanie asynchronicznie ze strumienia plików może spowodować, że system Windows użyje portu zakończenia operacji wejścia/wyjścia jako optymalizacji, powiadamiając o wspólnym wątku, gdy każdy bufor stanie się dostępny. Zapewnia to, że system Windows ponosi całkowitą odpowiedzialność za planowanie wywołań zwrotnych do aplikacji, a nie do licencji TPL lub do Ciebie.

Rx jest wolnodłonowy, więc każde powiadomienie do obserwatora może znajdować się na innym wątku; jednak z powodu kontraktu serializacyjnego Rx (§4.2 Rx Design Guidelines), nie będziesz otrzymywać zachodzących na siebie powiadomień w swoim obserwatorze, gdy zadzwonisz pod numer Subscribe, więc nie musisz zapewniać jawnej synchronizacji, takiej jak blokowanie.

Jednak ze względu na zrównolegloną naturę tego zapytania można zaobserwować naprzemienne powiadomienia dotyczące każdego pliku, ale nigdy zachodzące na siebie powiadomienia.

Jeśli wolisz otrzymać wszystkie przedmioty dla danego pliku na raz, jak pan zasugerował w swoim pytaniu, można po prostu zastosować operator ToList do kwerendy i zmienić typ powrotu:

public IObservable<IList<T>> ReadFiles() 
{ 
    return from filepath in lstOfFiles.ToObservable() 
     from items in Observable.Using(() => File.OpenRead(filepath), ReadStream) 
           .ToList() 
     select items; 
} 

Jeśli chcesz obserwować powiadomienia z powinowactwem wątku (na przykład w wątku GUI), musisz przekazać powiadomienia, ponieważ będą przychodzić na wątku. Ponieważ to zapytanie nie wprowadza samej współbieżności, najlepszym sposobem na osiągnięcie tego jest zastosowanie operatora ObserveOnDispatcher (WPF, Store Apps, Phone, Silverlight) lub przeciążenia ObserveOn(SynchronizationContext) (WinForms, ASP.NET, itp.). Po prostu nie zapomnij dodać odnośnika do odpowiedniego pakietu NuGet specyficznego dla platformy; np. Rx-Wpf, Rx-WinForms, Rx-WindowsStore, itp.

Możesz ulec pokusie, aby przekonwertować obserwowalne z powrotem na IEnumerable<T> zamiast dzwonić pod numer Subscribe. Nie rób tego. W większości przypadków jest to niepotrzebne, może być nieefektywne, aw najgorszym przypadku potencjalnie może powodować martwe blokady. Gdy wkroczysz w świat asynchronii, powinieneś spróbować pozostać w nim. Dotyczy to nie tylko Rx, ale także async/await.

+0

Hej Dave, jestem nowy Rx, ale jak możemy mieć subskrybować zamiast tolist(), który może spowodować zakleszczenie, jak na oświadczeniu ur Chciałbym wiedzieć, jak subskrybować, gdy mutliple z klauzuli są dostępne – user145610

+0

Operator 'ToList', że I "Używane w moim przykładzie jest zdefiniowany w klasie" System.Reactive.Linq.Observable' i działa na 'IObservable ', a nie 'IEnumerable '. Wartość zwracana to 'IObservable >', więc jest bezpieczna. Również nie powinieneś wywoływać 'Subscribe' w środku zapytania. Zadzwoń tylko na sam koniec. –

+0

Thnx Dave, znalazłem tolist z LINQ.Observable po opublikowaniu mojego komentarza, przetestowałem go, nie blokując. Dodałem subskrypcję jak poniżej, która działa idealnie ReadFiles (childfiles) .ObserveOn (Scheduler.Default). Subskrybuj ((wynik) => {Console.WriteLine (result.Count); Console.WriteLine ("Identyfikator wątku subskrypcji: {0} ", Thread.CurrentThread.ManagedThreadId);}); Mam jeszcze jedno pytanie, kiedy wykonuję powyższy kod, zawsze widzę, że wszystkie metody "ReadStream" działają w jednym wątku i subskrybenta w innym wątku, Czy możliwe jest uruchamianie wszystkich plików (mówi 4 pliki) w różnych wątkach i jednym subskrybowanym wątku – user145610

Powiązane problemy