2015-05-08 17 views
5

Tak więc próbuję wielu wątków niektórych połączeń internetowych w python. Używałem modułu do przetwarzania wieloprocesowego, dzięki czemu mogę poruszać się po "Global Interpreter Lock". Ale wydaje się, że system daje tylko jeden port otwartego połączenia pythonowi, a przynajmniej pozwala tylko na jedno połączenie na raz. Oto przykład tego, co mówię.Ile portów sieciowych obsługuje Python?

* Należy pamiętać, że jest uruchomiony na serwerze linux

from multiprocessing import Process, Queue 
import urllib 
import random 

# Generate 10,000 random urls to test and put them in the queue 
queue = Queue() 
for each in range(10000): 
    rand_num = random.randint(1000,10000) 
    url = ('http://www.' + str(rand_num) + '.com') 
    queue.put(url) 

# Main funtion for checking to see if generated url is active 
def check(q): 
    while True: 
     try: 
      url = q.get(False) 
      try: 
       request = urllib.urlopen(url) 
       del request 
       print url + ' is an active url!' 
      except: 
       print url + ' is not an active url!' 
     except: 
      if q.empty(): 
       break 

# Then start all the threads (50) 
for thread in range(50): 
    task = Process(target=check, args=(queue,)) 
    task.start() 

Więc jeśli uruchomić to można zauważyć, że rozpoczyna się 50 wystąpień na temat funkcji, ale działa tylko jeden na raz. Możesz myśleć, że "Global Interpreter Lock" to robi, ale tak nie jest. Spróbuj zmienić funkcję na funkcję matematyczną zamiast na żądanie sieciowe, a zobaczysz, że wszystkie pięćdziesiąt wątków jest uruchamianych jednocześnie.

Czy będę musiał pracować z gniazdkami? Czy jest coś, co mogę zrobić, aby uzyskać dostęp Pythona do większej liczby portów? Czy jest coś, czego nie widzę? Powiedz mi co myślisz! Dzięki!

* Edycja

Więc napisałem ten skrypt, aby przetestować rzeczy lepiej z biblioteką żądań. Wygląda na to, że wcześniej nie testowałem tego zbyt dobrze. (Używałem głównie urllib i urllib2)

from multiprocessing import Process, Queue 
from threading import Thread 
from Queue import Queue as Q 
import requests 
import time 

# A main timestamp 
main_time = time.time() 

# Generate 100 urls to test and put them in the queue 
queue = Queue() 
for each in range(100): 
    url = ('http://www.' + str(each) + '.com') 
    queue.put(url) 

# Timer queue 
time_queue = Queue() 

# Main funtion for checking to see if generated url is active 
def check(q, t_q): # args are queue and time_queue 
    while True: 
     try: 
      url = q.get(False) 
      # Make a timestamp 
      t = time.time() 
      try: 
       request = requests.head(url, timeout=5) 
       t = time.time() - t 
       t_q.put(t) 
       del request 
      except: 
       t = time.time() - t 
       t_q.put(t) 
     except: 
      break 

# Then start all the threads (20) 
thread_list = [] 
for thread in range(20): 
    task = Process(target=check, args=(queue, time_queue)) 
    task.start() 
    thread_list.append(task) 

# Join all the threads so the main process don't quit 
for each in thread_list: 
    each.join() 
main_time_end = time.time() 

# Put the timerQueue into a list to get the average 
time_queue_list = [] 
while True: 
    try: 
     time_queue_list.append(time_queue.get(False)) 
    except: 
     break 

# Results of the time 
average_response = sum(time_queue_list)/float(len(time_queue_list)) 
total_time = main_time_end - main_time 
line = "Multiprocessing: Average response time: %s sec. -- Total time: %s sec." % (average_response, total_time) 
print line 

# A main timestamp 
main_time = time.time() 

# Generate 100 urls to test and put them in the queue 
queue = Q() 
for each in range(100): 
    url = ('http://www.' + str(each) + '.com') 
    queue.put(url) 

# Timer queue 
time_queue = Queue() 

# Main funtion for checking to see if generated url is active 
def check(q, t_q): # args are queue and time_queue 
    while True: 
     try: 
      url = q.get(False) 
      # Make a timestamp 
      t = time.time() 
      try: 
       request = requests.head(url, timeout=5) 
       t = time.time() - t 
       t_q.put(t) 
       del request 
      except: 
       t = time.time() - t 
       t_q.put(t) 
     except: 
      break 

# Then start all the threads (20) 
thread_list = [] 
for thread in range(20): 
    task = Thread(target=check, args=(queue, time_queue)) 
    task.start() 
    thread_list.append(task) 

