2012-06-13 13 views
28

, przy użyciu Reactive Extensions, chcę zignorować wiadomości pochodzących z mojego strumienia zdarzeń, które występują podczas, gdy moja metoda Subscribe jest uruchomiony. To znaczy. Czasami zajmuje mi to dłużej przetwarzanie wiadomości niż czasu między wiadomościami, więc chcę upuścić wiadomości, których nie mam czasu na przetwarzanie.W przypadku Rx, jak zignorować wszystkie-z wyjątkiem-najnowszą wartość, gdy moja metoda subskrypcji jest uruchomiony

Jednak po zakończeniu mojej metody Subscribe, jeśli pojawiły się jakieś komunikaty, chcę przetworzyć ostatnią. Dlatego zawsze przetwarzam najnowszą wiadomość.

Więc, jeśli mam jakiś kod, który robi:

messages.OnNext(100); 
messages.OnNext(1); 
messages.OnNext(2); 

i jeśli założymy „100” zajmuje dużo czasu, aby proces. Następnie chcę, aby "2" było przetwarzane po zakończeniu "100". "1" powinno być ignorowane, ponieważ zostało zastąpione przez "2", podczas gdy "100" było nadal przetwarzane.

Oto przykład wynik chcę używając zadanie tła i Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100)); 

Task.Factory.StartNew(() => 
{ 
    foreach(var n in messages.Latest()) 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(250)); 
     Console.WriteLine(n); 
    } 
}); 

Jednakże najnowsze() jest wezwaniem blokowanie i wolałbym nie mieć gwint siedzi czekając na kolejne wartości w ten sposób (czasami będą występować bardzo długie przerwy między wiadomościami).

mogę również uzyskać wynik chcę za pomocą BroadcastBlock z TPL Dataflow, takiego:

var buffer = new BroadcastBlock<long>(n => n); 
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n)); 

buffer.AsObservable() 
    .Subscribe(n => 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(250)); 
     Console.WriteLine(n); 
    }); 

ale czuje się jak powinno być możliwe bezpośrednio w Rx. Jaki jest najlepszy sposób na zrobienie tego?

+0

Brzmi jak zadanie dla Window (), chociaż ktoś może zaproponować prostsze rozwiązanie. –

+0

Twoje wydarzenia muszą być generowane niezależnie od subskrypcji. –

Odpowiedz

3

Dzięki Lee Campbell (z Intro To Rx FAME), mam teraz roztwór roboczy przy użyciu tej metody rozszerzenia:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler) 
{ 
    return Observable.Create<T>(observer => 
    { 
     Notification<T> outsideNotification = null; 
     var gate = new object(); 
     bool active = false; 
     var cancelable = new MultipleAssignmentDisposable(); 
     var disposable = source.Materialize().Subscribe(thisNotification => 
     { 
      bool alreadyActive; 
      lock (gate) 
      { 
       alreadyActive = active; 
       active = true; 
       outsideNotification = thisNotification; 
      } 

      if (!alreadyActive) 
      { 
       cancelable.Disposable = scheduler.Schedule(self => 
       { 
        Notification<T> localNotification = null; 
        lock (gate) 
        { 
         localNotification = outsideNotification; 
         outsideNotification = null; 
        } 
        localNotification.Accept(observer); 
        bool hasPendingNotification = false; 
        lock (gate) 
        { 
         hasPendingNotification = active = (outsideNotification != null); 
        } 
        if (hasPendingNotification) 
        { 
         self(); 
        } 
       }); 
      } 
     }); 
     return new CompositeDisposable(disposable, cancelable); 
    }); 
} 
+0

Jaki jest cel "materializowania" i używania "powiadomień" a samo przechowywanie samej wartości? Z moich testów wynika, że ​​działa zgodnie z oczekiwaniami, aby śledzić samą wartość - ale być może brakuje mi podstaw. –

+2

@AndrewHanlon za pomocą powiadomienia zamiast samej wartości służy do obsługi wyjątków, w przeciwnym razie nie zostaną one poprawnie przekazane do kanału OnError. – Wilka

+0

Ach, to ma sens! Dziękuję Ci. –

3

Oto próba użycia "tylko" Rx. Licznik czasu i subskrybent są utrzymywane niezależnie przez obserwację w wątku i użyłem podmiotu do wyrażenia opinii na temat wykonania zadania.

