28

Spędziłem cały dzień szukając najprostszego możliwego pobierania znacznika URL w Pythonie, ale większość skryptów, które znalazłem, używa kolejek lub wieloprocesowych lub złożonych bibliotek.Bardzo proste wielowątkowe pobieranie równoległego adresu URL (bez kolejki)

Nareszcie napisałem jedną, którą zgłaszam jako odpowiedź. Proszę zasugerować jakąkolwiek poprawę.

Sądzę, że inne osoby mogły szukać czegoś podobnego.

+1

tylko dodać: w przypadku Pythona wielowątkowość nie jest natywna dla rdzenia z powodu GIL. – akshayb

+0

To wygląda na to, że pobieranie równoległych adresów URL jest szybsze niż robienie tego serialnie. Dlaczego? jest to spowodowane tym, że (zakładam) interpreter Pythona nie działa ciągle podczas żądania HTTP? –

+0

Co zrobić, jeśli chcę przeanalizować zawartość tych stron internetowych, które pobieram? Czy lepiej jest parsować w każdym wątku, czy też powinienem to robić sekwencyjnie po dołączeniu wątków roboczych do głównego wątku? –

Odpowiedz

27

Uproszczenie oryginalną wersję miarę możliwości:

import threading 
import urllib2 
import time 

start = time.time() 
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] 

def fetch_url(url): 
    urlHandler = urllib2.urlopen(url) 
    html = urlHandler.read() 
    print "'%s\' fetched in %ss" % (url, (time.time() - start)) 

threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls] 
for thread in threads: 
    thread.start() 
for thread in threads: 
    thread.join() 

print "Elapsed Time: %s" % (time.time() - start) 

Jedyne nowe sztuczki są tu:

  • Śledzić wątków tworzonych.
  • Nie przejmuj się licznikiem wątków, jeśli chcesz wiedzieć, kiedy wszystko jest gotowe; join już ci to mówi.
  • Jeśli nie potrzebujesz żadnego stanu lub zewnętrznego interfejsu API, nie potrzebujesz podklasy Thread, tylko funkcji target.
+0

Upewniłem się, że zostało to uproszczone "w miarę możliwości", ponieważ jest to najlepszy sposób, aby upewnić się, że ktoś sprytnie pojawi się i znajdzie sposób, aby go jeszcze bardziej uprościć, aby sprawić, żebym wyglądał głupio. :) – abarnert

+0

Wierzę, że nie jest łatwo to pokonać! :-) jest to doskonała poprawa od pierwszej wersji, którą opublikowałem tutaj: –

+1

może możemy połączyć pierwsze 2 pętle w jedną? poprzez tworzenie instancji i uruchamianie wątków w tej samej pętli 'for'? –

-1

Ten skrypt pobiera zawartość z zestawu adresów URL zdefiniowanych w tablicy. Tworzy wątek dla każdego adresu URL, który ma być pobrany, więc jest przeznaczony do użycia w ograniczonym zestawie adresów URL.

Zamiast używać obiektu kolejki, każdy wątek powiadamia swój koniec o wywołaniu zwrotnym do funkcji globalnej, która utrzymuje liczbę uruchomionych wątków.

import threading 
import urllib2 
import time 

start = time.time() 
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] 
left_to_fetch = len(urls) 

class FetchUrl(threading.Thread): 
    def __init__(self, url): 
     threading.Thread.__init__(self) 
     self.setDaemon = True 
     self.url = url 

    def run(self): 
     urlHandler = urllib2.urlopen(self.url) 
     html = urlHandler.read() 
     finished_fetch_url(self.url) 


def finished_fetch_url(url): 
    "callback function called when a FetchUrl thread ends" 
    print "\"%s\" fetched in %ss" % (url,(time.time() - start)) 
    global left_to_fetch 
    left_to_fetch-=1 
    if left_to_fetch==0: 
     "all urls have been fetched" 
     print "Elapsed Time: %ss" % (time.time() - start) 


for url in urls: 
    "spawning a FetchUrl thread for each url to fetch" 
    FetchUrl(url).start() 
+0

Widzę, że to bardzo przydatne! Dzięki :) –

+2

Nie można modyfikować współdzielonych globałów bez blokady. I szczególnie niebezpieczne jest wykonywanie takich rzeczy jak 'urlsToFetch- = 1'. Wewnątrz interpretera, który kompiluje się w trzech osobnych krokach, aby załadować 'urlsToFetch', odjąć jeden i zapisać' urlsToFetch'. Jeśli interpreter przełącza wątki między ładunkiem a magazynem, skończysz z wątkiem 1, ładując 2, następnie wątkiem 2 ładując to samo 2, następnie wątkiem 2 przechowując 1, a następnie gwintem 1 przechowując 1. – abarnert

+0

hi abarnert, dzięki za odpowiedź czy możesz zaproponować rozwiązanie dla wątków bezpieczne? wielkie dzięki –

9

Główny przykład w concurrent.futures robi wszystko, co chcesz, o wiele prostszy. Co więcej, może obsłużyć ogromną liczbę adresów URL, robiąc tylko 5 jednocześnie i znacznie lepiej radzi sobie z błędami.

Oczywiście ten moduł jest zbudowany tylko z Pythonem 3.2 lub nowszym ... ale jeśli używasz 2.5-3.1, możesz po prostu zainstalować backport, futures, z PyPI. Aby zmienić kod przykładowy, wystarczy wyszukać i zastąpić concurrent.futures z futures, a dla 2.x, urllib.request z urllib2.

Oto próbka przeniesiona do 2.x, zmodyfikowane do wykorzystania lista adresów URL i dodać razy:

import concurrent.futures 
import urllib2 
import time 

start = time.time() 
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] 

