2012-06-11 20 views
11

Ideą jest posiadanie zmiennej liczby kanałów w wycinku, przepychanie każdej otrzymanej z nich wartości do jednego kanału i zamykanie tego kanału wyjściowego po zamknięciu ostatniego z kanałów wejściowych. Coś takiego, ale dla wielu kanałów więcej niż dwa:Czy można multipleksować kilka kanałów w jeden?

func multiplex(cin1, cin2, cout chan int) { 
    n := 2 
    for { 
     select { 
     case v, ok := <-cin1: 
      if ok { 
       cout <- v 
      } else { 
       n -= 1 
      } 

     case v, ok := <-cin2: 
      if ok { 
       cout <- v 
      } else { 
       n -= 1 
      } 
     } 

     if n == 0 { 
      close(cout) 
      break 
     } 
    } 
} 

Powyższy kod unika zajęty pętle, ponieważ nie ma default sprawa, która jest dobra (EDIT: wygląda na to, obecności „ok” sprawia, że ​​instrukcja select nie blokuje się, a pętla jest zajęta, ale dla dobra przykładu myśl o kodzie tak, jakby blokowała). Czy ten sam rodzaj funkcjonalności można również osiągnąć za pomocą dowolnej liczby kanałów wejściowych? Oczywiście można tego dokonać, redukując kawałek pary do pojedynczego kanału, ale w miarę możliwości byłbym bardziej zainteresowany prostszym rozwiązaniem.

Odpowiedz

24

Uważam ten fragment robi to, co szukasz. Zmieniłem sygnaturę, aby było jasne, że wejścia i wyjścia powinny być używane tylko do komunikacji w jednym kierunku. Zauważ, że dodatek sync.WaitGroup jest potrzebny, aby wszystkie wejścia zasygnalizowały, że zostały zakończone, a to całkiem proste.

func combine(inputs []<-chan int, output chan<- int) { 
    var group sync.WaitGroup 
    for i := range inputs { 
    group.Add(1) 
    go func(input <-chan int) { 
     for val := range input { 
     output <- val 
     } 
     group.Done() 
    } (inputs[i]) 
    } 
    go func() { 
    group.Wait() 
    close(output) 
    }() 
} 
+1

Ah, bardzo ładne rozwiązanie, jasne i zwięzłe. Dziękuję Ci! – elpres

+0