Nie sądzę, że jest to proste rozwiązanie, ale mam nadzieję, że może dać ci pomysły na ulepszenia.

messages. 
    Buffer(() => feedback). 
    Select(l => l.LastOrDefault()). 
    ObserveOn(Scheduler.ThreadPool). 
    Subscribe(n => 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(250)); 
     Console.WriteLine(n); 
     feedback.OnNext(Unit.Default); 
    }); 

feedback.OnNext(Unit.Default); 

Jest jeden drobny problem - bufor jest najpierw zamknięty, gdy jest pusty, więc generuje wartość domyślną. Prawdopodobnie możesz rozwiązać ten problem, wysyłając opinię po pierwszej wiadomości.


Tu jest jako funkcja rozszerzenia:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action) 
{ 
    var feedback = new Subject<Unit>(); 

    var sub = source. 
     Buffer(() => feedback). 
     ObserveOn(Scheduler.ThreadPool). 
     Subscribe(l => 
     { 
      action(l.LastOrDefault()); 
      feedback.OnNext(Unit.Default); 
     }); 

    feedback.OnNext(Unit.Default); 

    return sub; 
} 

i użytkowania:

messages.SubscribeWithoutOverlap(n => 
    { 
     Thread.Sleep(1000); 
     Console.WriteLine(n); 
    }); 
+0

Nie chcesz użyć 'LastOrDefault' zamiast' FirstOrDefault'? – yamen

+0

@yamen Prawdopodobnie jest to rozsądne. –

8

Oto metoda, która jest podobna do Dave ale używa Sample zamiast (co jest bardziej odpowiednie niż bufor). Dołączyłem podobną metodę rozszerzenia do tej, którą dodałem do odpowiedzi Dave'a.

Rozszerzenie:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action) 
{ 
    var sampler = new Subject<Unit>(); 

    var sub = source. 
     Sample(sampler). 
     ObserveOn(Scheduler.ThreadPool). 
     Subscribe(l => 
     { 
      action(l); 
      sampler.OnNext(Unit.Default); 
     }); 

    // start sampling when we have a first value 
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default)); 

    return sub; 
} 

Należy pamiętać, że jest to prostsze, a nie ma „pusty” bufor, który jest zwolniony. Pierwszy element wysyłany do akcji faktycznie pochodzi z samego strumienia.

Użycie jest bardzo proste:

messages.SubscribeWithoutOverlap(n => 
{ 
    Console.WriteLine("start: " + n); 
    Thread.Sleep(500); 
    Console.WriteLine("end: " + n); 
}); 

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing 

i wyniki:

source: 0 
start: 0 
source: 1 
source: 2 
source: 3 
source: 4 
source: 5 
end: 0 
start: 5 
source: 6 
source: 7 
source: 8 
source: 9 
source: 10 
end: 5 
start: 10 
source: 11 
source: 12 
source: 13 
source: 14 
source: 15 
end: 10 
+3

Ma to problem polegający na tym, że jeśli źródło nie umieściło niczego w buforze próbki w punkcie, w którym znajduje się sampler.OnNext, system przechodzi do stanu, w którym nie będzie generował więcej wartości. Zrobiłem wariację na ten temat, używając Switch zamiast przykładowego http://stackoverflow.com/a/15876519/158285 – bradgonesurfing

+0

nie powinien zwracać * IDisposable * również dbać o pozbycie się wewnętrznego * Temat *? – superjos

1

oto realizacja Task bazie, z semantyką Odwołanie, które nie korzystają z tematu. Wywołanie dyspozycji umożliwia zasubskrybowanym działaniom anulowanie przetwarzania, jeśli jest to konieczne.

public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action) 
    { 
     var cancellation = new CancellationDisposable(); 
     var token = cancellation.Token; 
     Task task = null; 

     return new CompositeDisposable(
      cancellation, 
      observable.Subscribe(value => 
      { 
       if (task == null || task.IsCompleted) 
        task = Task.Factory.StartNew(() => action(value, token), token); 
      }) 
     ); 
    } 

Oto prosty test:

Observable.Interval(TimeSpan.FromMilliseconds(150)) 
         .SampleSubscribe((v, ct) => 
         { 
          //cbeck for cancellation, do work 
          for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++) 
           Thread.Sleep(100); 

          Console.WriteLine(v); 
         }); 

