2015-07-03 12 views
6

Mam projekt C# pracy z wejściowego audio Stream z Kinect 1, Kinect 2, Mikrofon lub cokolwiek innego.W jaki sposób można podzielić i potokować wiele strumieni NAudio

waveIn.DataAvailable += (object sender, WaveInEventArgs e) => { 
    lock(buffer){ 
    var pos = buffer.Position; 
       buffer.Write(e.Buffer, 0, e.BytesRecorded); 
       buffer.Position = pos; 
    } 
}; 

Zmienna buforowa to strumień ze składnika A, który zostanie przetworzony przez komponent B do rozpoznawania mowy działający na strumieniach.

dodam nowe składniki C, D, E, praca na Streams obliczyć boiska, wykrywanie dźwięku, wykonaj drukowanie palca, lub cokolwiek innego ...

Jak można powielać, że Stream dla składników C, D, E?

  • Składnik A wysłać zdarzenie „Mam Stream robić to, co chcesz” Nie chcę, aby odwrócić logikę przez zdarzenie „Daj mi swoje strumienie”

  • szukam na "MultiStream", który mógłby dać mi Stream instancji i będzie obsługiwać zadanie

Składnik a

var MultiStream buffer = new MultiStream() 
... 
SendMyEventWith(buffer) 

Komponent B, C, D, E

public void HandleMyEvent(MultiStream buffer){ 
    var stream = buffer.GetNewStream(); 
    var engine = new EngineComponentB() 
     engine.SetStream(stream); 
} 
  • MultiStream musi być Stream zawinąć metody write() (bo Stream nie ma dostępnych danych mechaniki)?
  • Jeśli strumień jest usuwany() przez komponent B, MultiStream powinien usunąć go z tablicy?
  • MultiStream musi wyjątek na read() wymagają użycia GetNewStream()

