2012-04-21 22 views
7

Muszę zaimplementować mechanizm dławiący (żądania na sekundę) podczas korzystania z HttpWebRequest do wysyłania równoległych żądań do jednego serwera aplikacji. Moja aplikacja C# musi wystawiać nie więcej niż 80 żądań na sekundę na serwer zdalny. Limit narzucany jest przez administratorów usług zdalnych nie jako twardy limit, ale jako "SLA" między moją platformą a ich usługami.Jak ograniczyć liczbę HttpWebRequest na sekundę do serwera WWW?

Jak kontrolować liczbę żądań na sekundę podczas korzystania z HttpWebRequest?

Odpowiedz

3

Miałem ten sam problem i nie mogłem znaleźć gotowego rozwiązania, więc zrobiłem je i oto jest. Chodzi o to, aby do dodawania elementów wymagających przetwarzania i używania rozszerzeń reaktywnych do subskrybowania za pomocą procesora o ograniczonej szybkości, należy użyć elementu BlockingCollection<T>.

przepustnicy klasa jest przemianowany wersja this rate limiter

public static class BlockingCollectionExtensions 
{ 
    // TODO: devise a way to avoid problems if collection gets too big (produced faster than consumed) 
    public static IObservable<T> AsRateLimitedObservable<T>(this BlockingCollection<T> sequence, int items, TimeSpan timePeriod, CancellationToken producerToken) 
    { 
     Subject<T> subject = new Subject<T>(); 

     // this is a dummyToken just so we can recreate the TokenSource 
     // which we will pass the proxy class so it can cancel the task 
     // on disposal 
     CancellationToken dummyToken = new CancellationToken(); 
     CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(producerToken, dummyToken); 

     var consumingTask = new Task(() => 
     { 
      using (var throttle = new Throttle(items, timePeriod)) 
      { 
       while (!sequence.IsCompleted) 
       { 
        try 
        { 
         T item = sequence.Take(producerToken); 
         throttle.WaitToProceed(); 
         try 
         { 
          subject.OnNext(item); 
         } 
         catch (Exception ex) 
         { 
          subject.OnError(ex); 
         } 
        } 
        catch (OperationCanceledException) 
        { 
         break; 
        } 
       } 
       subject.OnCompleted(); 
      } 
     }, TaskCreationOptions.LongRunning); 

     return new TaskAwareObservable<T>(subject, consumingTask, tokenSource); 
    } 

    private class TaskAwareObservable<T> : IObservable<T>, IDisposable 
    { 
     private readonly Task task; 
     private readonly Subject<T> subject; 
     private readonly CancellationTokenSource taskCancellationTokenSource; 

     public TaskAwareObservable(Subject<T> subject, Task task, CancellationTokenSource tokenSource) 
     { 
      this.task = task; 
      this.subject = subject; 
      this.taskCancellationTokenSource = tokenSource; 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      var disposable = subject.Subscribe(observer); 
      if (task.Status == TaskStatus.Created) 
       task.Start(); 
      return disposable; 
     } 

     public void Dispose() 
     { 
      // cancel consumption and wait task to finish 
      taskCancellationTokenSource.Cancel(); 
      task.Wait(); 

      // dispose tokenSource and task 
      taskCancellationTokenSource.Dispose(); 
      task.Dispose(); 

      // dispose subject 
      subject.Dispose(); 
     } 
    } 
} 

testów jednostkowych:

class BlockCollectionExtensionsTest 
{ 
    [Fact] 
    public void AsRateLimitedObservable() 
    { 
     const int maxItems = 1; // fix this to 1 to ease testing 
     TimeSpan during = TimeSpan.FromSeconds(1); 

     // populate collection 
     int[] items = new[] { 1, 2, 3, 4 }; 
     BlockingCollection<int> collection = new BlockingCollection<int>(); 
     foreach (var i in items) collection.Add(i); 
     collection.CompleteAdding(); 

     IObservable<int> observable = collection.AsRateLimitedObservable(maxItems, during, CancellationToken.None); 
     BlockingCollection<int> processedItems = new BlockingCollection<int>(); 
     ManualResetEvent completed = new ManualResetEvent(false); 
     DateTime last = DateTime.UtcNow; 
     observable 
      // this is so we'll receive exceptions 
      .ObserveOn(new SynchronizationContext()) 
      .Subscribe(item => 
       { 
        if (item == 1) 
         last = DateTime.UtcNow; 
        else 
        { 
         TimeSpan diff = (DateTime.UtcNow - last); 
         last = DateTime.UtcNow; 

         Assert.InRange(diff.TotalMilliseconds, 
          during.TotalMilliseconds - 30, 
          during.TotalMilliseconds + 30); 
        } 
        processedItems.Add(item); 
       }, 
       () => completed.Set() 
      ); 
     completed.WaitOne(); 
     Assert.Equal(items, processedItems, new CollectionEqualityComparer<int>()); 
    } 
} 
+0

coś złego stało się z adresem URL –

-1

Mój oryginalny wpis omawiał, jak dodać mechanizm dławiący do WCF poprzez rozszerzenia zachowania klienta, ale potem zauważono, że źle odczytałem pytanie (doh!).

Podejście można ogólnie sprawdzić w klasie, która określa, czy naruszamy limit stawki, czy też nie. Było już dużo dyskusji na temat sprawdzania naruszeń stawek.

Throttling method calls to M requests in N seconds

Jeżeli naruszenie limitu szybkości, potem spać odstępach naprawić i sprawdzić ponownie. Jeśli nie, należy wykonać wywołanie HttpWebRequest.

+0

W pytaniu nie odnoszę się do usługa WCF. Chodzi o proste użycie klasy HttpWebRequest. –

+0

Ah jest za późno i powinienem był przeczytać to pytanie jeszcze raz :) Nadal możesz spróbować podejść przed wykonaniem połączenia do HttpWebRequest, sprawdź z inną klasą, aby upewnić się, że nie naruszysz 80 żądań na sekundę. Zaktualizuję mój kod powyżej. –

+0

Prosi o C#, a nie Javę. – SmallChess

0

Metody rozszerzania przepustnicy() i próbki() pozwalają na regulację szybkiej sekwencji zdarzeń w "wolniejszą" sekwencję.

Here is a blog post with an example z Sample(Timespan), która zapewnia maksymalną szybkość.

+0

Problem z Sample() i Throttle() polega na tym, że pomijają/wyrzucają próbki w celu osiągnięcia określonej szybkości. – georgiosd

Powiązane problemy