2015-11-25 16 views
17

Mam wyliczenie elementów (RunData.Demand), z których każda przedstawia niektóre prace związane z wywoływaniem API przez HTTP. Działa to świetnie, jeśli tylko przez cały czas będę je wywoływać i wywoływać API podczas każdej iteracji. Jednak każda iteracja trwa sekundę lub dwie, więc chciałbym uruchomić 2-3 wątki i podzielić pracę między nimi. Oto, co robię:Jak prawidłowo ustawiać kolejki zadań do uruchomienia w języku C#

ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads 
var tasks = RunData.Demand 
    .Select(service => Task.Run(async delegate 
    { 
     var availabilityResponse = await client.QueryAvailability(service); 
     // Do some other stuff, not really important 
    })); 

await Task.WhenAll(tasks); 

client.QueryAvailability wezwanie zasadzie wywołuje API przy użyciu klasy HttpClient:

public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request) 
{ 
    var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request); 

    if (response.IsSuccessStatusCode) 
    { 
     return await response.Content.ReadAsAsync<QueryAvailabilityResponse>(); 
    } 

    throw new HttpException((int) response.StatusCode, response.ReasonPhrase); 
} 

Działa to doskonale na chwilę, ale w końcu wszystko zaczyna odmierzanie. Jeśli ustawię limit czasu HttpClient na godzinę, wtedy zaczną się dziwne błędy wewnętrznego serwera.

Zacząłem od ustawienia stopera w metodzie QueryAvailability, aby sprawdzić, co się dzieje.

Co się dzieje, to 1200 pozycji w RunData.Demand są tworzone jednocześnie i wszystkie metody 1200 await client.PostAsJsonAsync są wywoływane. Wygląda na to, że używa 2 wątków do powolnego sprawdzania zadań, więc pod koniec mam zadania, które czekają na 9 lub 10 minut.

Oto zachowanie Chciałbym:

Chciałbym stworzyć 1200 zadań, a następnie uruchomić je 3-4 naraz stają się dostępne wątki. Robię , a nie chcę od razu kolejkować 1 200 połączeń HTTP.

Czy jest dobry sposób na robienie tego?

+0

Nie wydaje się, aby utworzyć nowego 'klienta' dla każdego połączenia. Wiesz, że 'System.Net.Http.HttpClient' nie jest wątkiem bezpiecznym dla wywołań instancji? Dla każdego wywołania należy utworzyć nową instancję dla (i usuwać po). – Enigmativity

+0

Metoda 'QueryAvailability' jest faktycznie w klasie, która tworzy' HttpClient', który jest prywatnym członkiem tej instancji. Nie wiedziałem, że to nie jest wątek bezpieczne, mogę definitywnie utworzyć go przed każdym wywołaniem. Przyjrzę się temu więcej, dzięki! –

+0

Hmm, zrobiłem trochę badań i wygląda na to, że to, co robię, jest bezpieczne dla wątków. Zobacz [tutaj] (http://stackoverflow.com/questions/11178220/is-httpclient-safe-to-use-concrentrently) i [tutaj] (http://www.tomdupont.net/2014/11/net- 45-httpclient-is-thread-safe.html) –

Odpowiedz

17

Jak zawsze polecam .. to czego potrzebujesz to TPL Dataflow (do instalacji: Install-Package Microsoft.Tpl.Dataflow).

Tworzysz ActionBlock z akcją do wykonania na każdym przedmiocie. Ustaw MaxDegreeOfParallelism dla dławienia. Rozpocząć delegowania do niego i czeka na jego zakończenie:

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service => 
{ 
    var availabilityResponse = await client.QueryAvailability(service); 
    // ... 
}, 
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); 

foreach (var service in RunData.Demand) 
{ 
    block.Post(service); 
} 

block.Complete(); 
await block.Completion; 
+0