Wyjście:

0 
7 
14 
21 
28 
35 
1

Z Rx 2.0 RC można użyć Chunkify uzyskać IEnumerable list, z których każda zawiera co było obserwowane od ostatni tekst MoveNext.

Następnie można użyć ToObservable, aby przekonwertować to z powrotem do IObservable i zwracać uwagę tylko na ostatni wpis na każdej niepustej liście.

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100)); 

messages.Chunkify() 
     .ToObservable(Scheduler.TaskPool) 
     .Where(list => list.Any()) 
     .Select(list => list.Last()) 
     .Subscribe(n => 
     { 
      Thread.Sleep(TimeSpan.FromMilliseconds(250)); 
      Console.WriteLine(n); 
     }); 
+2

To działa, ale pozostawia obracanie się wątku, aby wyciągnąć rzeczy z obserwowalnego (więc jeden z moich procesorów zostanie wyczerpany) – Wilka

+0

I buduje listę pełną wartości, które zignorujesz. Rozszerzenie ObserveLatestOn pozwala tego uniknąć - brak listy, brak przydziału z rosnącej listy, brak referencji utrzymujących przy życiu stare powiadomienia. –

2

Przykład użycia funkcji Observable.Switch. Obsługuje także przypadek po ukończeniu zadania, ale w kolejce nie ma nic innego, jak .

using System.Reactive.Linq; 
using System.Reactive.Subjects; 
using System.Reactive.Concurrency; 
using System.Reactive.Disposables; 

namespace System.Reactive 
{ 
    public static class RXX 
    { 
     public static IDisposable SubscribeWithoutOverlap<T> 
     (this IObservable<T> source 
     , Action<T> action 
     , IScheduler scheduler = null) 
     { 
      var sampler = new Subject<Unit>(); 
      scheduler = scheduler ?? Scheduler.Default; 
      var p = source.Publish(); 
      var connection = p.Connect(); 

      var subscription = sampler.Select(x=>p.Take(1)) 
       .Switch() 
       .ObserveOn(scheduler) 
       .Subscribe(l => 
       { 
        action(l); 
        sampler.OnNext(Unit.Default); 
       }); 

      sampler.OnNext(Unit.Default); 

      return new CompositeDisposable(connection, subscription); 
     } 
    } 
} 
+0

Właśnie zauważyłem, że to może spudłować wartości. To znaczy. nie zawsze przetwarza najnowsze pola wartości w kolejce, gdy już coś robi. na przykład https://gist.github.com/WilkaH/5403360 drukuje tylko "Zrobione 100", a nie "Zrobione 2" później (1 powinno zostać usunięte, ponieważ zostało zastąpione) – Wilka

+0

Powinno zignorować elementy, które lądują w kolejce, gdy jest aktualnie przetwarzanie. Nie jestem pewny co masz na myśli. – bradgonesurfing

+0

W takim przypadku nie wyjaśniłem tego w moim pierwotnym pytaniu. Zawsze chcę, aby najnowszy element był przetwarzany, więc jeśli przychodzi, gdy coś innego jest przetwarzane, to ten element powinien zostać przetworzony po zakończeniu bieżącego (zamiast pominięcia). – Wilka

3

pisałem na blogu o tym roztworem, który używa CAS zamiast zamków i unika rekursji. Kod jest poniżej, ale można znaleźć kompletne wyjaśnienie tutaj: http://www.zerobugbuild.com/?p=192

public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source, 
    IScheduler scheduler) 
{ 
    return Observable.Create<TSource>(observer => 
    { 
     Notification<TSource> pendingNotification = null; 
     var cancelable = new MultipleAssignmentDisposable(); 

     var sourceSubscription = source.Materialize() 
      .Subscribe(notification => 
      { 
       var previousNotification = Interlocked.Exchange(
        ref pendingNotification, notification); 

       if (previousNotification != null) return; 

       cancelable.Disposable = scheduler.Schedule(() => 
        { 
         var notificationToSend = Interlocked.Exchange(
          ref pendingNotification, null); 
         notificationToSend.Accept(observer); 
        }); 
      }); 
      return new CompositeDisposable(sourceSubscription, cancelable); 
    }); 
} 
2

Tylko skończony (i już całkowicie zmienione) moje własne rozwiązanie problemu, który mam zamiar wykorzystać w produkcji.

