2012-06-20 8 views
20

Chciałbym posłuchać rutynowo na dwóch kanałach, zablokowanych, gdy oba kanały są opróżnione. Jeśli jednak oba kanały zawierają dane, chcę, aby jeden został usunięty, zanim drugi zostanie obsłużony.Priorytet w obejściu oświadczenia Go select

W poniższym przykładzie roboczym chciałbym, aby cały out został opróżniony przed obsługą exit. Używam wersji select, która nie ma żadnej kolejności priorytetu. Jak mogę obejść ten problem, sprawiając, że wszystkie 10 out-wartości zostaną obsłużone przed wyjściem?

package main 

import "fmt" 

func sender(out chan int, exit chan bool){ 
    for i := 1; i <= 10; i++ { 
     out <- i 
    } 
    exit <- true 
} 

func main(){ 
    out := make(chan int, 10) 
    exit := make(chan bool) 

    go sender(out, exit) 

    L: 
    for { 
     select { 
      case i := <-out: 
       fmt.Printf("Value: %d\n", i) 
      case <-exit: 
       fmt.Println("Exiting") 
       break L 
     } 
    } 
    fmt.Println("Did we get all 10? Most likely not") 
} 

Odpowiedz

14
package main 

import "fmt" 

func sender(out chan int, exit chan bool) { 
    for i := 1; i <= 10; i++ { 
     out <- i 
    } 
    exit <- true 
} 

func main() { 
    out := make(chan int, 10) 
    exit := make(chan bool) 

    go sender(out, exit) 

    for { 
     select { 
     case i := <-out: 
      fmt.Printf("Value: %d\n", i) 
      continue 
     default: 
     } 
     select { 
     case i := <-out: 
      fmt.Printf("Value: %d\n", i) 
      continue 
     case <-exit: 
      fmt.Println("Exiting") 
     } 
     break 
    } 
    fmt.Println("Did we get all 10? I think so!") 
} 

Domyślny przypadek pierwszego wyboru powoduje, że nie jest blokowany. Wybór opróżni kanał wyjściowy bez patrzenia na kanał wyjściowy, ale w przeciwnym razie nie będzie czekać. Jeśli kanał wyjściowy jest pusty, natychmiast przechodzi do drugiego wyboru. Drugi wybór blokuje. Będzie czekać na dane na każdym z kanałów. Jeśli wyjście przychodzi, obsługuje je i umożliwia wyjście pętli. Jeśli dane przychodzą, wraca do góry pętli i przechodzi z powrotem do trybu drenowania.

+1

Pomysł jest bardzo podobny do mojego. Ale prawda, z "kontynuacją", pozbędziesz się potrzeby flagi. Mądry. Cóż, jest to prawdopodobnie równie dobra odpowiedź, jaką mogę założyć. Dzięki! – ANisus

+2

spowoduje to nieskończoną pętlę w pierwszym poleceniu wyboru, jeśli kanał wyjściowy jest zamknięty. – jorelli

+1

jorelli, całkiem prawdziwe. Jeśli chcesz pozwolić wrogim lub zapluskwionym gorsominom na zamknięcie kanału nieoczekiwanie, sprawdzisz status ok na otrzymaniu. – Sonia

5

Innym podejściem:

package main 

import "fmt" 

func sender(c chan int) chan int { 
     go func() { 
       for i := 1; i <= 15; i++ { 
         c <- i 
       } 
       close(c) 
     }() 
     return c 
} 

func main() { 
     for i := range sender(make(chan int, 10)) { 
       fmt.Printf("Value: %d\n", i) 
     } 
     fmt.Println("Did we get all 15? Surely yes") 
} 

$ go run main.go 
Value: 1 
Value: 2 
Value: 3 
Value: 4 
Value: 5 
Value: 6 
Value: 7 
Value: 8 
Value: 9 
Value: 10 
Value: 11 
Value: 12 
Value: 13 
Value: 14 
Value: 15 
Did we get all 15? Surely yes 
$ 
+1

