2015-05-13 12 views
6

Mam jeden proces, który został stworzony tak:Jak utworzyć nieblokujący ciągły odczyt z `stdin`?

p = subprocess.Popen(args = './myapp', 
        stdin = subprocess.PIPE, 
        stdout = subprocess.PIPE, 
        universal_newlines=True) 

Później, próbuję napisać do p „s stdin:

p.stdin.write('my message\n') 

Proces myapp ma następujący setup:

q = queue.Queue() 
def get_input(): 
    for line in iter(sys.stdin.readline, ''): 
     q.put(line) 
    sys.stdin.close() 

threading.Thread(name = 'input-getter', 
       target = get_input).start() 

Próbuje odczytywać nowe linie w sposób ciągły, tak jak poniżej:

Niestety, podproces nigdy nie otrzymuje żadnej z moich wiadomości. Oczywiście, gdy używam:

p.communicate('my message\n') 

podproces otrzyma wiadomość, ale zgodnie z oczekiwaniami, metoda communicate zamyka p „s stdin, więc nie ma więcej komunikacji dzieje.

+2

Jeśli nie chcesz zakończyć procesu, nie powinieneś używać polecenia "komunikuj się" (wysyła tylko te dane, a następnie czeka, aż proces się zakończy); zamiast tego napisz bezpośrednio do 'p.stdin'. – poke

+0

'stdin.flush()'? A co powiesz na używanie modułu takiego jak [async_subprocess] (https://pypi.python.org/pypi/async_subprocess/0.2.1)? –

+0

@InbarRose już wypróbował to .. nie szczęście .. –

Odpowiedz

7
p = subprocess.Popen(args = './myapp', 
        stdin = subprocess.PIPE, 
        stdout = subprocess.PIPE, 
        universal_newlines=True) 

while p.poll() is None: 
    data = p.stdout.readline() 

Spowoduje to odczytanie procesu bez blokowania do momentu zakończenia procesu. Istnieje jednak kilka ostrzeżeń, o których należy pamiętać. Na przykład, jeśli chcesz, możesz również potoczyć stderr, ale nie można z niego odczytać. Wtedy najprawdopodobniej wypełnisz bufor lub dwa i mimo to zawiesisz program. Zawsze upewnij się, że wyczyścisz wszystkie bufory we/wy podczas robienia rzeczy ręcznie.

Lepszym rozwiązaniem byłoby wykorzystanie select.epoll() jeśli to możliwe, to jest dostępne tylko w systemach UNIX, ale daje cholernie dużo lepszą wydajność i obsługi błędów :)

epoll = select.epoll() 
epoll.register(p.stdout.fileno(), select.EPOLLHUP) # Use select.EPOLLIN for stdin. 

for fileno, event in epoll.poll(1): 
    if fileno == p.stdout.fileno(): 
     # ... Do something ... 

UWAGA: pamiętać, że za każdym razem, gdy proces oczekuje wejścia, zwykle wskazuje to przez stdout, więc nadal będziesz rejestrować STDOUT z select.epoll, aby sprawdzić "oczekiwanie na wejście". Możesz zarejestrować select.EPOLLIN, aby sprawdzić, czy dane wejście zostało podane, ale prawie nie widzę tego punktu, ponieważ pamiętaj, że to, co zdecydujesz się wprowadzić do procesu, który już powinieneś być świadomy, "dzieje się".

Sprawdzanie, czy proces spodziewa się danych

Można użyć select.epoll aby sprawdzić, czy proces czeka na wejście lub nie bez blokuje wykonanie aplikacji z powyższego przykładu. Ale są lepsze alternatywy.

Pexpect to jedna biblioteka, która naprawdę dobrze się sprawdza i na przykład współpracuje z SSH.

Działa nieco inaczej niż podprocesy, ale może być dobrą alternatywą.

Pierwsze subprocess.Popen do pracy z SSH

