Podczas Lubię komponenty TPL Dataflow (które sugeruje użycie svick), przejście do tego systemu wymaga dużego zaangażowania - nie jest to coś, co można dodać do istniejącego projektu. Oferuje znaczne korzyści, jeśli przetwarza duże ilości procesora i przetwarza dane i chce wykorzystać wiele rdzeni procesora. Ale najlepsze z tego jest nietrywialne.
Jego inne sugestie, używając Rx, mogą być łatwiejsze do zintegrowania z istniejącym rozwiązaniem. (Zobacz original documentation, ale do najnowszego kodu użyj pakietu nuget Rx-Main lub, jeśli chcesz spojrzeć na źródło, zobacz the Rx CodePlex site) Możliwe, że kod wywołujący będzie nadal używany przy użyciu IEnumerable<Symbol>
, jeśli chcesz - możesz użyć Rx wyłącznie jako szczegółów implementacji, [edit 2013/11/09, aby dodać:] chociaż jak svick wskazał, to prawdopodobnie nie jest dobry pomysł, biorąc pod uwagę twój cel końcowy.
Zanim pokażę Ci przykład, chcę jasno powiedzieć, co dokładnie robimy. Twój przykład miał metodę z tego podpisu: „Jest to metoda, która wytwarza pojedynczy wynik typu IEnumerable<Symbol>
, a może nie od razu produkować tego rezultatu”
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
tego typu, Task<IEnumerable<Symbol>>
powrócić zasadniczo mówi
Jest to jeden wynik , który wydaje mi się być przyczyną smutku, ponieważ tak naprawdę nie jest to, czego chcesz. A Task<T>
(bez względu na to, co może być T
) reprezentuje pojedynczą operację asynchroniczną. Może mieć wiele kroków (wiele zastosowań await
, jeśli zaimplementujesz go jako metodę C# async
), ale ostatecznie tworzy jedną rzecz. Chcesz produkować wiele rzeczy, w różnych czasach, więc Task<T>
nie jest dobrym rozwiązaniem.
Jeśli naprawdę zamierzałeś zrobić to, co obiecuje twój podpis metodologiczny - ostatecznie dajesz jeden wynik - jednym ze sposobów, w jaki możesz to zrobić, jest stworzenie metody asynchronicznej, która utworzy listę, a następnie wytworzy ją jako wynik, gdy będzie dobrze i będzie gotowy:
// Note: this first example is *not* what you want.
// However, it is what your method's signature promises to do.
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();
foreach (var symbol in await _listSymbols)
{
historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
}
var results = new List<Symbol>();
while (historicalFinancialTask.Count > 0)
{
var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
historicalFinancialTask.Remove(historicalFinancial);
results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
}
return results;
}
Ta metoda wykonuje swoją sygnaturę: asynchronicznie tworzy sekwencję symboli.
Prawdopodobnie chciałbyś stworzyć IEnumerable<Symbol>
, który produkuje przedmioty, gdy tylko będą dostępne, zamiast czekać, aż wszystkie będą dostępne. (W przeciwnym razie możesz równie dobrze użyć WhenAll
.) Możesz to zrobić, ale nie jest to właściwa metoda.
Krótko mówiąc, myślę, że chcesz utworzyć listę asynchroniczną. Jest w tym pewien typ: IObservable<T>
wyraża dokładnie to, co, jak wierzę, masz nadzieję wyrazić za pomocą swojego Task<IEnumerable<Symbol>>
: jest to sekwencja elementów (podobnie jak IEnumerable<T>
), ale jest asynchroniczna.
To może pomóc zrozumieć przez analogię:
public Symbol GetSymbol() ...
jest
public Task<Symbol> GetSymbolAsync() ...
jak
public IEnumerable<Symbol> GetSymbols() ...
jest:
public IObservable<Symbol> GetSymbolsObservable() ...
(Niestety, w przeciwieństwie do Task<T>
, nie istnieje typowa konwencja nazewnictwa dla tego, jak wywołać metodę asynchronicznej sekwencji. Dodałem tutaj "Observable", ale to nie jest powszechna praktyka. . I na pewno nie nazwałbym tego GetSymbolsAsync
ponieważ ludzie będą oczekiwać, że do zwróci Task
)
Ujmując to w inny sposób, Task<IEnumerable<T>>
mówi „będę produkować tę kolekcję, kiedy jestem dobry i gotowy”, natomiast IObservable<T>
mówi: "Oto kolekcja. Produkuję każdy przedmiot, kiedy jestem dobry i gotowy".
Tak więc, chcesz metodę, która zwraca sekwencję obiektów Symbol
, gdzie obiekty te są produkowane asynchronicznie. To mówi nam, że naprawdę powinieneś zwrócić IObservable<Symbol>
.Oto realizacja:
// Unlike this first example, this *is* what you want.
public IObservable<Symbol> GetSymbolsRx()
{
return Observable.Create<Symbol>(async obs =>
{
var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();
foreach (var symbol in await _listSymbols)
{
historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
}
while (historicalFinancialTask.Count > 0)
{
var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
historicalFinancialTask.Remove(historicalFinancial);
obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
}
});
}
Jak widać, to pozwala pisać dość dużo co liczyliśmy napisać - ciało tego kodu jest niemal identyczny do Ciebie. Jedyną różnicą jest to, że gdy używałeś yield return
(który nie kompilował), wywołuje to metodę OnNext
na obiekcie dostarczanym przez Rx.
Po napisane, że można łatwo owinąć to w IEnumerable<Symbol>
([redakcją 2013/11/29 dodać:] chociaż prawdopodobnie nie naprawdę chcesz to zrobić - patrz dodatek na końcu odpowiedź):
public IEnumerable<Symbol> GetSymbols()
{
return GetSymbolsRx().ToEnumerable();
}
To może nie wyglądać asynchronicznie, ale w rzeczywistości pozwala na działanie asynchronicznego kodu źródłowego. Gdy wywołasz tę metodę, nie będzie ona blokować - nawet jeśli podstawowy kod, który wykonuje pracę pobierania informacji finansowych, nie może natychmiast uzyskać wyniku, ta metoda natychmiast zwróci wartość IEnumerable<Symbol>
. Oczywiście każdy kod, który spróbuje iterować przez tę kolekcję, zostanie zablokowany, jeśli dane nie są jeszcze dostępne. Ale krytyczny jest to, że robi to, co myślę, że pierwotnie były próby osiągnięcia:
- Otrzymasz napisać metodę
async
że działa (delegata w moim przykładzie, przekazany jako argument do Observable.Create<T>
ale można Napisać autonomicznego async
metodę, jeśli wolisz)
- Kod wywołujący nie będą blokowane jedynie jako rezultat z prośbą, aby rozpocząć pobieranie symbole
- Powstały
IEnumerable<Symbol>
będzie produkować każdy pojedynczy element, jak tylko stanie się ona dostępna
Dzieje się tak, ponieważ metoda Rx ToEnumerable
zawiera pewien sprytny kod, który wypełnia lukę pomiędzy synchronicznym widokiem świata IEnumerable<T>
a asynchroniczną produkcją wyników. (Innymi słowy, robi to dokładnie to, co rozczarowałeś, odkrywając, że C# nie był w stanie zrobić dla ciebie).
Jeśli jesteś ciekawy, możesz spojrzeć na źródło. Kod, który leży u podstaw tego, co robi ToEnumerable
można znaleźć na stronie https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
[Edited 29.11.2013 dodać:]
svick zauważył w komentarzach czegoś mi brakowało: Twój ostatecznym celem jest wprowadzenie zawartość do ObservableCollection<Symbol>
. Jakoś tego nie widziałem. To oznacza, że IEnumerable<T>
jest niewłaściwą drogą - chcesz zapełnić kolekcję, gdy elementy stają się dostępne, zamiast wykonywać pętlę foreach
. Więc po prostu wykonaj to:
GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
lub coś podobnego. To spowoduje dodanie elementów do kolekcji, gdy będą dostępne.
Wszystko zależy od tego, co się dzieje na nitce interfejsu użytkownika. Tak długo, jak to jest, twój kod asynchroniczny powinien kończyć się na wątku UI, co oznacza, że gdy elementy zostaną dodane do kolekcji, dzieje się tak również w wątku UI.Ale jeśli z jakiegoś powodu kończysz uruchamianie rzeczy z wątku roboczego (lub jeśli używasz ConfigureAwait
w którymkolwiek z oczekiwań, a tym samym zerwania połączenia z wątkiem UI), musisz zaaranżować obsługę elementów z Rx stream na prawym gwintem:
GetSymbolsRx()
.ObserveOnDispatcher()
.Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
Jeśli jesteś w wątku UI, gdy to zrobisz, to będzie podnieść aktualny dyspozytora, i zapewnienia, że wszystkie notyfikacje przybywają przez nią. Jeśli jesteś już w niewłaściwym wątku, gdy zasubskrybujesz, możesz użyć przeciążenia ObserveOn
, które pobiera dyspozytora. (Te wymagają, aby mieć odniesienie do System.Reactive.Windows.Threading
A to rozszerzenie metod, więc trzeba mieć using
ich zawierającego nazw, który jest nazywany również System.Reactive.Windows.Threading
.)
BTW, aby przetworzyć kolekcję 'Zadania' według kolejności ich realizacji, spójrz na [' OrderByCompletion() '] (http://nitoasyncex.codeplex.com/wikipage?title=TaskExtensions&referringTitle=Documentation) z Nito AsyncEx. – svick
Czy obejrzałeś [Reactive Extensions (Rx)] (http://msdn.microsoft.com/en-us/data/gg577609.aspx)? –
Mam, ale krzywa podszewki jest dla mnie zbyt wysoka i właściwie nie rozumiem, co Microsoft planuje na temat przyszłości tej technologii, ponieważ mają również TPL Dataflow, który wygląda na to samo. Problem, o który tutaj pytam, jest tak prosty, więc rozwiązanie powinno być również proste. I nadal nie rozumiem, dlaczego nie zawierają RX ani Dataflow w .NET Framework, wygląda na to, że nawet oni nie ufają wystarczająco do tych dwóch frameworków, aby je uwzględnić. – Gui