# Join all the threads so the main process don't quit 
for each in thread_list: 
    each.join() 
main_time_end = time.time() 

# Put the timerQueue into a list to get the average 
time_queue_list = [] 
while True: 
    try: 
     time_queue_list.append(time_queue.get(False)) 
    except: 
     break 

# Results of the time 
average_response = sum(time_queue_list)/float(len(time_queue_list)) 
total_time = main_time_end - main_time 
line = "Standard Threading: Average response time: %s sec. -- Total time: %s sec." % (average_response, total_time) 
print line 

# Do the same thing all over again but this time do each url at a time 
# A main timestamp 
main_time = time.time() 

# Generate 100 urls and test them 
timer_list = [] 
for each in range(100): 
    url = ('http://www.' + str(each) + '.com') 
    t = time.time() 
    try: 
     request = requests.head(url, timeout=5) 
     timer_list.append(time.time() - t) 
    except: 
     timer_list.append(time.time() - t) 
main_time_end = time.time() 

# Results of the time 
average_response = sum(timer_list)/float(len(timer_list)) 
total_time = main_time_end - main_time 
line = "Not using threads: Average response time: %s sec. -- Total time: %s sec." % (average_response, total_time) 
print line 

Jak widać, jest bardzo wielowątkowy. W rzeczywistości większość moich testów pokazuje, że moduł wątkowania jest rzeczywiście szybszy niż moduł wieloprocesowy. (Nie rozumiem dlaczego!) Oto niektóre z moich wyników.

Multiprocessing: Average response time: 2.40511314869 sec. -- Total time: 25.6876308918 sec. 
Standard Threading: Average response time: 2.2179402256 sec. -- Total time: 24.2941861153 sec. 
Not using threads: Average response time: 2.1740363431 sec. -- Total time: 217.404567957 sec. 

Dokonano tego w mojej sieci domowej, czas reakcji na moim serwerze jest znacznie szybszy. Myślę, że moje pytanie zostało udzielone pośrednio, ponieważ miałem problemy z dużo bardziej złożonym scenariuszem. Wszystkie sugestie pomogły mi ją zoptymalizować. Dziękuję wszystkim!

+1

Czy próbowałeś inny moduł Pythona do robi legwork HTTP, może [żądania] (http: //docs.python -requests.org/en/latest/)? Znamy 'urllib' [nie jest wątkiem bezpiecznym] (http://stackoverflow.com/a/5825531/228489), chociaż nie sądzę, że powinno to wpływać na proces wieloprocesowy, ale spróbuję użyć innego modułu, aby znaleźć na zewnątrz. – amccormack

+1

Jak rozpoznać, że działa tylko jeden proces? Wydaje mi się, że funkcja matematyczna jest znacznie szybsza do wykonania niż żądanie http i chociaż może się wydawać, że bieg jest synchroniczny, to faktycznie wykonuje wiele żądań, ale udaje mu się wyraźnie napisać na standardowe wyjście, ponieważ są one wolne. –

+0

@ReutSharabani Cóż, sprawdzałem to na "htop", ale również wypisuje tylko jeden na raz. Jeśli faktycznie uruchomił wiele procesów, wydrukowałby wiele na raz. – TysonU

Odpowiedz

1

zaczyna 50 wystąpień na temat funkcji, ale działa tylko jeden naraz

Masz błędnej interpretacji wyników htop. Tylko kilka, jeśli w ogóle, kopii Pythona będzie można uruchomić w dowolnej konkretnej instancji. Większość z nich będzie blokowana podczas oczekiwania na sieciowe operacje we/wy.

Procesy są w rzeczywistości równoległe.

Spróbuj zmienić funkcję na funkcję matematyczną zamiast na żądanie sieciowe, a zobaczysz, że wszystkie pięćdziesiąt wątków jest uruchamianych jednocześnie.

Zmiana zadania na funkcję matematyczną ilustruje jedynie różnicę między procesami związanymi z procesorem (np. Matematyka) i IO (np. Urlopen). Ten pierwszy jest zawsze możliwy do uruchomienia, drugi jest rzadko uruchamiany.

Drukuje tylko jeden na raz. Jeśli faktycznie uruchomił wiele procesów, wydrukowałby wiele na raz.

Drukuje jeden na raz, ponieważ piszesz linie do terminala. Ponieważ wiersze są nie do odróżnienia, nie można stwierdzić, czy są one napisane w jednym wątku, czy w osobnym wątku po kolei.

+0

Moje pytanie brzmi: czy maszyna linux może mieć więcej niż jeden proces związany z IO na raz? Wiem, że każdy port wydaje się być ograniczony do jednego procesu. Czy mój system otwiera tylko jeden port do "urlopuen"? Czy nie mógłbym w jakiś sposób ręcznie otworzyć każdy proces na nowy port i być może mieć większy sukces? – TysonU

+0

1) Tak, oczywiście Linux może mieć wiele procesów związanych z IO. W rzeczywistości właśnie to pokazuje ci htop - z 50 rozpoczętych procesów, większość z nich czeka na IO. 2) "Wiem, że każdy port wydaje się być ograniczony do jednego procesu." Brednie. Nie ma takiego ograniczenia. 3) "odnieść większy sukces" - co, dokładnie próbujesz osiągnąć, i co sprawia, że ​​myślisz, że jeszcze tego nie dokonałeś? –

