2013-08-17 10 views
25

Mam dwa projekty w moim rozwiązaniu: projekt WPF i biblioteka klas.Jak uzyskać zwrot produktu podczas wykonywania zadania. WhenAny

w mojej klasie Biblioteka:

Mam listę Symbol:

class Symbol 
{ 
    Identifier Identifier {get;set;} 
    List<Quote> HistoricalQuotes {get;set;} 
    List<Financial> HistoricalFinancials {get;set;} 
} 

dla każdego symbolu, kwerendy usługę finansową w celu pobrania historycznych danych finansowych dla każdego z moich symboli przy użyciu WebRequest . (WebClient.DownloadStringTaskAsync (uri);)

Więc oto moja metoda to zrobić:

public async Task<IEnumerable<Symbol>> GetSymbolsAsync() 
    { 
     var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>(); 

     foreach (var symbol in await _listSymbols) 
     { 
      historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); 
     } 

     while (historicalFinancialTask.Count > 0) 
     { 
      var historicalFinancial = await Task.WhenAny(historicalFinancialTask); 
      historicalFinancialTask.Remove(historicalFinancial); 

      // the line below doesn't compile, which is understandable because method's return type is a Task of something 
      yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); 
     } 
    } 

    private async Task<HistoricalFinancialResult> GetFinancialsQueryAsync(Symbol symbol) 
    { 
     var result = new HistoricalFinancialResult(); 
     result.Symbol = symbol; 
     result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously 
     return result; 
    } 

    private class HistoricalFinancialResult 
    { 
     public Symbol Symbol { get; set; } 
     public IEnumerable<Financial> Data { get; set; } 

     // equality members 
    } 

Jak widać, chcę, że za każdym razem mogę pobrać dane finansowe za historyczny symbol, z wytworzeniem wynik zamiast czekać na zakończenie wszystkich moich połączeń z usługami finansowymi.

I w moim WPF, oto co chciałbym zrobić:

foreach(var symbol in await _service.GetSymbolsAsync()) 
{ 
     SymbolsObservableCollection.Add(symbol); 
} 

Wydaje się, że nie może przynieść powrót w sposób asynchroniczny, to jakie rozwiązanie można używać? Poza przeniesieniem mojej metody GetSymbols do mojego projektu WPF.

+0