będę przekierować do innego pytania + odpowiedzi, czy to jest to, co jesteś po (ponieważ SSH będzie tarło stdin w chronionym sposób.

Python + SSH Password auth (no external libraries or public/private keys)?

+1

po pierwsze: dziękuję za odpowiedź, po drugie: wspomniałeś tylko o 'stdout', czy to też działa na' stdin', ponieważ moje pytanie szuka tego specjalnie? –

+0

@PeterVaro Ponieważ stdin jest kontrolowany przez użytkownika (aka, wprowadzasz rzeczy) jest już z natury nieblokujący, ale widząc, że twój proces może wymagać wkładu, tak wybierz też by to działało (ja właściwie już pokrył to, ale użyłem 'stdout' w połączeniu z' select.EPOLLIN', który jest moim błędem , 'EPOLLIN' jest dla' stdin', a 'EPOLLHHUP' dla' stdout'. Ale zaktualizowałem swoją odpowiedź, aby pasowało do twoich potrzeb. – Torxed

+1

dlaczego myślisz 'p.stdout.read() 'nie jest blokujące? Nie wraca do EOF. – jfs

1

Próba zbadania swój program, napisałem mój własny "sposób ciągły strumień rzeczy do kotów i nadrobić to, co zwraca" program. Nie wdrożyłem strony podprocesu, ale mam nadzieję, że struktura jest podobna.

Linia ta jest bardzo dziwne o programie ...

for line in iter(sys.stdin.readline, ''): 
    q.put(line) 
sys.stdin.close() 

To wygląda strasznie dużo jak

for line in stdin: 
    q.put(line) 

Zauważ, że pętla się skończy, gdy rura jest zamknięta i nie ma nie trzeba go później ponownie zamykać.

Jeśli potrzebujesz ciągłego asynchronicznego odczytywania wejścia standardowego, powinieneś móc skonstruować wątek czytający w pobliżu identycznego z child_reader w poniższym kodzie. Wystarczy wymienić child.stdout na stdin.

import subprocess 
import threading 
import random 

# We may need to guard this? 
child = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE) 

# Continuously print what the process outputs... 
def print_child(): 
    for line in child.stdout: 
     print(line) 

child_reader = threading.Thread(target = print_child) 
child_reader.start() 

for i in range(10000): 
    chars = 'ABC\n' 
    child.stdin.write(random.choice(chars).encode()) 

# Send EOF. 
# This kills the cat. 
child.stdin.close() 

# I don't think order matters here? 
child.wait() 
child_reader.join() 
+0

1. masz rację, że nie potrzebujesz 'iter (..)' w Pythonie 3 'dla linii w stdin' działa tak jak jest. 2. * "nie ma potrzeby ponownego zamykania go później." * Jest nieprawidłowy. Potrzebujesz tego, aby uniknąć polegania na zbieraniu śmieci, aby pozbyć się odpowiednich deskryptorów plików (uwaga: nie myl rurek w rodzicu i procesie potomnym - są one połączone, ale każdy proces ma swój własny zestaw). – jfs

+0

Okej, rozumiem czyszczenie deskryptorów plików, ale 'sys.stdin.close()'? – QuestionC

+0

Miałem na myśli [podprocesowe rury] (http://stackoverflow.com/a/4896288/4279) takie jak 'child.stdout' w twoim kodzie. Zgadzam się, nie ma sensu zamykanie 'sys.stdin' w większości przypadków. – jfs

2

Myślę, że być może po prostu nie widzisz wyjścia z tego, co się dzieje. Oto pełny przykład, który wydaje się działać na moim polu, chyba że całkowicie nie rozumiem tego, co chcesz. Główną zmianą, którą wprowadziłem, jest ustawienie stdout dla p na zamiast na subprocess.PIPE. Może jestem nieporozumienie ciąg na swoje pytanie, a bit jest kluczowym ...

Oto pełny kod i wyjście:

W wysyłającego (testing) proces (nazwałem go test_comms.py). Jestem obecnie na Windows, więc przepraszam za .bat:

import time 
import subprocess 
import sys 

# Note I'm sending stdout to sys.stdout for observation purposes 
p = subprocess.Popen(args = 'myapp.bat', 
        stdin = subprocess.PIPE, 
        stdout = sys.stdout, 
        universal_newlines=True) 

#Send 10 messages to the process's stdin, 1 second apart      
for i in range(10): 
    time.sleep(1) 
    p.stdin.write('my message\n') 

myapp.bat jest trywialnie:

echo "In the bat cave (script)" 
python myapp.py 

myapp.py zawiera (używając Queue zamiast queue - obecnego środowiska Python 2):

import Queue 
from Queue import Empty 
import threading 
import sys 
import time 

def get_input(): 
    print("Started the listening thread") 
    for line in iter(sys.stdin.readline, ''): 
     print("line arrived to put on the queue\n") 
     q.put(line) 
    sys.stdin.close() 

print("Hi, I'm here via popen")  
q = Queue.Queue() 

threading.Thread(name = 'input-getter', 
       target = get_input).start() 

print("stdin listener Thread created and started") 

# Read off the queue - note it's being filled asynchronously based on 
# When it receives messages. I set the read interval below to 2 seconds 
# to illustrate the queue filling and emptying. 
while True: 
    time.sleep(2) 
    try: 
     print('Queue size is',q.qsize()) 
     print('input:', q.get_nowait()) 
    except Empty: 
     print('no input') 

print("Past my end of code...") 

wyjściowa:

D:\>comms_test.py 

D:\>echo "In the bat cave (script)" 
"In the bat cave (script)" 

D:\>python myapp.py 
Hi, I'm here via popen 
Started the listening threadstdin listener Thread created and started 

line arrived to put on the queue 

line arrived to put on the queue 

('Queue size is', 2) 
('input:', 'my message\n') 
line arrived to put on the queue 

line arrived to put on the queue 

('Queue size is', 3) 
('input:', 'my message\n') 
line arrived to put on the queue 

line arrived to put on the queue 

('Queue size is', 4) 
('input:', 'my message\n') 
line arrived to put on the queue 

line arrived to put on the queue 

('Queue size is', 5) 
('input:', 'my message\n') 
line arrived to put on the queue 

line arrived to put on the queue 


D:\>('Queue size is', 6) 
('input:', 'my message\n') 
('Queue size is', 5) 
('input:', 'my message\n') 
('Queue size is', 4) 
('input:', 'my message\n') 
('Queue size is', 3) 
('input:', 'my message\n') 
('Queue size is', 2) 
('input:', 'my message\n') 
('Queue size is', 1) 
('input:', 'my message\n') 
('Queue size is', 0) 
no input 
('Queue size is', 0) 
no input 
('Queue size is', 0) 
no input 
+0

, chyba że 'sys.stdout' zostanie ponownie przypisany w skrypcie Pythona, wtedy całkowite pominięcie parametru' stdout' powinno mieć ten sam efekt. – jfs

+0

Istnieje kilka błędów z buforowaniem w Pythonie 3; Użyłbym 'print (" moja wiadomość ", file = p.stdin, flush = True)' zamiast 'p.stdin.write (" moja wiadomość \ n ")'. Przekaż jawnie 'bufsize = 1'. – jfs

+0

O ile nie jest konieczne drukowanie "brak danych wejściowych" co 2 sekundy, jeśli obecnie nie ma danych wejściowych, użyłbym prostego 'dla linii w sys.stdin: print ('input:' + line, end = '')' zamiast Wątek, kolejka itp. – jfs

2

Aby wszystko działało poprawnie, należy przepłukać dane wyjściowe w procesie głównym (p.stdout) i podprocesie (sys.stdout).

communicate ma zarówno koloru:

  • go opróżnić p.stdin przy zamykaniu
  • czekać wyjście sys.stdout do przepłukiwania (tuż przed zamknięciem)

Przykład pracy main.py

import subprocess,time 
import sys 
p = subprocess.Popen(args = ['python3', './myapp.py'], 
        stdin = subprocess.PIPE, 
        stdout = subprocess.PIPE, 
        universal_newlines=True) 

time.sleep(0.5) 
p.stdin.write('my message\n') 
p.stdin.flush() 
#print("ici") 
for i,l in enumerate(iter(p.stdout.readline, ''),start=1): 

    print("main:received:",i,repr(l)) 
    if i == 6: 
     break 
    print("mainprocess:send:other message n°{}".format(i)) 
    p.stdin.write("other message n°{}\n".format(i)) 
    p.stdin.flush() 

print("main:waiting for subprocess") 
p.stdin.close()  
p.wait() 

przykładem myapp.py importowej kolejce, gwintowanie, sys, czasu rpdb

q = queue.Queue() 
def get_input(): 
    for line in iter(sys.stdin.readline, ''): 
     q.put(line) 
    sys.stdin.close() 

threading.Thread(name = 'input-getter', 
       target = get_input).start() 
for i in range(6): 
    try: 
     l= q.get_nowait() 
     print('myapp:input:', l,end="") 
     sys.stdout.flush() 

    except queue.Empty: 
     print("myapp:no input") 
     sys.stdout.flush()  
     time.sleep(1) 

wynik:

main:received: 1 'myapp:no input\n' 
mainprocess:send:other message n°1 
main:received: 2 'myapp:input: my message\n' 
mainprocess:send:other message n°2 
main:received: 3 'myapp:input: other message n°1\n' 
mainprocess:send:other message n°3 
main:received: 4 'myapp:no input\n' 
mainprocess:send:other message n°4 
main:received: 5 'myapp:input: other message n°2\n' 
mainprocess:send:other message n°5 
main:received: 6 'myapp:input: other message n°3\n' 
main:waiting for subprocess 
1

Napisałem program, który robi ... zasadzie wszystko udziałem IO asynchronicznie. Odczytuje dane wejściowe na wątku, wyprowadza na wątku, tworzy proces i komunikuje się z tym procesem na wątku.

Nie jestem do końca pewien, co powinien osiągnąć twój program, ale mam nadzieję, że ten kod go zrealizuje.

# Asynchronous cat program! 

# Asynchronously read stdin 
# Pump the results into a threadsafe queue 
# Asynchronously feed the contents to cat 
# Then catch the output from cat and print it 
# Thread all the things 

import subprocess 
import threading 
import queue 
import sys 

my_queue = queue.Queue() 

# Input! 
def input_method(): 
    for line in sys.stdin: # End on EOF 
     if line == 'STOP\n': # Also end on STOP 
      break 
     my_queue.put(line) 
input_thread = threading.Thread(target=input_method) 
input_thread.start() 

print ('Input thread started') 


# Subprocess! 
cat_process = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE) 

print ('cat process started') 

queue_alive = True 
# Continuously dump the queue into cat 
def queue_dump_method(): 
    while queue_alive: 
     try: 
      line = my_queue.get(timeout=2) 
      cat_process.stdin.write(line.encode()) 
      cat_process.stdin.flush() # For some reason, we have to manually flush 
      my_queue.task_done() # Needed? 
     except queue.Empty: 
      pass 
queue_dump_thread = threading.Thread(target = queue_dump_method) 
queue_dump_thread.start() 

print ('Queue dump thread started') 

# Output! 
def output_method(): 
    for line in cat_process.stdout: 
     print(line) 
output_thread = threading.Thread(target=output_method) 
output_thread.start() 

print ('Output thread started') 


# input_thread will die when we type STOP 
input_thread.join() 
print ('Input thread joined') 

# Now we wait for the queue to finish processing 
my_queue.join() 
print ('Queue empty') 

queue_alive = False 
queue_dump_thread.join() 
print ("Queue dump thread joined") 

# Send EOF to cat 
cat_process.stdin.close() 

# This kills the cat 
cat_process.wait() 
print ('cat process done') 

# And make sure we're done outputting 
output_thread.join() 
print ('Output thread joined') 
+0

PS. Ten program jest oczywiście głupi i wątpię, żebyś zrobił to wszystko * to IO asynchronicznie. – QuestionC