2012-04-24 47 views
18

Tak więc w czasie smutnych dni w C# 4.0 stworzyłem następującą klasę "WorkflowExecutor", która zezwoliła na asynchroniczne przepływy pracy w wątku GUI, włamując się do kontynuacji IEnumerable "return return", aby czekać na obserwowalne. Tak więc poniższy kod, na przycisku 1 Kliknij, po prostu uruchom prosty przepływ pracy, który aktualizuje tekst, czeka, aż klikniesz przycisk2, i pętle po 1 sekundzie.oczekiwanie na obserwowalne

public sealed partial class Form1 : Form { 
    readonly Subject<Unit> _button2Subject = new Subject<Unit>(); 
    readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor(); 

    public Form1() { 
     InitializeComponent(); 
    } 

    IEnumerable<IObservable<Unit>> CreateAsyncHandler() { 
     Text = "Initializing"; 
     var scheduler = new ControlScheduler(this); 
     while (true) { 
      yield return scheduler.WaitTimer(1000); 
      Text = "Waiting for Click"; 
      yield return _button2Subject; 
      Text = "Click Detected!"; 
      yield return scheduler.WaitTimer(1000); 
      Text = "Restarting"; 
     } 
    } 

    void button1_Click(object sender, EventArgs e) { 
     _workflowExecutor.Run(CreateAsyncHandler()); 
    } 

    void button2_Click(object sender, EventArgs e) { 
     _button2Subject.OnNext(Unit.Default); 
    } 

    void button3_Click(object sender, EventArgs e) { 
     _workflowExecutor.Stop(); 
    } 
} 

public static class TimerHelper { 
    public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) { 
     return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default); 
    } 
} 

public sealed class WorkflowExecutor { 
    IEnumerator<IObservable<Unit>> _observables; 
    IDisposable _subscription; 

    public void Run(IEnumerable<IObservable<Unit>> actions) { 
     _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator(); 
     Continue(); 
    } 

    void Continue() { 
     if (_subscription != null) { 
      _subscription.Dispose(); 
     } 
     if (_observables.MoveNext()) { 
      _subscription = _observables.Current.Subscribe(_ => Continue()); 
     } 
    } 

    public void Stop() { 
     Run(null); 
    } 
} 

Inteligentny część pomysłu, używając „plastyczności” kontynuacje zrobić asynchroniczną pracę, został wzięty od Daniela Earwicker za AsyncIOPipe pomysł: http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/, następnie dodałam bierną ram na wierzchu.

Teraz mam problem z przepisaniem tego przy użyciu funkcji async w C# 5.0, ale wydaje się, że powinno to być proste rzeczy do zrobienia. Kiedy konwertuję obserwowalne na zadania, działają one tylko raz, a pętla while ulega awarii po raz drugi. Każde ustalenie pomocy byłoby wspaniałe.

Wszystko, co powiedział/zapytał, co daje mi mechanizm async/await, że WorkflowExecutor nie? Czy jest coś, co mogę zrobić z async/oczekiwaniem, którego nie mogę zrobić (biorąc pod uwagę podobną ilość kodu) z WorkflowExecutor?

+0

Jak dokładnie zrobiłeś, że konwersja do 'Task's? Jak wygląda wygląd? – svick

+1

A 'await' ma wiele zalet w porównaniu do tego rodzaju asynchronii, ale jedną z dużych różnic jest to, że powracają z oczekujących. Na przykład. 'string s = await client.DownloadStringAsync (url);'. – svick

Odpowiedz

24

Jak zauważyłeś, Zadanie jest czymś bardzo jednorazowym, w przeciwieństwie do "strumienia wydarzeń" Observable. Dobrym sposobem myślenia o tym (IMHO) jest wykres 2x2 na Rx team's post about 2.0 Beta:

2x2 chart for task vs observable

zależności od okoliczności (jednorazowe vs. „strumień” zdarzeń), utrzymując Obserwowalne może więcej sensu.

Jeśli możesz wskoczyć do Reactive 2.0 Beta, możesz "poczekać" na obserwowalne z tym. Na przykład, moja własna próba na „asynchroniczny/czekają” (przybliżonej) wersji kodu będzie:

public sealed partial class Form1 : Form 
{ 
    readonly Subject<Unit> _button2Subject = new Subject<Unit>(); 

    private bool shouldRun = false; 

    public Form1() 
    { 
     InitializeComponent(); 
    } 

    async Task CreateAsyncHandler() 
    { 
     Text = "Initializing"; 
     while (shouldRun) 
     { 
      await Task.Delay(1000); 
      Text = "Waiting for Click"; 
      await _button2Subject.FirstAsync(); 
      Text = "Click Detected!"; 
      await Task.Delay(1000); 
      Text = "Restarting"; 
     } 
    } 

    async void button1_Click(object sender, EventArgs e) 
    { 
     shouldRun = true; 
     await CreateAsyncHandler(); 
    } 

    void button2_Click(object sender, EventArgs e) 
    { 
     _button2Subject.OnNext(Unit.Default); 
    } 

    void button3_Click(object sender, EventArgs e) 
    { 
     shouldRun = false; 
    } 
} 
+0

'Zadanie' jest jednorazowym użyciem, ale możesz" czekać "na rzeczy, które nie są' Zadaniami'.Tak więc powinno być możliwe stworzenie godnego uwagi, który może reprezentować cały "IObservable ", a nie tylko jeden element. – svick

+0

To właśnie zrobiłem w próbce kodu. Z Rx 2.0 możesz czekać na obserwowalne. Domyślnym zachowaniem jest zwracanie ostatniego elementu obserwowalnego, dlatego robi to FirstAsync –

22

Jak wspomniano James, można oczekiwać się > sekwencję IObservable < T zaczynając Rx v2.0 Beta . Zachowanie polega na zwrocie ostatniego elementu (przed OnCompleted) lub rzuceniu obserwowanego OnError. Jeśli sekwencja nie zawiera elementów, otrzymasz wyjątek InvalidOperationException.

Wskazówka stosując ten sposób można uzyskać wszystkie inne pożądane zachowania:

  • uzyskać pierwszy element o oczekiwaniu xs.FirstAsync()
  • zapewnienie tylko pojedyncza wartość przez oczekiwaniu xs.SingleAsync()
  • Kiedy jesteś w porządku z pustym sekwencji, czekają xs.DefaultIfEmpty()
  • Aby uzyskać wszystkie elementy, czekają xs.ToArray() lub() czekają xs.ToList

Można zrobić jeszcze bardziej wymyślne rzeczy, jak obliczanie wyniku agregacji jednak przestrzegać wartości pośrednie za pomocą zrobić i skanowania:

var xs = Observable.Range(0, 10, Scheduler.Default); 

var res = xs.Scan((x, y) => x + y) 
      .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); }); 

Console.WriteLine("Done! The sum is {0}", await res); 
+1

. Jest to informacja, której szukałem po zaskoczeniu, widząc w niedawnym projekcie, że oczekiwanie na IObservable jest w porządku. Dziękuję za udostępnienie. – jpierson