ile planista używa bieżącego wątku, wzywa do OnNext, OnCompleted, OnError ze źródła powinna wrócić natychmiast; jeśli obserwator jest zajęty poprzednimi powiadomieniami, przechodzi do kolejki o określonym, maksymalnym rozmiarze, skąd będą powiadamiani za każdym razem, gdy poprzednie zgłoszenie zostanie przetworzone. Jeśli kolejka się zapełni, najmniej najnowsze elementy zostaną odrzucone. Zatem, maksymalny rozmiar kolejki wynoszący 0 powoduje ignorowanie wszystkich elementów wchodzących, gdy obserwator jest zajęty; rozmiar 1 pozwoli zawsze obserwować najnowszy przedmiot; rozmiar do int.MaxValue powoduje, że konsument jest zajęty, dopóki nie dogoni producenta.

Jeśli program planujący obsługuje długie działanie (tzn. Udostępnia własny wątek), planuję pętlę, aby powiadomić obserwatora; w przeciwnym razie korzystam z planowania rekurencyjnego.

Oto kod. Wszelkie komentarze są mile widziane.

partial class MoreObservables 
{ 
    /// <summary> 
    /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process. 
    /// </summary> 
    /// <param name="source">The source sequence.</param> 
    /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param> 
    /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param> 
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> 
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception> 
    /// <remarks> 
    /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready. 
    /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any. 
    /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>. 
    /// </remarks> 
    public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null) 
    { 
     if (source == null) throw new ArgumentNullException(nameof(source)); 
     if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize)); 
     if (scheduler == null) scheduler = Scheduler.Default; 

     return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer)); 
    } 

    private static class LatestImpl<TSource> 
    { 
     public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer) 
     { 
      if (observer == null) throw new ArgumentNullException(nameof(observer)); 

      var longrunningScheduler = scheduler.AsLongRunning(); 
      if (longrunningScheduler != null) 
       return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer); 

      return new RecursiveSubscription(source, maxQueueSize, scheduler, observer); 
     } 

     #region Subscriptions 

     /// <summary> 
     /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop. 
     /// </summary> 
     private sealed class LoopSubscription : IDisposable 
     { 
      private enum State 
      { 
       Idle, // nothing to notify 
       Head, // next notification is in _head 
       Queue, // next notifications are in _queue, followed by _completion 
       Disposed, // disposed 
      } 

      private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable(); 
      private readonly IObserver<TSource> _observer; 
      private State _state; 
      private TSource _head; // item in front of the queue 
      private IQueue _queue; // queued items 
      private Notification<TSource> _completion; // completion notification 

      public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer) 
      { 
       _observer = observer; 
       _queue = Queue.Create(maxQueueSize); 
       scheduler.ScheduleLongRunning(_ => Loop()); 
       _subscription.Disposable = source.Subscribe(
        OnNext, 
        error => OnCompletion(Notification.CreateOnError<TSource>(error)), 
        () => OnCompletion(Notification.CreateOnCompleted<TSource>())); 
      } 

      private void OnNext(TSource value) 
      { 
       lock (_subscription) 
       { 
        switch (_state) 
        { 
         case State.Idle: 
          _head = value; 
          _state = State.Head; 
          Monitor.Pulse(_subscription); 
          break; 
         case State.Head: 
         case State.Queue: 
          if (_completion != null) return; 
          try { _queue.Enqueue(value); } 
          catch (Exception error) // probably OutOfMemoryException 
          { 
           _completion = Notification.CreateOnError<TSource>(error); 
           _subscription.Dispose(); 
          } 
          break; 
        } 
       } 
      } 

      private void OnCompletion(Notification<TSource> completion) 
      { 
       lock (_subscription) 
       { 
        switch (_state) 
        { 
         case State.Idle: 
          _completion = completion; 
          _state = State.Queue; 
          Monitor.Pulse(_subscription); 
          _subscription.Dispose(); 
          break; 
         case State.Head: 
         case State.Queue: 
          if (_completion != null) return; 
          _completion = completion; 
          _subscription.Dispose(); 
          break; 
        } 
       } 
      } 

      public void Dispose() 
      { 
       lock (_subscription) 
       { 
        if (_state == State.Disposed) return; 

        _head = default(TSource); 
        _queue = null; 
        _completion = null; 
        _state = State.Disposed; 
        Monitor.Pulse(_subscription); 
        _subscription.Dispose(); 
       } 
      } 

      private void Loop() 
      { 
       try 
       { 
        while (true) // overall loop for all notifications 
        { 
         // next notification to emit 
         Notification<TSource> completion; 
         TSource next; // iff completion == null 

         lock (_subscription) 
         { 
          while (true) 
          { 
           while (_state == State.Idle) 
            Monitor.Wait(_subscription); 

           if (_state == State.Head) 
           { 
            completion = null; 
            next = _head; 
            _head = default(TSource); 
            _state = State.Queue; 
            break; 
           } 
           if (_state == State.Queue) 
           { 
            if (!_queue.IsEmpty) 
            { 
             completion = null; 
             next = _queue.Dequeue(); // assumption: this never throws 
             break; 
            } 
            if (_completion != null) 
            { 
             completion = _completion; 
             next = default(TSource); 
             break; 
            } 
            _state = State.Idle; 
            continue; 
           } 
           Debug.Assert(_state == State.Disposed); 
           return; 
          } 
         } 

         if (completion != null) 
         { 
          completion.Accept(_observer); 
          return; 
         } 
         _observer.OnNext(next); 
        } 
       } 
       finally { Dispose(); } 
      } 
     } 

     /// <summary> 
     /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively. 
     /// </summary> 
     private sealed class RecursiveSubscription : IDisposable 
     { 
      private enum State 
      { 
       Idle, // nothing to notify 
       Scheduled, // emitter scheduled or executing 
       Disposed, // disposed 
      } 

      private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable(); 
      private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action 
      private readonly IScheduler _scheduler; 
      private readonly IObserver<TSource> _observer; 
      private State _state; 
      private IQueue _queue; // queued items 
      private Notification<TSource> _completion; // completion notification 

      public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer) 
      { 
       _scheduler = scheduler; 
       _observer = observer; 
       _queue = Queue.Create(maxQueueSize); 
       _subscription.Disposable = source.Subscribe(
        OnNext, 
        error => OnCompletion(Notification.CreateOnError<TSource>(error)), 
        () => OnCompletion(Notification.CreateOnCompleted<TSource>())); 
      } 

      private void OnNext(TSource value) 
      { 
       lock (_subscription) 
       { 
        switch (_state) 
        { 
         case State.Idle: 
          _emitter.Disposable = _scheduler.Schedule(value, EmitNext); 
          _state = State.Scheduled; 
          break; 
         case State.Scheduled: 
          if (_completion != null) return; 
          try { _queue.Enqueue(value); } 
          catch (Exception error) // probably OutOfMemoryException 
          { 
           _completion = Notification.CreateOnError<TSource>(error); 
           _subscription.Dispose(); 
          } 
          break; 
        } 
       } 
      } 

      private void OnCompletion(Notification<TSource> completion) 
      { 
       lock (_subscription) 
       { 
        switch (_state) 
        { 
         case State.Idle: 
          _completion = completion; 
          _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion)); 
          _state = State.Scheduled; 
          _subscription.Dispose(); 
          break; 
         case State.Scheduled: 
          if (_completion != null) return; 
          _completion = completion; 
          _subscription.Dispose(); 
          break; 
        } 
       } 
      } 

      public void Dispose() 
      { 
       lock (_subscription) 
       { 
        if (_state == State.Disposed) return; 

        _emitter.Dispose(); 
        _queue = null; 
        _completion = null; 
        _state = State.Disposed; 
        _subscription.Dispose(); 
       } 
      } 

      private void EmitNext(TSource value, Action<TSource> self) 
      { 
       try { _observer.OnNext(value); } 
       catch { Dispose(); return; } 

       lock (_subscription) 
       { 
        if (_state == State.Disposed) return; 
        Debug.Assert(_state == State.Scheduled); 
        if (!_queue.IsEmpty) 
         self(_queue.Dequeue()); 
        else if (_completion != null) 
         _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion)); 
        else 
         _state = State.Idle; 
       } 
      } 

      private void EmitCompletion(Notification<TSource> completion) 
      { 
       try { completion.Accept(_observer); } 
       finally { Dispose(); } 
      } 
     } 

     #endregion 

     #region IQueue 

     /// <summary> 
     /// FIFO queue that discards least recent items if size limit is reached. 
     /// </summary> 
     private interface IQueue 
     { 
      bool IsEmpty { get; } 
      void Enqueue(TSource item); 
      TSource Dequeue(); 
     } 

     /// <summary> 
     /// <see cref="IQueue"/> implementations. 
     /// </summary> 
     private static class Queue 
     { 
      public static IQueue Create(int maxSize) 
      { 
       switch (maxSize) 
       { 
        case 0: return Zero.Instance; 
        case 1: return new One(); 
        default: return new Many(maxSize); 
       } 
      } 

      private sealed class Zero : IQueue 
      { 
       // ReSharper disable once StaticMemberInGenericType 
       public static Zero Instance { get; } = new Zero(); 
       private Zero() { } 

       public bool IsEmpty => true; 
       public void Enqueue(TSource item) { } 
       public TSource Dequeue() { throw new InvalidOperationException(); } 
      } 

      private sealed class One : IQueue 
      { 
       private TSource _item; 

       public bool IsEmpty { get; private set; } = true; 

       public void Enqueue(TSource item) 
       { 
        _item = item; 
        IsEmpty = false; 
       } 

       public TSource Dequeue() 
       { 
        if (IsEmpty) throw new InvalidOperationException(); 

        var item = _item; 
        _item = default(TSource); 
        IsEmpty = true; 
        return item; 
       } 
      } 

      private sealed class Many : IQueue 
      { 
       private readonly int _maxSize, _initialSize; 
       private int _deq, _enq; // indices of deque and enqueu positions 
       private TSource[] _buffer; 

       public Many(int maxSize) 
       { 
        if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize)); 

        _maxSize = maxSize; 
        if (maxSize == int.MaxValue) 
         _initialSize = 4; 
        else 
        { 
         // choose an initial size that won't get us too close to maxSize when doubling 
         _initialSize = maxSize; 
         while (_initialSize >= 7) 
          _initialSize = (_initialSize + 1)/2; 
        } 
       } 

       public bool IsEmpty { get; private set; } = true; 

       public void Enqueue(TSource item) 
       { 
        if (IsEmpty) 
        { 
         if (_buffer == null) _buffer = new TSource[_initialSize]; 
         _buffer[0] = item; 
         _deq = 0; 
         _enq = 1; 
         IsEmpty = false; 
         return; 
        } 
        if (_deq == _enq) // full 
        { 
         if (_buffer.Length == _maxSize) // overwrite least recent 
         { 
          _buffer[_enq] = item; 
          if (++_enq == _buffer.Length) _enq = 0; 
          _deq = _enq; 
          return; 
         } 

         // increse buffer size 
         var newSize = _buffer.Length >= _maxSize/2 ? _maxSize : 2 * _buffer.Length; 
         var newBuffer = new TSource[newSize]; 
         var count = _buffer.Length - _deq; 
         Array.Copy(_buffer, _deq, newBuffer, 0, count); 
         Array.Copy(_buffer, 0, newBuffer, count, _deq); 
         _deq = 0; 
         _enq = _buffer.Length; 
         _buffer = newBuffer; 
        } 
        _buffer[_enq] = item; 
        if (++_enq == _buffer.Length) _enq = 0; 
       } 

       public TSource Dequeue() 
       { 
        if (IsEmpty) throw new InvalidOperationException(); 

        var result = ReadAndClear(ref _buffer[_deq]); 
        if (++_deq == _buffer.Length) _deq = 0; 
        if (_deq == _enq) 
        { 
         IsEmpty = true; 
         if (_buffer.Length > _initialSize) _buffer = null; 
        } 
        return result; 
       } 

       private static TSource ReadAndClear(ref TSource item) 
       { 
        var result = item; 
        item = default(TSource); 
        return result; 
       } 
      } 
     } 

     #endregion 
    } 
} 
+0

Wow, to dużo dobrze udokumentowanych rzeczy. Chociaż tylko przeglądam i nie interesuję się tematem, myślę, że muszę ci podziękować za wniesienie kawałka pracy tutaj. – YakovL

+0

Nie ma za co. Chociaż nie zrobiłem tego z czystego altruizmu, mam nadzieję, że ktoś też uzna to za przydatne (i pomoże mi zdobyć 50 punktów reputacji, więc przynajmniej będę mógł komentować posty) – tinudu

Powiązane problemy