2013-02-15 13 views
10

Mam sposób asynchroniczny źródłowe takiego:Jak używać "Gdzie" z predykatem asynchronicznym?

private async Task<bool> MeetsCriteria(Uri address) 
{ 
    //Do something involving awaiting an HTTP request. 
} 

że mam kolekcję Uri S:

var addresses = new[] 
{ 
    new Uri("http://www.google.com/"), 
    new Uri("http://www.stackoverflow.com/") //etc. 
}; 

Chcę filtrować addresses użyciu MeetsCriteria. Chcę to zrobić asynchronicznie; Chcę, aby wiele wywołań do predykatu było uruchamianych asynchronicznie, a ja chcę poczekać, aż wszystkie z nich zakończą i wygenerują przefiltrowany zestaw wyników. Niestety, LINQ nie wydaje się wspierać asynchronicznych predykaty, więc coś w tym nie pracy:

var filteredAddresses = addresses.Where(MeetsCriteria); 

istnieje podobnie wygodny sposób to zrobić?

+2

Czego można się spodziewać, gdyby zostało to zasłonięte? Zwłaszcza podczas iteracji 'filtersAddresses', która jest po wywołaniu' MeetsCriteria'. –

+0

@DanielHilgarth: Dzięki; trafne spostrzeżenie. To naprawdę nie pasuje do LINQ. – Sam

Odpowiedz

6

Myślę, że jednym z powodów, dla niczego, jak to jest w ramach jest, że istnieje wiele możliwych wariantów, a każdy wybór będzie w sam raz pod pewnymi warunkami:

  • Gdyby predykaty wykonać równolegle, lub w serii?
    • Jeśli wykonywane są równolegle, czy wszystkie powinny zostać wykonane jednocześnie, czy też powinien być ograniczony stopień równoległości?
    • Jeśli są wykonywane równolegle, czy wyniki powinny być w tej samej kolejności, co w kolekcji oryginalnej, w kolejności jej wykonywania lub w niezdefiniowanej kolejności?
      • Jeśli powinny być zwrócone w kolejności ukończenia, czy istnieje jakiś sposób, aby (asynchronicznie) uzyskać wyniki po ich ukończeniu? (Będzie to wymagać zmiany typu powrocie z Task<IEnumerable<T>> do czegoś innego.)

Mówiłeś chcesz predykaty wykonać równolegle. W tym przypadku najprostszym rozwiązaniem jest wykonać je wszystkie na raz i zwrócić je w kolejności wykonania:

static async Task<IEnumerable<T>> Where<T>(
    this IEnumerable<T> source, Func<T, Task<bool>> predicate) 
{ 
    var results = new ConcurrentQueue<T>(); 
    var tasks = source.Select(
     async x => 
     { 
      if (await predicate(x)) 
       results.Enqueue(x); 
     }); 
    await Task.WhenAll(tasks); 
    return results; 
} 

Można wtedy wykorzystać go tak:

var filteredAddresses = await addresses.Where(MeetsCriteria); 
+1

Użyłbym innej nazwy metody, więc różne semantyki (w szczególności ponowne zamówienie) stają się jasne. – CodesInChaos

+0

@CodesInChaos Tak, możliwe, ale nie jestem pewien, co byłoby dobrym imieniem. 'AsyncParallelWhereOrderedByCompletion()' opisuje, co robi ta metoda, ale jest to straszna nazwa. – svick

+0

Być może nadają się nazwy takie jak 'WspółbieżnieFilterAsync'. – Sam

5

Pierwsze podejście: problem wszystko prosi jeden po drugim, a następnie czeka, aż wszystkie żądania wrócą, a następnie odfiltruje wynik. (Kod svick też to zrobił, ale tutaj robię to bez pośredniego ConcurrentQueue).

// First approach: massive fan-out 
var tasks = addresses.Select(async a => new { A = a, C = await MeetsCriteriaAsync(a) }); 
var addressesAndCriteria = await Task.WhenAll(tasks); 
var filteredAddresses = addressAndCriteria.Where(ac => ac.C).Select(ac => ac.A); 

Drugie podejście: wykonywać żądania jeden po drugim. To potrwa dłużej, ale będzie to upewnij się, aby nie uderzać się usługa z ogromnym naporem żądań (zakładając, że MeetsCriteriaAsync wychodzi do usługa ...)

// Second approach: one by one 
var filteredAddresses = new List<Uri>(); 
foreach (var a in filteredAddresses) 
{ 
    if (await MeetsCriteriaAsync(a)) filteredAddresses.Add(a); 
} 

Trzecie podejście: jak na sekundę, ale przy użyciu hipotetyczna funkcja C# 8 "strumienie asynchroniczne". C# 8 nie jest jeszcze dostępny, a strumienie asynchroniczne nie są jeszcze zaprojektowane, ale możemy marzyć! Typ IAsyncEnumerable już istnieje w RX i mam nadzieję, że dodadzą do niego więcej kombinatorów.Zaletą IAsyncEnumerable jest to, że możemy zacząć zużywać kilka pierwszych przefiltrowanych adresów, gdy tylko nadejdą, zamiast czekać, aż wszystko zostanie najpierw przefiltrowane.

// Third approach: ??? 
IEnumerable<Uri> addresses = {...}; 
IAsyncEnumerable<Uri> filteredAddresses = addresses.WhereAsync(MeetsCriteriaAsync); 

czwarte podejście: może nie chcemy wbijać się usługa z wszystkich żądań naraz, ale jesteśmy szczęśliwi wydać więcej niż jeden wniosek na raz. Może przeprowadziliśmy eksperymenty i okazało się, że "trzy na raz" były szczęśliwym medium. UWAGA: ten kod zakłada jedno-wątkowy kontekst wykonania, na przykład w programowaniu interfejsu użytkownika lub ASP.NET. Jeśli jest uruchamiany w wielowątkowym kontekście wykonania, potrzebuje zamiast tego ConcurrentQueue i ConcurrentList.

// Fourth approach: throttle to three-at-a-time requests 
var addresses = new Queue<Uri>(...); 
var filteredAddresses = new List<Uri>(); 
var worker1 = FilterAsync(addresses, filteredAddresses); 
var worker2 = FilterAsync(addresses, filteredAddresses); 
var worker3 = FilterAsync(addresses, filteredAddresses); 
await Task.WhenAll(worker1, worker2, worker3); 

async Task FilterAsync(Queue<Uri> q, List<Uri> r) 
{ 
    while (q.Count > 0) 
    { 
    var item = q.Dequeue(); 
    if (await MeetsCriteriaAsync(item)) r.Add(item); 
    } 
} 

Istnieją sposoby na czwarte podejście z wykorzystaniem biblioteki przepływu danych TPL.

Powiązane problemy