BTW, aby przetworzyć kolekcję 'Zadania' według kolejności ich realizacji, spójrz na [' OrderByCompletion() '] (http://nitoasyncex.codeplex.com/wikipage?title=TaskExtensions&referringTitle=Documentation) z Nito AsyncEx. – svick

+0

Czy obejrzałeś [Reactive Extensions (Rx)] (http://msdn.microsoft.com/en-us/data/gg577609.aspx)? –

+0

Mam, ale krzywa podszewki jest dla mnie zbyt wysoka i właściwie nie rozumiem, co Microsoft planuje na temat przyszłości tej technologii, ponieważ mają również TPL Dataflow, który wygląda na to samo. Problem, o który tutaj pytam, jest tak prosty, więc rozwiązanie powinno być również proste. I nadal nie rozumiem, dlaczego nie zawierają RX ani Dataflow w .NET Framework, wygląda na to, że nawet oni nie ufają wystarczająco do tych dwóch frameworków, aby je uwzględnić. – Gui

Odpowiedz

39

Podczas Lubię komponenty TPL Dataflow (które sugeruje użycie svick), przejście do tego systemu wymaga dużego zaangażowania - nie jest to coś, co można dodać do istniejącego projektu. Oferuje znaczne korzyści, jeśli przetwarza duże ilości procesora i przetwarza dane i chce wykorzystać wiele rdzeni procesora. Ale najlepsze z tego jest nietrywialne.

Jego inne sugestie, używając Rx, mogą być łatwiejsze do zintegrowania z istniejącym rozwiązaniem. (Zobacz original documentation, ale do najnowszego kodu użyj pakietu nuget Rx-Main lub, jeśli chcesz spojrzeć na źródło, zobacz the Rx CodePlex site) Możliwe, że kod wywołujący będzie nadal używany przy użyciu IEnumerable<Symbol>, jeśli chcesz - możesz użyć Rx wyłącznie jako szczegółów implementacji, [edit 2013/11/09, aby dodać:] chociaż jak svick wskazał, to prawdopodobnie nie jest dobry pomysł, biorąc pod uwagę twój cel końcowy.

Zanim pokażę Ci przykład, chcę jasno powiedzieć, co dokładnie robimy. Twój przykład miał metodę z tego podpisu: „Jest to metoda, która wytwarza pojedynczy wynik typu IEnumerable<Symbol>, a może nie od razu produkować tego rezultatu”

public async Task<IEnumerable<Symbol>> GetSymbolsAsync() 

tego typu, Task<IEnumerable<Symbol>> powrócić zasadniczo mówi

Jest to jeden wynik , który wydaje mi się być przyczyną smutku, ponieważ tak naprawdę nie jest to, czego chcesz. A Task<T> (bez względu na to, co może być T) reprezentuje pojedynczą operację asynchroniczną. Może mieć wiele kroków (wiele zastosowań await, jeśli zaimplementujesz go jako metodę C# async), ale ostatecznie tworzy jedną rzecz. Chcesz produkować wiele rzeczy, w różnych czasach, więc Task<T> nie jest dobrym rozwiązaniem.

Jeśli naprawdę zamierzałeś zrobić to, co obiecuje twój podpis metodologiczny - ostatecznie dajesz jeden wynik - jednym ze sposobów, w jaki możesz to zrobić, jest stworzenie metody asynchronicznej, która utworzy listę, a następnie wytworzy ją jako wynik, gdy będzie dobrze i będzie gotowy:

// Note: this first example is *not* what you want. 
// However, it is what your method's signature promises to do. 
public async Task<IEnumerable<Symbol>> GetSymbolsAsync() 
{ 
    var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>(); 

    foreach (var symbol in await _listSymbols) 
    { 
     historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); 
    } 

    var results = new List<Symbol>(); 
    while (historicalFinancialTask.Count > 0) 
    { 
     var historicalFinancial = await Task.WhenAny(historicalFinancialTask); 
     historicalFinancialTask.Remove(historicalFinancial); 

     results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); 
    } 

    return results; 
} 

Ta metoda wykonuje swoją sygnaturę: asynchronicznie tworzy sekwencję symboli.

Prawdopodobnie chciałbyś stworzyć IEnumerable<Symbol>, który produkuje przedmioty, gdy tylko będą dostępne, zamiast czekać, aż wszystkie będą dostępne. (W przeciwnym razie możesz równie dobrze użyć WhenAll.) Możesz to zrobić, ale nie jest to właściwa metoda.

Krótko mówiąc, myślę, że chcesz utworzyć listę asynchroniczną. Jest w tym pewien typ: IObservable<T> wyraża dokładnie to, co, jak wierzę, masz nadzieję wyrazić za pomocą swojego Task<IEnumerable<Symbol>>: jest to sekwencja elementów (podobnie jak IEnumerable<T>), ale jest asynchroniczna.

To może pomóc zrozumieć przez analogię:

public Symbol GetSymbol() ... 

jest

public Task<Symbol> GetSymbolAsync() ... 

jak

public IEnumerable<Symbol> GetSymbols() ... 

jest:

public IObservable<Symbol> GetSymbolsObservable() ... 

(Niestety, w przeciwieństwie do Task<T>, nie istnieje typowa konwencja nazewnictwa dla tego, jak wywołać metodę asynchronicznej sekwencji. Dodałem tutaj "Observable", ale to nie jest powszechna praktyka. . I na pewno nie nazwałbym tego GetSymbolsAsync ponieważ ludzie będą oczekiwać, że do zwróci Task)

Ujmując to w inny sposób, Task<IEnumerable<T>> mówi „będę produkować tę kolekcję, kiedy jestem dobry i gotowy”, natomiast IObservable<T> mówi: "Oto kolekcja. Produkuję każdy przedmiot, kiedy jestem dobry i gotowy".

Tak więc, chcesz metodę, która zwraca sekwencję obiektów Symbol, gdzie obiekty te są produkowane asynchronicznie. To mówi nam, że naprawdę powinieneś zwrócić IObservable<Symbol>.Oto realizacja:

// Unlike this first example, this *is* what you want. 
public IObservable<Symbol> GetSymbolsRx() 
{ 
    return Observable.Create<Symbol>(async obs => 
    { 
     var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>(); 

     foreach (var symbol in await _listSymbols) 
     { 
      historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); 
     } 

     while (historicalFinancialTask.Count > 0) 
     { 
      var historicalFinancial = await Task.WhenAny(historicalFinancialTask); 
      historicalFinancialTask.Remove(historicalFinancial); 

      obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); 
     } 
    }); 
} 

Jak widać, to pozwala pisać dość dużo co liczyliśmy napisać - ciało tego kodu jest niemal identyczny do Ciebie. Jedyną różnicą jest to, że gdy używałeś yield return (który nie kompilował), wywołuje to metodę OnNext na obiekcie dostarczanym przez Rx.

Po napisane, że można łatwo owinąć to w IEnumerable<Symbol> ([redakcją 2013/11/29 dodać:] chociaż prawdopodobnie nie naprawdę chcesz to zrobić - patrz dodatek na końcu odpowiedź):

