2013-06-11 29 views
6

Próbuję użyć rozszerzeń reaktywnych (Rx), aby buforować wyliczanie zadań po ich zakończeniu. Czy ktoś wie, czy jest to czysty wbudowany sposób robienia tego? Metoda rozszerzenia ToObservable po prostu sprawi, że IObservable<Task<T>>, co nie jest tym, czego chcę, chcę IObservable<T>, że mogę następnie użyć Buffer na.Konwertuj IEnumerable <Task<T>> do IObservable <T>

wymyślony przykład:

//Method designed to be awaitable 
public static Task<int> makeInt() 
{ 
    return Task.Run(() => 5); 
} 

//In practice, however, I don't want to await each individual task 
//I want to await chunks of them at a time, which *should* be easy with Observable.Buffer 
public static void Main() 
{ 
    //Make a bunch of tasks 
    IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt()); 

    //Is there a built in way to turn this into an Observable that I can then buffer? 
    IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //???? 

    buffered.Subscribe(ints => { 
     Console.WriteLine(ints.Count()); //Should be 15 
    }); 
} 
+0

http://stackoverflow.com/questions/13500456/how-to-convert-an-ienumerabletaskt-to-iobservablet –

Odpowiedz

7

można wykorzystywać fakt, że Task można przekształcić do zaobserwowania przy użyciu another overload of ToObservable().

Gdy masz kolekcję obserwowanych obiektów (pojedynczej pozycji), możesz utworzyć pojedynczą obserwację, która zawiera elementy po ich wypełnieniu przy użyciu Merge().

Tak, kod może wyglądać następująco:

futureInts.Select(t => t.ToObservable()) 
      .Merge() 
      .Buffer(15) 
      .Subscribe(ints => Console.WriteLine(ints.Count)); 
Powiązane problemy