Aby korzystać Rx, będziesz musiał użyć IObservable<T>
zamiast IEnumerable<T>
.
public IObservable<T> ReadFiles()
{
return from filepath in lstOfFiles.ToObservable()
from item in Observable.Using(() => File.OpenRead(filepath), ReadStream)
select item;
}
Za każdym razem, że zadzwonisz Subscribe
na obserwowalne zwrócony przez ReadFiles
będzie iteracyjne nad wszystkie struny w lstOfFiles
i równolegle *, czytać każdy strumień pliku.
Po kolei zapytanie otwiera każdy strumień plików i przekazuje go do ReadStream
, który jest odpowiedzialny za generowanie asynchronicznej sekwencji elementów dla danego strumienia.
ReadFiles
kwerenda, która używa operatora SelectMany
napisany w składni zapytania zrozumieniem, łączy każdy „element”, który jest generowany przez wszystkie ReadStream
obserwabli w jednym zaobserwowania sekwencji, z poszanowaniem asynchrony źródła.
Powinieneś zdecydowanie rozważyć napisanie async iterator dla swojej metody ReadStream
, tak jak tutaj pokazałem; w przeciwnym razie, jeśli musisz zwrócić wartość IEnumerable<T>
, musisz ją przekonwertować, stosując operator ToObservable(scheduler)
za pomocą programu do planowania współbieżności, który może być mniej wydajny.
public IObservable<Item> ReadStream(Stream stream)
{
return Observable.Create<Item>(async (observer, cancel) =>
{
// Here's one example of reading a stream with fixed item lengths.
var buffer = new byte[itemLength]; // TODO: Define itemLength
var remainder = itemLength;
int read;
do
{
read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel)
.ConfigureAwait(false);
remainder -= read;
if (read == 0)
{
if (remainder < itemLength)
{
throw new InvalidOperationException("End of stream unexpected.");
}
else
{
break;
}
}
else if (remainder == 0)
{
observer.OnNext(ReadItem(buffer)); // TODO: Define ReadItem
remainder = itemLength;
}
}
while (true);
});
}
* Rx nie wprowadza tutaj żadnej współbieżności. Paralelizacja jest po prostu wynikiem asynchronicznej natury bazowego API, więc jest bardzo wydajna. Odczytanie asynchronicznie ze strumienia plików może spowodować, że system Windows użyje portu zakończenia operacji wejścia/wyjścia jako optymalizacji, powiadamiając o wspólnym wątku, gdy każdy bufor stanie się dostępny. Zapewnia to, że system Windows ponosi całkowitą odpowiedzialność za planowanie wywołań zwrotnych do aplikacji, a nie do licencji TPL lub do Ciebie.
Rx jest wolnodłonowy, więc każde powiadomienie do obserwatora może znajdować się na innym wątku; jednak z powodu kontraktu serializacyjnego Rx (§4.2 Rx Design Guidelines), nie będziesz otrzymywać zachodzących na siebie powiadomień w swoim obserwatorze, gdy zadzwonisz pod numer Subscribe
, więc nie musisz zapewniać jawnej synchronizacji, takiej jak blokowanie.
Jednak ze względu na zrównolegloną naturę tego zapytania można zaobserwować naprzemienne powiadomienia dotyczące każdego pliku, ale nigdy zachodzące na siebie powiadomienia.
Jeśli wolisz otrzymać wszystkie przedmioty dla danego pliku na raz, jak pan zasugerował w swoim pytaniu, można po prostu zastosować operator ToList
do kwerendy i zmienić typ powrotu:
public IObservable<IList<T>> ReadFiles()
{
return from filepath in lstOfFiles.ToObservable()
from items in Observable.Using(() => File.OpenRead(filepath), ReadStream)
.ToList()
select items;
}
Jeśli chcesz obserwować powiadomienia z powinowactwem wątku (na przykład w wątku GUI), musisz przekazać powiadomienia, ponieważ będą przychodzić na wątku. Ponieważ to zapytanie nie wprowadza samej współbieżności, najlepszym sposobem na osiągnięcie tego jest zastosowanie operatora ObserveOnDispatcher
(WPF, Store Apps, Phone, Silverlight) lub przeciążenia ObserveOn(SynchronizationContext)
(WinForms, ASP.NET, itp.). Po prostu nie zapomnij dodać odnośnika do odpowiedniego pakietu NuGet specyficznego dla platformy; np. Rx-Wpf, Rx-WinForms, Rx-WindowsStore, itp.
Możesz ulec pokusie, aby przekonwertować obserwowalne z powrotem na IEnumerable<T>
zamiast dzwonić pod numer Subscribe
. Nie rób tego. W większości przypadków jest to niepotrzebne, może być nieefektywne, aw najgorszym przypadku potencjalnie może powodować martwe blokady. Gdy wkroczysz w świat asynchronii, powinieneś spróbować pozostać w nim. Dotyczy to nie tylko Rx, ale także async/await
.
Cześć Nelson, za udzielenie pomocy, wygląda na to, że błąd kompilacji kompilatora nie może zostać wywnioskowany z użycia. Wyjaśnienie argumentu jawnie. – user145610
Chcesz wyjaśnić sprawę? –