public IEnumerable<Symbol> GetSymbols() 
{ 
    return GetSymbolsRx().ToEnumerable(); 
} 

To może nie wyglądać asynchronicznie, ale w rzeczywistości pozwala na działanie asynchronicznego kodu źródłowego. Gdy wywołasz tę metodę, nie będzie ona blokować - nawet jeśli podstawowy kod, który wykonuje pracę pobierania informacji finansowych, nie może natychmiast uzyskać wyniku, ta metoda natychmiast zwróci wartość IEnumerable<Symbol>. Oczywiście każdy kod, który spróbuje iterować przez tę kolekcję, zostanie zablokowany, jeśli dane nie są jeszcze dostępne. Ale krytyczny jest to, że robi to, co myślę, że pierwotnie były próby osiągnięcia:

  • Otrzymasz napisać metodę async że działa (delegata w moim przykładzie, przekazany jako argument do Observable.Create<T> ale można Napisać autonomicznego async metodę, jeśli wolisz)
  • Kod wywołujący nie będą blokowane jedynie jako rezultat z prośbą, aby rozpocząć pobieranie symbole
  • Powstały IEnumerable<Symbol> będzie produkować każdy pojedynczy element, jak tylko stanie się ona dostępna

Dzieje się tak, ponieważ metoda Rx ToEnumerable zawiera pewien sprytny kod, który wypełnia lukę pomiędzy synchronicznym widokiem świata IEnumerable<T> a asynchroniczną produkcją wyników. (Innymi słowy, robi to dokładnie to, co rozczarowałeś, odkrywając, że C# nie był w stanie zrobić dla ciebie).

Jeśli jesteś ciekawy, możesz spojrzeć na źródło. Kod, który leży u podstaw tego, co robi ToEnumerable można znaleźć na stronie https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs

[Edited 29.11.2013 dodać:]

svick zauważył w komentarzach czegoś mi brakowało: Twój ostatecznym celem jest wprowadzenie zawartość do ObservableCollection<Symbol>. Jakoś tego nie widziałem. To oznacza, że ​​IEnumerable<T> jest niewłaściwą drogą - chcesz zapełnić kolekcję, gdy elementy stają się dostępne, zamiast wykonywać pętlę foreach. Więc po prostu wykonaj to:

GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol)); 

lub coś podobnego. To spowoduje dodanie elementów do kolekcji, gdy będą dostępne.

Wszystko zależy od tego, co się dzieje na nitce interfejsu użytkownika. Tak długo, jak to jest, twój kod asynchroniczny powinien kończyć się na wątku UI, co oznacza, że ​​gdy elementy zostaną dodane do kolekcji, dzieje się tak również w wątku UI.Ale jeśli z jakiegoś powodu kończysz uruchamianie rzeczy z wątku roboczego (lub jeśli używasz ConfigureAwait w którymkolwiek z oczekiwań, a tym samym zerwania połączenia z wątkiem UI), musisz zaaranżować obsługę elementów z Rx stream na prawym gwintem:

GetSymbolsRx() 
    .ObserveOnDispatcher() 
    .Subscribe(symbol => SymbolsObservableCollection.Add(symbol)); 

Jeśli jesteś w wątku UI, gdy to zrobisz, to będzie podnieść aktualny dyspozytora, i zapewnienia, że ​​wszystkie notyfikacje przybywają przez nią. Jeśli jesteś już w niewłaściwym wątku, gdy zasubskrybujesz, możesz użyć przeciążenia ObserveOn, które pobiera dyspozytora. (Te wymagają, aby mieć odniesienie do System.Reactive.Windows.Threading A to rozszerzenie metod, więc trzeba mieć using ich zawierającego nazw, który jest nazywany również System.Reactive.Windows.Threading.)

+4

Chociaż zgadzam się z większością odpowiedzi, myślę, że 'ToEnumerable()' nie jest właściwym rozwiązaniem. Zgodnie z pytaniem, celem końcowym jest wypełnienie "ObservabaleCollection", gdy elementy staną się dostępne. Ponieważ zwykle musisz to zrobić z wątku UI, użycie 'ToEnumerable()' zmusiłoby Cię do zablokowania wątku UI przez cały czas. – svick

+1

Nie wiem, jak to przegapiłem - dzięki. Edytowałem to w świetle Twojego komentarza. –

+1

+1 za dobrze napisaną odpowiedź i blog. W ten sposób inspiruje mnie stacoverflow :) –

0

Uważam, że nie można również zastosować metody iteracyjnej w metodzie async. To jest ograniczenie .NET. Spójrz na korzystanie z Task Parallel Library Dataflow, może być używany do przetwarzania danych, gdy staną się dostępne. A także Reactive Extensions.