EDIT: Kinect 1 zapewniają samego Stream ... :-(należy używać wątek pumpit do MultiStream ?

Czy ktoś ma tego rodzaju MultiStream klasy?

Dzięki

Odpowiedz

1

Nie jestem pewien, czy jest to najlepszy sposób na to, czy jest lepszy niż poprzednia odpowiedź, i nie gwarantuję, że ten kod jest idealny , ale zakodowałem coś, o co prosiłeś dosłownie, ponieważ było fajnie - klasa MultiStream.

można znaleźć kod dla klasy tutaj: http://pastie.org/10289142

Usage Przykład:

MultiStream ms = new MultiStream(); 

Stream copy1 = ms.CloneStream(); 
ms.Read(...); 

Stream copy2 = ms.CloneStream(); 
ms.Read(...); 

copy1 i copy2 będzie zawierać takie same dane po przykładem jest ran, i będą nadal aktualizowane jako MultiStream jest zapisywane. Możesz czytać, aktualizować pozycję i usuwać sklonowane strumienie indywidualnie. Po usunięciu sklonowane strumienie zostaną usunięte z MultiStream, a usunięcie Multistream spowoduje zamknięcie wszystkich powiązanych i sklonowanych strumieni (możesz to zmienić, jeśli nie jest to zachowanie, które chcesz). Próba napisania do sklonowanych strumieni spowoduje odrzucenie nieobsługiwanego wyjątku.

1

Jakoś nie sądzę strumienie naprawdę pasuje, co próbujesz zrobić. ty konfigurowania sytuację, w której długi uruchomi się program o stale rozszerzać wymagania dotyczące danych bez wyraźnego powodu.

Proponuję model pub/sub, który opublikuje odebrane dane dźwiękowe dla subskrybentów, najlepiej za pomocą podejścia wielowątkowego, aby zminimalizować wpływ złego subskrybenta. Niektóre pomysły można znaleźć here.

Zrobiłem to już wcześniej z klasą procesorów, która implementuje IObserver<byte[]> i używa Queue<byte[]> do przechowywania bloków próbek, aż wątek procesu będzie dla nich gotowy. Oto są klasy bazowe:

public abstract class BufferedObserver<T> : IObserver<T>, IDisposable 
{ 
    private object _lck = new object(); 

    private IDisposable _subscription = null; 
    public bool Subscribed { get { return _subscription != null; } } 

    private bool _completed = false; 
    public bool Completed { get { return _completed; } } 

    protected readonly Queue<T> _queue = new Queue<T>(); 

    protected bool DataAvailable { get { lock(_lck) { return _queue.Any(); } } } 
    protected int AvailableCount { get { lock (_lck) { return _queue.Count; } } } 

    protected BufferedObserver() 
    { 
    } 

    protected BufferedObserver(IObservable<T> observable) 
    { 
     SubscribeTo(observable); 
    } 

    public virtual void Dispose() 
    { 
     if (_subscription != null) 
     { 
      _subscription.Dispose(); 
      _subscription = null; 
     } 
    } 

    public void SubscribeTo(IObservable<T> observable) 
    { 
     if (_subscription != null) 
      _subscription.Dispose(); 
     _subscription = observable.Subscribe(this); 
     _completed = false; 
    } 

    public virtual void OnCompleted() 
    { 
     _completed = true; 
    } 

    public virtual void OnError(Exception error) 
    { } 

    public virtual void OnNext(T value) 
    { 
     lock (_lck) 
      _queue.Enqueue(value); 
    } 

    protected bool GetNext(ref T buffer) 
    { 
     lock (_lck) 
     { 
      if (!_queue.Any()) 
       return false; 
      buffer = _queue.Dequeue(); 
      return true; 
     } 
    } 

    protected T NextOrDefault() 
    { 
     T buffer = default(T); 
     GetNext(ref buffer); 
     return buffer; 
    } 
} 

public abstract class Processor<T> : BufferedObserver<T> 
{ 
    private object _lck = new object(); 
    private Thread _thread = null; 

    private object _cancel_lck = new object(); 
    private bool _cancel_requested = false; 
    private bool CancelRequested 
    { 
     get { lock(_cancel_lck) return _cancel_requested; } 
     set { lock(_cancel_lck) _cancel_requested = value; } 
    } 

    public bool Running { get { return _thread == null ? false : _thread.IsAlive; } } 
    public bool Finished { get { return _thread == null ? false : !_thread.IsAlive; } } 

    protected Processor(IObservable<T> observable) 
     : base(observable) 
    { } 

    public override void Dispose() 
    { 
     if (_thread != null && _thread.IsAlive) 
     { 
      //CancelRequested = true; 
      _thread.Join(5000); 
     } 
     base.Dispose(); 
    } 

    public bool Start() 
    { 
     if (_thread != null) 
      return false; 

     _thread = new Thread(threadfunc); 
     _thread.Start(); 
     return true; 
    } 

    private void threadfunc() 
    { 
     while (!CancelRequested && (!Completed || _queue.Any())) 
     { 
      if (DataAvailable) 
      { 
       T data = NextOrDefault(); 
       if (data != null && !data.Equals(default(T))) 
        ProcessData(data); 
      } 
      else 
       Thread.Sleep(10); 
     } 
    } 

    // implement this in a sub-class to process the blocks 
    protected abstract void ProcessData(T data); 
} 

ten sposób jesteś utrzymując jedynie dane tak długo, jak jest to potrzebne, można dołączyć jako wiele wątków procesu, jak trzeba do tego samego źródła danych do zaobserwowania.


I przez wzgląd na kompletność, oto ogólna klasa, która implementuje IObservable<T> więc można zobaczyć, jak to wszystko do siebie pasuje. Ten jeden ma nawet komentarze:

/// <summary>Generic IObservable implementation</summary> 
/// <typeparam name="T">Type of messages being observed</typeparam> 
public class Observable<T> : IObservable<T> 
{ 
    /// <summary>Subscription class to manage unsubscription of observers.</summary> 
    private class Subscription : IDisposable 
    { 
     /// <summary>Observer list that this subscription relates to</summary> 
     public readonly ConcurrentBag<IObserver<T>> _observers; 

     /// <summary>Observer to manage</summary> 
     public readonly IObserver<T> _observer; 

     /// <summary>Initialize subscription</summary> 
     /// <param name="observers">List of subscribed observers to unsubscribe from</param> 
     /// <param name="observer">Observer to manage</param> 
     public Subscription(ConcurrentBag<IObserver<T>> observers, IObserver<T> observer) 
     { 
      _observers = observers; 
      _observer = observer; 
     } 

     /// <summary>On disposal remove the subscriber from the subscription list</summary> 
     public void Dispose() 
     { 
      IObserver<T> observer; 
      if (_observers != null && _observers.Contains(_observer)) 
       _observers.TryTake(out observer); 
     } 
    } 

    // list of subscribed observers 
    private readonly ConcurrentBag<IObserver<T>> _observers = new ConcurrentBag<IObserver<T>>(); 

    /// <summary>Subscribe an observer to this observable</summary> 
    /// <param name="observer">Observer instance to subscribe</param> 
    /// <returns>A subscription object that unsubscribes on destruction</returns> 
    /// <remarks>Always returns a subscription. Ensure that previous subscriptions are disposed 
    /// before re-subscribing.</remarks> 
    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     // only add observer if it doesn't already exist: 
     if (!_observers.Contains(observer)) 
      _observers.Add(observer); 

     // ...but always return a new subscription. 
     return new Subscription(_observers, observer); 
    } 

    // delegate type for threaded invocation of IObserver.OnNext method 
    private delegate void delNext(T value); 

    /// <summary>Send <paramref name="data"/> to the OnNext methods of each subscriber</summary> 
    /// <param name="data">Data object to send to subscribers</param> 
    /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks> 
    public void Notify(T data) 
    { 
     foreach (var observer in _observers) 
     { 
      delNext handler = observer.OnNext; 
      handler.BeginInvoke(data, null, null); 
     } 
    } 

    // delegate type for asynchronous invocation of IObserver.OnComplete method 
    private delegate void delComplete(); 

    /// <summary>Notify all subscribers that the observable has completed</summary> 
    /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks> 
    public void NotifyComplete() 
    { 
     foreach (var observer in _observers) 
     { 
      delComplete handler = observer.OnCompleted; 
      handler.BeginInvoke(null, null); 
     } 
    } 
} 