Istnieje teraz pakiet z funkcją (https://godoc.org/github.com/eapache/channels#Multiplex), który rozwiązuje problem za pomocą odbicia zamiast wielu goroutines. – Evan

0

Używając gorutines, wyprodukowałem to. Czy tego chcesz?

package main 

import (
    "fmt" 
) 

func multiplex(cin []chan int, cout chan int) { 
    n := len(cin) 
    for _, ch := range cin { 
     go func(src chan int) { 
      for { 
       v, ok := <-src 
       if ok { 
        cout <- v 
       } else { 
        n-- // a little dangerous. Maybe use a channel to avoid missed decrements 
        if n == 0 { 
         close(cout) 
        } 
        break 
       } 
      } 
     }(ch) 
    } 
} 

// a main to test the multiplex 
func main() { 
    cin := make([]chan int, 3) 
    cin[0] = make(chan int, 2) 
    cin[1] = make(chan int, 2) 
    cin[2] = make(chan int, 2) 
    cout := make(chan int, 2) 
    multiplex(cin, cout) 
    cin[1] <- 1 
    cin[0] <- 2 
    cin[2] <- 3 
    cin[1] <- 4 
    cin[0] <- 5 
    close(cin[1]) 
    close(cin[0]) 
    close(cin[2]) 
    for { 
     v, ok := <-cout 
     if ok { 
      fmt.Println(v) 
     } else { 
      break 
     } 
    } 
} 

Edycja: Literatura:

http://golang.org/ref/spec#Receive_operator

http://golang.org/ref/spec#Close

+0

Dokumenty mówią, że jeśli odczytasz wartość z kanału z ", ok", operacja nie zostanie zablokowana. Wartość "ok" jest po prostu "false", a wykonanie jest kontynuowane.Jeśli jest to poprawne (jestem nowy w Go i nie mogę dokładnie powiedzieć), to jeśli kanał jest pusty, ale jeszcze nie zamknięty, linia 'if ok' oceniłaby jako 'false' i wykonała gałąź' else'. Ale jeśli zastąpisz "v, ok: = <- src" i "if" za pomocą instrukcji select, to może działać. Muszę to przetestować. Dziękuję za odpowiedź, przy okazji. – elpres

+1

Gdzie przeczytałeś, że operacja nie blokuje? Nie znajduję tego i nie wydaje się pasować do tego, co obserwuję. Czytałem z dokumentu, że nie blokuje * po zamknięciu kanału *. –

+1

Wydaje się, że pochodzi ze starszej wersji specyfikacji, np. [tutaj] (http://go.googlecode.com/hg/doc/go_spec.html?r=c64e293#Communication_operators), spójrz na ostatni akapit przed "Wyrażenia metod". W obecnej wersji ten fragment jest nieco zmieniony i mówi, że "zwrócono wartość zerową, ponieważ kanał jest zamknięty i pusty (fałsz)". Brzmi to tak, że 'false' jest zwracane dopiero po wyczerpaniu i zamknięciu kanałów, prawda? To by znaczyło, że się mylę. – elpres

2

Edycja: dodano parami przykład zmniejszenie kodu i ponownie uporządkowane części odpowiedzi.

Preferowanym rozwiązaniem jest brak odpowiedzi na "restrukturyzację, aby nie można było uzyskać fragmentu kanałów". Restrukturyzacja może często korzystać z funkcji, którą wiele goroutines może wysyłać do jednego kanału. Zamiast więc, aby każdy z twoich źródeł wysyłał osobne kanały, a następnie miał do czynienia z otrzymywaniem z kilku kanałów, po prostu stwórz jeden kanał i niech wszystkie źródła wyślą na ten kanał.

Go nie oferuje funkcji do odbioru z fragmentu kanałów. Jest to często zadawane pytanie i chociaż podane rozwiązanie jest preferowane, istnieją sposoby jego zaprogramowania. Rozwiązanie, które myślałem, że sugerujesz w swoim oryginalnym pytaniu, mówiąc "zmniejszanie liczby parami", to rozwiązanie polegające na dzieleniu i podbiciu binarnym. Działa to dobrze, o ile masz rozwiązanie do multipleksowania dwóch kanałów w jeden. Twój przykładowy kod jest bardzo bliski pracy.

Po prostu brakuje jednej sztuczki, aby twój przykładowy kod zadziałał. W przypadku zmniejszenia wartości n dodaj linię, aby ustawić zmienną kanału na zero. Na przykład kod został odczytany jako

case v, ok := <-cin1: 
     if ok { 
      cout <- v 
     } else { 
      n-- 
      cin1 = nil 
     } 
    case v, ok := <-cin2: 
     if ok { 
      cout <- v 
     } else { 
      n-- 
      cin2 = nil 
     } 
    } 

To rozwiązanie robi to, co chcesz i nie jest zajęte czekaniem.

Więc, pełny przykład wprowadzenie tego rozwiązania do funkcji, które multipleksy kawałek:

package main 

import (
    "fmt" 
    "time" 
) 

func multiplex(cin []chan int, cout chan int) { 
    var cin0, cin1 chan int 
    switch len(cin) { 
    case 2: 
     cin1 = cin[1] 
     fallthrough 
    case 1: 
     cin0 = cin[0] 
    case 0: 
    default: 
     cin0 = make(chan int) 
     cin1 = make(chan int) 
     half := len(cin)/2 
     go multiplex(cin[:half], cin0) 
     go multiplex(cin[half:], cin1) 
    } 
    for cin0 != nil || cin1 != nil { 
     select { 
     case v, ok := <-cin0: 
      if ok { 
       cout <- v 
      } else { 
       cin0 = nil 
      } 
     case v, ok := <-cin1: 
      if ok { 
       cout <- v 
      } else { 
       cin1 = nil 
      } 
     } 
    } 
    close(cout) 
} 

func main() { 
    cin := []chan int{ 
     make(chan int), 
     make(chan int), 
     make(chan int), 
    } 
    cout := make(chan int) 
    for i, c := range cin { 
     go func(x int, cx chan int) { 
      for i := 1; i <= 3; i++ { 
       time.Sleep(100 * time.Millisecond) 
       cx <- x*10 + i 
      } 
      close(cx) 
     }(i, c) 
    } 
    go multiplex(cin, cout) 
    for { 
     select { 
     case v, ok := <-cout: 
      if ok { 
       fmt.Println("main gets", v) 
      } else { 
       return 
      } 
     } 
    } 
} 
+1

Nie, niezupełnie. Czego szukam jako funkcji z sygnaturą 'func multiplex (cin [] chan int, cout chan int)', tj. Taki, który może działać na dowolnej liczbie kanałów wejściowych zamiast być zakodowany na stałe do dwóch. – elpres

Powiązane problemy