Dzięki za sugestię! Jeśli rozumiem cię poprawnie, sugerujesz użycie tylko jednego kanału, wywołanie wyjścia przez zamknięcie kanału, a tym samym złamanie instrukcji 'for range'. To prawda, może to jest lepszy sposób na zrobienie tego, ale w moim przypadku pracuję z dwoma kanałami. – ANisus

1

Stworzyłem jeden dość proste obejście. Czyni to, co chcę, ale jeśli ktoś ma lepsze rozwiązanie, proszę dać mi znać:

exiting := false 
for !exiting || len(out)>0 { 
    select { 
     case i := <-out: 
      fmt.Printf("Value: %d\n", i) 
     case <-exit: 
      exiting = true 
      fmt.Println("Exiting") 
    } 
} 

Zamiast wychodzenia na odbieranie, że flaga wyjście, wyjście raz zrobiłem, że nic nie pozostawić w chan out .

+1

To działa i jest ładne i kompaktowe, ale wykorzystuje niektóre sztuczki, których powinieneś unikać. Flagi stają się mylące, ponieważ programy stają się coraz większe. Są trochę jak goto. Mówiąc bardziej poważnie, len (chan) może często wprowadzać rasy. W tej sytuacji wygląda to dobrze, ale w wielu przypadkach nie można podjąć decyzji na podstawie len (chan), ponieważ może ona ulec zmianie przed podjęciem działania. Wyobraź sobie przypadek, w którym uzyskasz len == 0, a następnie pojawi się wyjście, a następnie wybierz wyjście. Możesz wzruszyć ramionami i powiedzieć, że przybyli mniej więcej w tym samym czasie, ale w niektórych programach krytycznych może to mieć znaczenie. – Sonia

+0

Umm, może nadal działa w przypadku, który opisałem. Przepraszam, jeśli to zły przykład. W każdym razie staram się unikać używania lena w kodzie synchronizacyjnym. – Sonia

+0

Cześć znowu Sonia :). Dobry wkład. Tak, w moim przypadku nie ma to większego znaczenia. Chciałem tylko spłukać to, co się działo przed wyjściem. Jednak faktycznie zmieniłem kod za pomocą 'for range' i' close (out) '(zgodnie z sugestią jmnl). Wtedy tylko zdarzenia zewnętrzne umieszczone w kanale przed zamknięciem będą "przepłukane". Uniknę podejmowania decyzji na podstawie len (chan), jeśli Nasdaq kiedykolwiek poprosi mnie o wykonanie programu Go;) – ANisus

26

język obsługuje ten sposób i nie wymaga obejścia. To bardzo proste: kanał rezygnacji powinien być widoczny tylko dla producenta. Po zakończeniu producent zamyka kanał. Tylko wtedy, gdy kanał jest pusty i zamknięty, konsument przestaje działać. Jest to możliwe poprzez czytanie z kanału w następujący sposób:

v, ok := <-c 

będzie to ustawić ok na wartość logiczną wskazującą, czy wartość v faktycznie odczytać kanał (ok == true), lub jeśli v został ustawiony na wartość zero typu obsługiwanego przez kanał c, ponieważ c jest zamknięty i pusty (ok == false). Gdy kanał jest zamknięty i nie jest pusty, v będzie prawidłową wartością, a ok będzie true. Gdy kanał jest zamknięty i pusty, v będzie wartością zerową typu obsługiwanego przez kanał c, a ok będzie false, co oznacza, że ​​v jest bezużyteczny.

Oto przykład, aby zilustrować:

package main 

import (
    "fmt" 
    "math/rand" 
    "time" 
) 

var (
    produced = 0 
    processed = 0 
) 

func produceEndlessly(out chan int, quit chan bool) { 
    defer close(out) 
    for { 
     select { 
     case <-quit: 
      fmt.Println("RECV QUIT") 
      return 
     default: 
      out <- rand.Int() 
      time.Sleep(time.Duration(rand.Int63n(5e6))) 
      produced++ 
     } 
    } 
} 