Teraz można utworzyć Observable<byte[]> używać jako nadajnika do Process<byte[]> przypadkach, które są zainteresowane. Wyciągnij bloki danych ze strumienia wejściowego, czytnika audio itp. I przekaż je do metody Notify. Po prostu upewnij się, że wcześniej sklonowałeś tablice ...

+0

W obserwatorze Pub/Sub zapisuję dane ze strumienia na inny za pomocą wątku? To może naprawić problem z urządzeniem Kinect 1, który zapewnia strumień, który mogę przepompować do strumienia pamięci. –

+0

Ale to nie rozwiązuje problemu MultiStream. Po stronie mam strumień w komponencie A, a po drugiej mam czytniki B, C, D, E, które chcą czytać cały strumień. Więc to nie jest problem z blokadą, ale kwestia "odczytania" kursora Próbuję rozwiązać –

+0

W metodzie 'Notify' w' IObservable 'publikujesz odebrane informacje do każdego subskrybenta, więc każdy subskrybent otrzymuje pełną kopię danych pracować nad. Jeśli chcesz zachować historię danych w każdym obserwatorze, to użycie pamięci może trochę wystrzelić ... to chyba kompromis. Możesz * udostępnić * strumień pomiędzy różnymi subskrybentami, jeśli naprawdę chcesz, z obiektem kursora strumieniowego dla każdego subskrybenta, który obsługuje niezależne pozycjonowanie, którego wymagają. Byłoby fajnie napisać, ale wypróbuj Pub/Sub. – Corey

Powiązane problemy