# Retrieve a single page and report the url and contents 
def load_url(url, timeout): 
    conn = urllib2.urlopen(url, timeout=timeout) 
    return conn.readall() 

# We can use a with statement to ensure threads are cleaned up promptly 
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: 
    # Start the load operations and mark each future with its URL 
    future_to_url = {executor.submit(load_url, url, 60): url for url in urls} 
    for future in concurrent.futures.as_completed(future_to_url): 
     url = future_to_url[future] 
     try: 
      data = future.result() 
     except Exception as exc: 
      print '%r generated an exception: %s' % (url, exc) 
     else: 
      print '"%s" fetched in %ss' % (url,(time.time() - start)) 
print "Elapsed Time: %ss" % (time.time() - start) 

Ale można zrobić to jeszcze prostsze. Naprawdę, wszystko czego potrzebujesz to:

def load_url(url): 
    conn = urllib2.urlopen(url, timeout) 
    data = conn.readall() 
    print '"%s" fetched in %ss' % (url,(time.time() - start)) 
    return data 

with futures.ThreadPoolExecutor(max_workers=5) as executor: 
    pages = executor.map(load_url, urls) 

print "Elapsed Time: %ss" % (time.time() - start) 
0

Jestem teraz publikowania innego rozwiązania, przez posiadające gwinty pracownik nie-demona i łączenia ich do głównego wątku (co oznacza zablokowanie głównego wątku, dopóki wszystkich wątków roboczych zakończono) zamiast powiadamiać o zakończeniu wykonywania każdego wątku roboczego z wywołaniem zwrotnym do funkcji globalnej (tak jak to zrobiłem w poprzedniej odpowiedzi), ponieważ w niektórych komentarzach zauważono, że taki sposób nie jest bezpieczny dla wątków.

import threading 
import urllib2 
import time 

start = time.time() 
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] 

class FetchUrl(threading.Thread): 
    def __init__(self, url): 
     threading.Thread.__init__(self) 
     self.url = url 

    def run(self): 
     urlHandler = urllib2.urlopen(self.url) 
     html = urlHandler.read() 
     print "'%s\' fetched in %ss" % (self.url,(time.time() - start)) 

for url in urls: 
    FetchUrl(url).start() 

#Join all existing threads to main thread. 
for thread in threading.enumerate(): 
    if thread is not threading.currentThread(): 
     thread.join() 

print "Elapsed Time: %s" % (time.time() - start) 
+0

To zadziała, ale nie jest to sposób w jaki chcesz to zrobić. Jeśli późniejsza wersja twojego programu utworzy inne wątki (demon lub dołączony przez jakiś inny kod), to się zepsuje. Ponadto, 'thread is threading.currentThread()' nie może działać (myślę, że zawsze będzie dla każdej wersji CPython do tej pory, na dowolnej platformie z prawdziwymi wątkami, jeśli jest używany w głównym wątku ... ale nadal, lepiej nie założyć).Bezpieczniej przechowywać wszystkie obiekty "Thread" na liście ('threads = [FetchUrl (url) dla adresu URL w urlach]'), a następnie uruchomić je, a następnie połączyć je z 'dla wątku w wątkach: thread.join()'. – abarnert

+0

Ponadto, w prostych przypadkach, takich jak ten, można uprościć go jeszcze dalej: Nie zawracaj sobie głowy tworzeniem podklasy "Thread", chyba że masz jakiś stan do zapisania lub jakieś API do interakcji z wątkami z zewnątrz, po prostu napisz proste i wykonaj 'threading.Thread (target = my_thread_function, args = [url])'. – abarnert

+0

masz na myśli to, że jeśli mam ten sam skrypt uruchomiony dwa razy w tym samym czasie na tym samym komputerze "dla wątku w threading.enumerate():" zawiera wątki obu egzekucji? –

16

multiprocessing posiada basen gwintu, który nie jest uruchamiany inne procesy:

#!/usr/bin/env python 
from multiprocessing.pool import ThreadPool 
from time import time as timer 
from urllib2 import urlopen 

urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] 

def fetch_url(url): 
    try: 
     response = urlopen(url) 
     return url, response.read(), None 
    except Exception as e: 
     return url, None, e 

start = timer() 
results = ThreadPool(20).imap_unordered(fetch_url, urls) 
for url, html, error in results: 
    if error is None: 
     print("%r fetched in %ss" % (url, timer() - start)) 
    else: 
     print("error fetching %r: %s" % (url, error)) 
print("Elapsed Time: %s" % (timer() - start,)) 

Zalety w porównaniu z roztworem opartym Thread:

  • ThreadPool pozwala ograniczyć maksymalną liczbę jednoczesnych połączenia (20 w kodzie przykładowym)
  • wyjście nie jest zniekształcone, ponieważ wszystkie wyjścia są w e główny wątek
  • błędy są rejestrowane
  • kod działa zarówno na Pythonie 2 i 3 bez zmian (przyjmując from urllib.request import urlopen na Pythonie 3).
+0

Mam pytanie dotyczące kodu: czy wydruk w czwartej linii od dołu rzeczywiście zwraca czas potrzebny na pobranie adresu URL lub czas potrzebny na zwrócenie adresu URL z obiektu "wyniki"? W moim rozumieniu znacznik czasu powinien być wydrukowany w funkcji fetch_url(), a nie w części drukowania wyników. –

+1

@UweZiegenhagen 'imap_unordered()' zwraca wynik, gdy tylko będzie gotowy. Zakładam, że narzut jest nieznaczny w porównaniu do czasu potrzebnego na wysłanie żądania http. – jfs

+0

Dziękuję, używam go w zmodyfikowanej formie do równoległego kompilowania plików LaTeX: http://uweziegenhagen.de/?p=3501 –

Powiązane problemy