func quitRandomly(quit chan bool) { 
    d := time.Duration(rand.Int63n(5e9)) 
    fmt.Println("SLEEP", d) 
    time.Sleep(d) 
    fmt.Println("SEND QUIT") 
    quit <- true 
} 

func main() { 
    vals, quit := make(chan int, 10), make(chan bool) 
    go produceEndlessly(vals, quit) 
    go quitRandomly(quit) 
    for { 
     x, ok := <-vals 
     if !ok { 
      break 
     } 
     fmt.Println(x) 
     processed++ 
     time.Sleep(time.Duration(rand.Int63n(5e8))) 
    } 
    fmt.Println("Produced:", produced) 
    fmt.Println("Processed:", processed) 
} 

ten jest udokumentowany w sekcji „Receive operator” w odchodzeniu specyfikacji: http://golang.org/ref/spec#Receive_operator

+0

To jest dokładnie to rozwiązanie, którego szukałem, i nie ma potencjalnego błędu warunków wyścigu, który jest w odpowiedzi Sonii – BrandonAGr

0

W moim przypadku, naprawdę chciałem priorytet danych z jednego kanał nad innym, a nie tylko poza sygnałem wyjściowym.Z korzyścią dla nikogo innego o tym samym numerze myślę, że to podejście działa bez potencjalnego wyścigu:

OUTER: 
for channelA != nil || channelB != nil { 

    select { 

    case typeA, ok := <-channelA: 
     if !ok { 
      channelA = nil 
      continue OUTER 
     } 
     doSomething(typeA) 

    case nodeIn, ok := <-channelB: 
     if !ok { 
      channelB = nil 
      continue OUTER 
     } 

     // Looped non-blocking nested select here checks that channelA 
     // really is drained before we deal with the data from channelB 
     NESTED: 
     for { 
      select { 
      case typeA, ok := <-channelA: 
       if !ok { 
        channelA = nil 
        continue NESTED 
       } 
       doSomething(typeA) 

      default: 
       // We are free to process the typeB data now 
       doSomethingElse(typeB) 
       break NESTED 
      } 
     } 
    } 

} 
0

myślę odpowiedź Soni jest incorrect.This jest moje rozwiązanie, trochę komplikować.

package main 

import "fmt" 

func sender(out chan int, exit chan bool){ 
    for i := 1; i <= 10; i++ { 
     out <- i 
    } 
    exit <- true 
} 

func main(){ 
    out := make(chan int, 10) 
    exit := make(chan bool) 

    go sender(out, exit) 

    L: 
    for { 
     select { 
      case i := <-out: 
       fmt.Printf("Value: %d\n", i) 
      case <-exit: 
       for{ 
        select{ 
        case i:=<-out: 
         fmt.Printf("Value: %d\n", i) 
        default: 
         fmt.Println("Exiting") 
         break L 
        } 
       } 
       fmt.Println("Exiting") 
       break L 
     } 
    } 
    fmt.Println("Did we get all 10? Yes!") 
} 
0

Czy istnieje jakiś szczególny powód użycia buforowanego kanału make(chan int, 10)?

Musisz użyć kanału niebuforowanego vs buforowanego, którego używasz.

Po prostu usuń 10, powinno to być tylko make(chan int).

ten sposób wykonanie w funkcji sender może przystąpić tylko do rachunku exit <- true po ostatnią wiadomość z kanału out jest rozkolejkowywana sprawozdaniem i := <-out. Jeśli to stwierdzenie nie zostało wykonane, nie ma możliwości, aby exit <- true został osiągnięty w goroutine.

0

Oto kolejna opcja. Kod

konsumentów:

go func() { 
    stop := false 
    for { 
     select { 
     case item, _ := <-r.queue: 
     doWork(item) 
     case <-r.stopping: 
     stop = true 
     } 
     if stop && len(r.queue) == 0 { 
     break 
     } 
    } 
    }()