6

To, o co prosisz, nie ma większego sensu, ponieważ IEnumerable<T> jest interfejsem synchronicznym. Innymi słowy, jeśli element nie jest jeszcze dostępny, metoda MoveNext() musi zostać zablokowana, nie ma innego wyboru.

Potrzebna jest asynchroniczna wersja IEnumerable<T>. W tym celu możesz użyć IObservable<T> z bloku Rx lub (mój ulubiony) z TPL dataflow. Z tym, kod może wyglądać tak (ja również uległy zmianie niektórych zmiennych, aby lepiej nazwami):

public IReceivableSourceBlock<Symbol> GetSymbolsAsync() 
{ 
    var block = new BufferBlock<Symbol>(); 

    GetSymbolsAsyncCore(block).ContinueWith(
     task => ((IDataflowBlock)block).Fault(task.Exception), 
     TaskContinuationOptions.NotOnRanToCompletion); 

    return block; 
} 

private async Task GetSymbolsAsyncCore(ITargetBlock<Symbol> block) 
{ 
    // snip 

    while (historicalFinancialTasks.Count > 0) 
    { 
     var historicalFinancialTask = 
      await Task.WhenAny(historicalFinancialTasks); 
     historicalFinancialTasks.Remove(historicalFinancialTask); 
     var historicalFinancial = historicalFinancialTask.Result; 

     var symbol = new Symbol(
      historicalFinancial.Symbol.Identifier, 
      historicalFinancial.Symbol.HistoricalQuotes, 
      historicalFinancial.Data); 

     await block.SendAsync(symbol); 
    } 
} 

i użytkowania mogą być:

var symbols = _service.GetSymbolsAsync(); 
while (await symbols.OutputAvailableAsync()) 
{ 
    Symbol symbol; 
    while (symbols.TryReceive(out symbol)) 
     SymbolsObservableCollection.Add(symbol); 
} 

Lub:

var symbols = _service.GetSymbolsAsync(); 
var addToCollectionBlock = new ActionBlock<Symbol>(
    symbol => SymbolsObservableCollection.Add(symbol)); 
symbols.LinkTo(
    addToCollectionBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
await symbols.Completion; 
+0

dzięki za odpowiedź. Nigdy nie poświęciłem czasu, aby zagłębić się w TPL Dataflow i RX, które słyszałem tylko po imieniu. Całkowicie zaginęło, gdy przeczytałem przykład kodu podanego przy użyciu TPL Dataflow. Czy możesz polecić mi stronę internetową lub dowolną książkę, aby uzyskać szybki rozwój dzięki TPL Dataflow? Zrobiłem kilka badań na ten temat, a te ramy są już od 1 roku, obawiam się, że nie znajdę dużej ilości dokumentacji, kiedy utknę, by coś z tym zrobić. Czy wiesz, dlaczego nie jest on włączony w .NET Framework? – Gui

+0

@Gui Nie jest zawarty bezpośrednio w ramce, dzięki czemu można go aktualizować częściej. – svick

+0

Myślę, że chodziło o 'GetSymbolsAsyncCore (blok) .ContinueWith (...)' zamiast 'GetSymbolsAsyncCore(). ContinueWith (...)'. Kolejnym hitpick prawdopodobnie nie jest użycie sufiksu Async dla metod, które nie są asynchroniczne same, ale zwraca strukturę danych, która nadal zmienia się asynchronicznie. Używanie sufiksu Async daje wrażenie, że słowo kluczowe czeka na zastosowanie w tej metodzie. – ShitalShah

0

dlaczego nie coś takiego:

public async IEnumerable<Task<Symbol>> GetSymbolsAsync() 
{ 
    var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>(); 

    foreach (var symbol in await _listSymbols) 
    { 
     historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); 
    } 

    while (historicalFinancialTask.Count > 0) 
    { 
     var historicalFinancial = await Task.WhenAny(historicalFinancialTask); 
     historicalFinancialTask.Remove(historicalFinancial); 

     yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); 
    } 
} 
+0

Działa to, ale ma pewne problemy. Wysyła całą pracę asynchroniczną z góry, niezależnie od tego, czy ktoś kiedykolwiek wyliczy kolekcję, i jednocześnie wywołuje pracę asynchroniczną (która może być nieodpowiednia w zależności od źródła danych). W przypadku podejścia opartego na RX pierwszy problem znika, a drugi jest kwestią wyboru. – piers7

+0

Czy to działa? Wygląda mi na to, że GetSymbolsAsync musi zwrócić zadanie – Vidar

Powiązane problemy