+0

Jeśli dwa wątki próbują drukować w tym samym czasie, zwykle powoduje to drukowanie wielu instrukcji na linię. – TysonU

0

Przede wszystkim użycie parametru multiprocessing do zrównoleglania wejść/wyjść sieciowych jest przesadą. Korzystanie z wbudowanej biblioteki threading lub lekkiej biblioteki typu greenlet, na przykład gevent, jest znacznie lepszą opcją z mniejszym narzutem. GIL nie ma nic wspólnego z blokowaniem połączeń IO, więc nie musisz się w ogóle o to martwić.

Po drugie, prostym sposobem sprawdzenia, czy podprocesy/wątki/greenlety działają równolegle, jeśli monitorujesz standardowe wyjście, jest wydrukowanie czegoś na samym początku funkcji, zaraz po wygenerowaniu podprocesów/wątków/greenletów . Na przykład zmodyfikować funkcję check() jak tak

def check(q): 
    print 'Start checking urls!' 
    while True: 
     ... 

Jeśli kod jest prawidłowy, należy zobaczyć wiele Start checking urls! linie wydrukowane przed którymś z url + ' is [not] an active url!' wydrukowany. Działa na moim komputerze, więc wygląda na to, że Twój kod jest poprawny.

+0

Myślę, że tutaj pytanie nie brzmi: "czy sprawdzenie przebiega równolegle?" *. It's * "czy urllib.urlopen działa równolegle?" *. –

+0

Jeśli 'check()' działa równolegle, to 'urllib.urlopen()' działał równolegle (chyba że jest coś poważnie nie tak z jego ustawieniami deskryptorów plików, które wątpię). jeśli potrzebujesz dowodu, uruchom 'check()' w sposób sekwencyjny (np. zamień blok 'na wątek w zasięgu ...' na 'check (kolejka)') i zobaczysz, że sprawdzanie adresów URL zajmuje o wiele więcej – oxymor0n

+0

Myślę, że ** działa ** równolegle, po prostu mówię, że twoja odpowiedź ignoruje pytanie, wyraźnie stwierdził, że matematyczne obliczenia przebiegają równolegle dla niego. –

0

Wygląda na to, że problem dotyczy seryjnego zachowania się gethostbyname(3). Jest to omówione w this SO thread.

Spróbuj ten kod, który używa Twisted asynchroniczne I/O biblioteki:

import random 
import sys 
from twisted.internet import reactor 
from twisted.internet import defer 
from twisted.internet.task import cooperate 
from twisted.web import client 

SIMULTANEOUS_CONNECTIONS = 25 
# Generate 10,000 random urls to test and put them in the queue 
pages = [] 
for each in range(10000): 
    rand_num = random.randint(1000,10000) 
    url = ('http://www.' + str(rand_num) + '.com') 
    pages.append(url) 

# Main function for checking to see if generated url is active 
def check(page): 
    def successback(data, page): 
     print "{} is an active URL!".format(page) 

    def errback(err, page): 
     print "{} is not an active URL!; errmsg:{}".format(page, err.value) 

    d = client.getPage(page, timeout=3) # timeout in seconds 
    d.addCallback(successback, page) 
    d.addErrback(errback, page) 
    return d 

def generate_checks(pages): 
    for i in xrange(0, len(pages)): 
     page = pages[i] 
     #print "Page no. {}".format(i) 
     yield check(page) 

def work(pages): 
    print "started work(): {}".format(len(pages)) 
    batch_size = len(pages)/SIMULTANEOUS_CONNECTIONS 
    for i in xrange(0, len(pages), batch_size): 
     task = cooperate(generate_checks(pages[i:i+batch_size])) 

print "starting..." 
reactor.callWhenRunning(work, pages) 
reactor.run() 
Powiązane problemy