2013-02-10 13 views
11

mam następujący program:Odpowiednio przekazywanie danych na stdin do polecenia i odbiera dane z stdout tego polecenia w golang

package main 

import "bytes" 
import "io" 
import "log" 
import "os" 
import "os/exec" 
import "time" 

func main() { 
    runCatFromStdinWorks(populateStdin("aaa\n")) 
    runCatFromStdinWorks(populateStdin("bbb\n")) 
} 

func populateStdin(str string) func(io.WriteCloser) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     io.Copy(stdin, bytes.NewBufferString(str)) 
    } 
} 

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    go populate_stdin_func(stdin) 
    go func() { 
      // Removing the following lines allow some output 
      // to be fetched from cat's stdout sometimes 
      time.Sleep(5 * time.Second) 
      io.Copy(os.Stdout, stdout) 
    }() 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 

Podczas pracy w pętli, mam żadnych rezultatów, tak jak poniżej:

$ while true; do go run cat_thingy.go; echo ; done 



^C 

Ten wynik pojawia się po zainstalowaniu golang-go na Ubuntu 12.04 z apt na maszynie wirtualnej (przejdź do wersji go1). Nie udało mi się zreplikować podczas instalacji na Macbook Air (przejdź do wersji go1.0.3). Wygląda na to, że jest to jakiś rodzaj wyścigu. W rzeczywistości, jeśli położę się spać (1 * czas.Second), nigdy nie widzę problemu kosztem losowego snu w moim kodzie.

Czy jest coś, co robię źle w kodzie, czy jest to błąd? Jeśli jest to błąd, czy zostało to naprawione?

UPDATE: Możliwe Clue

okazało się, że Command.Wait zamknie rur do komunikacji z/do podproces kota, nawet jeśli mają jeszcze nieprzeczytane danych. Nie jestem naprawdę pewien co do właściwego sposobu, aby sobie z tym poradzić. Sądzę, że mógłbym stworzyć kanał powiadamiający o zakończeniu pisania na stdin, ale nadal musiałbym wiedzieć, czy proces cat zakończył się, aby upewnić się, że nic innego nie zostanie zapisane na stdout pipe. Wiem, że mogę użyć cmd.Process.Wait do określenia, kiedy proces się kończy, ale czy można bezpiecznie wywołać cmd.Wait?

UPDATE: Getting Closer

Oto nowy krój w kodzie. Wierzę, że działa to tak daleko, jak pisanie do standardowego wejścia i czytanie ze standardowego wyjścia. Myślę, że mogę sprawić, że poprawnie przesyłać dane strumieniowo (zamiast do ich buforowania), jeśli zastąpię io.Copy ze stdouta obsługującego goroutine bez czegoś, co strumieniami.

package main 

import "bytes" 
import "fmt" 
import "io" 
import "log" 
import "os/exec" 
import "runtime" 

const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB 
const numInputBlocks = 6 

func main() { 
    runtime.GOMAXPROCS(5) 
    runCatFromStdin(populateStdin(numInputBlocks)) 
} 

func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"} 
     for i := 0; i < numInputBlocks; i++ { 
      repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes() 
      fmt.Printf("%s\n", repeatedBytes) 
      io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength))) 
     } 
    } 
} 

func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    go populate_stdin_func(stdin) 
    output_done_channel := make(chan bool) 
    go func() { 
     out_bytes := new(bytes.Buffer) 
     io.Copy(out_bytes, stdout) 
     fmt.Printf("%s\n", out_bytes) 
     fmt.Println(out_bytes.Len()) 
     fmt.Println(inputBufferBlockLength*numInputBlocks) 
     output_done_channel <- true 
    }() 
    <-output_done_channel 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 

Odpowiedz

0

Go statements

A „iść” oświadczenie rozpoczyna wykonywanie funkcji lub metody wywołania jako niezależną jednoczesnych wątków kontroli lub goroutine, w samej przestrzeni adresowej.

GoStmt = Wyrażenie "go".

Wyrażenie musi być wywołaniem. Wartość funkcji i parametry są ocenione jak zwykle w wywołującej goroutine, ale w przeciwieństwie do zwykłego wywołania , wykonanie programu nie czeka na zakończenie wywoływanej funkcji do . Zamiast tego funkcja zaczyna działać niezależnie w nowej goroutine. Kiedy funkcja się kończy, kończy się jej goroutine . Jeśli funkcja ma jakiekolwiek zwracane wartości, są one odrzucane , gdy funkcja się zakończy.

Konwertuj nieuzasadnione goroutiny na wywołania funkcji.

package main 

import (
    "bytes" 
    "io" 
    "log" 
    "os" 
    "os/exec" 
) 

func main() { 
    runCatFromStdinWorks(populateStdin("aaa\n")) 
    runCatFromStdinWorks(populateStdin("bbb\n")) 
} 

func populateStdin(str string) func(io.WriteCloser) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     io.Copy(stdin, bytes.NewBufferString(str)) 
    } 
} 

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    populate_stdin_func(stdin) 
    io.Copy(os.Stdout, stdout) 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 
+2

Twój kod działa, ponieważ zderzaki rurowe w moim przykładzie nigdy nie są pełne. Zmiana goroutine na funkcje wywołania nie działa w ogóle. W ogólnym przypadku, rury, których proces kot wykorzystuje do komunikacji, będą miały bufory o określonym rozmiarze. Na przykład rura stdin ma określony bufor. Gdy bufor zostanie wypełniony, zapisy do potoku zostaną zablokowane. W Linuksie, uważam, że rozmiar bufora to 64 KB. W przypadku stdouta podobny problem pojawiłby się również na rurze dla kotów. Wykonanie blokujących operacji we/wy w kodzie głównym oznacza, że ​​te blokujące wywołania zablokują główny kod. –

4

Oto wersja twojego pierwszego kodu, który działa. Zwróć uwagę na dodanie grupy sync.WaitGroup, aby upewnić się, że zakończyłeś procedurę wysyłania i odbierania go przed zamknięciem polecenia.

package main 

import (
    "bytes" 
    "io" 
    "log" 
    "os" 
    "os/exec" 
    "sync" 
    "time" 
) 

func main() { 
    runCatFromStdinWorks(populateStdin("aaa\n")) 
    runCatFromStdinWorks(populateStdin("bbb\n")) 
} 

func populateStdin(str string) func(io.WriteCloser) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     io.Copy(stdin, bytes.NewBufferString(str)) 
    } 
} 

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    var wg sync.WaitGroup 
    wg.Add(2) 
    go func() { 
     defer wg.Done() 
     populate_stdin_func(stdin) 
    }() 
    go func() { 
     defer wg.Done() 
     time.Sleep(5 * time.Second) 
     io.Copy(os.Stdout, stdout) 
    }() 
    wg.Wait() 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 

(Jest to kolejny sposób na powiedzenie tego, co powiedział, chociaż @peterSO ;-)

+0

To nie jest kolejny sposób na wypowiedzenie tego, co powiedział @peterSO. Właściwie obsługuje bufory rur, ponieważ wejście do kota jest obsługiwane w osobnym goryle z wyjścia. Myślę też, że WaitGroups jest trochę ładniejszy niż kanał, z którego robiłem synchronizację. Nadal wydaje mi się to dość mylące, że rury są zamknięte jako efekt uboczny CMd.Wait(). To naprawdę mylące, ponieważ nie zdarza się, dopóki proces się nie skończy. –

Powiązane problemy