To również wygląda obiecująco i wygląda na to, że działa dobrze z metodami "asynchronicznymi" (np. Mogę "czekać" na ukończenie bloku). Dam to strzał. –

+0

@MikeChristensen to jest. Jest to jedna z niewielu bibliotek w .Net, które zostały napisane specjalnie z myślą o oczekiwaniu na asynchronizm. – i3arnon

+2

Działa bezbłędnie! Jestem teraz fanem. Akceptacja. –

3

Używasz połączeń asynchronicznych HTTP, więc ograniczenie liczby wątków nie pomoże (nie będzie ParallelOptions.MaxDegreeOfParallelism w Parallel.ForEach jako jedna z odpowiedzi wskazuje). Nawet pojedynczy wątek może inicjować wszystkie żądania i przetwarzać wyniki po ich otrzymaniu.

Jednym ze sposobów rozwiązania tego problemu jest wykorzystanie przepływu danych TPL.

Innym miłym rozwiązaniem jest podzielenie źródło IEnumerable do przegród i elementów procesowych w każdym sekwencyjnie działowej, jak opisano w this blog post:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll(
     from partition in Partitioner.Create(source).GetPartitions(dop) 
     select Task.Run(async delegate 
     { 
      using (partition) 
       while (partition.MoveNext()) 
        await body(partition.Current); 
     })); 
} 
3

Choć Dataflow biblioteka jest super, myślę, że to trochę ciężkie, gdy nie jest używany skład bloku. Chciałbym użyć czegoś podobnego do metody rozszerzenia poniżej.

Również, w przeciwieństwie do metody Partitioner, uruchamia metody asynchroniczne w kontekście wywołującym - co jest zastrzeżeniem, że jeśli twój kod nie jest prawdziwie asynchroniczny lub przyjmuje "szybką ścieżkę", to będzie skutecznie działał synchronicznie, ponieważ nie ma wątków są jawnie tworzone.

public static async Task RunParallelAsync<T>(this IEnumerable<T> items, Func<T, Task> asyncAction, int maxParallel) 
{ 
    var tasks = new List<Task>(); 

    foreach (var item in items) 
    { 
     tasks.Add(asyncAction(item)); 

     if (tasks.Count < maxParallel) 
       continue; 

     var notCompleted = tasks.Where(t => !t.IsCompleted).ToList(); 

     if (notCompleted.Count >= maxParallel) 
      await Task.WhenAny(notCompleted); 
    } 

    await Task.WhenAll(tasks); 
} 
3

Stare pytanie, ale chciałbym zaproponować alternatywną lekkie rozwiązanie przy użyciu klasy SemaphoreSlim. Wystarczy odwołać się do System.Threading.

SemaphoreSlim sem = new SemaphoreSlim(4,4); 

foreach (var service in RunData.Demand) 
{ 

    await sem.WaitAsync(); 
    Task t = Task.Run(async() => 
    { 
     var availabilityResponse = await client.QueryAvailability(serviceCopy));  
     // do your other stuff here with the result of QueryAvailability 
    } 
    t.ContinueWith(sem.Release()); 
} 

Semafor działa jak mechanizm blokujący. Możesz wejść do semafora tylko przez wywołanie Czekaj (WaitAsync), które odejmuje jeden od licznika. Wywołanie wywołania dodaje jeden do liczby.

+1

Jeśli dobrze rozumiem, w zależności od tego, w jaki sposób się to stosuje, może działać dobrze z 'sem.Wait()' zamiast 'czekać na sem.WaitASync()'. Spowoduje to zablokowanie wątku wywołującego, więc nie powinno się tego robić w wątku interfejsu użytkownika, ale w dowolnym innym wątku, może to być najprostszy sposób zarządzania pracą do wykonania. W szczególności, jeśli jeden z 4 sektorów jest dostępny, nastąpi to natychmiast. Jeśli nie, czeka, aż będzie dostępna. – ToolmakerSteve

Powiązane problemy