2013-04-03 8 views
5

Mamy kolejkę zadań i pracownicy przetwarzają te zadania pojedynczo. Każde zadanie wymaga od nas sformatowania niektórych danych i wysłania żądania HTTP POST, z danymi jako ładunkiem żądania.Jak wysłać asynchroniczne żądania http w pythonie po jednym na raz?

W jaki sposób możemy kazać każdemu pracownikowi wysyłać żądania HTTP POST asynchronicznie w sposób jedno-wątkowy, bez blokowania? Nie jesteśmy zainteresowani odpowiedzią na żądanie - chcemy tylko, aby żądanie zostało wykonane tak szybko, jak to możliwe, a następnie, aby pracownik natychmiast przystąpił do następnego zadania.

Przeprowadziliśmy badania przy użyciu biblioteki gevent i grequests (patrz: Why does gevent.spawn not execute the parameterized function until a call to Greenlet.join?). Nasz pracownik kod wygląda mniej więcej tak:

def execute_task(worker, job): 

    print "About to spawn request" 
    greenlet = gevent.spawn(requests.post, url, params=params) 

    print "Request spawned, about to call sleep" 
    gevent.sleep() 

    print "Greenlet status: ", greenlet.ready() 

Pierwsza instrukcja print wykonuje, ale wypowiedzi drugi i trzeci wydruk nigdy wydrukowany oraz adres URL nie jest trafiony.

Jak uzyskać te asynchroniczne żądania do wykonania?

+0

Istnieje standardowa biblioteka o nazwie [asyncore] (http://docs.python.org/2/library/asyncore.html), ale może być zbyt niska dla twojego przypadku użycia. – lucasg

+0

Musiałbym zgodzić się z @georgesl na tym, asyncore byłoby doskonałym miejscem do migracji, ponieważ daje lepszą elastyczność w stosunku do aplikacji do późniejszego rozwoju. Również 'http: // stackoverflow.com/ questions/15753901/python-asyncore-client-socket-can-not-assessaine-connection-status/15754244 # 15754244' to dobry początek i przykład tego, jak można go użyć (zobacz odpowiedź na moje pytanie). Jeśli nie, musisz to zrobić w wielu procesach, nawet "podrzędne" biblioteki Pythona najprawdopodobniej utworzą dla ciebie, jeśli możesz wysyłać żądania równolegle, to jest rzecz o wieloprocesowym – Torxed

+0

Twój kod gevent wygląda dobrze (a szybki test mówi mi, że działa dobrze, używam gevent 1.0b3). Chyba zależy to od kontekstu, w którym wywołana jest 'execute_task'. – robertklep

Odpowiedz

1

1) czynią obiekt Queue.Queue

2) zrobić jak najwięcej „pracownik” wątki, jak chcesz która pętlę i odczytać z Queue.Queue

3) karmić pracy na kolejce. kolejka

pracownik wątki będą odczytać Queue.Queue w kolejności, w jakiej są umieszczone na nim

przykład, który czyta wiersze z pliku i umieszcza je w Queue.Queue

import sys 
import urllib2 
import urllib 
from Queue import Queue 
import threading 
import re 

THEEND = "TERMINATION-NOW-THE-END" 


#read from file into Queue.Queue asynchronously 
class QueueFile(threading.Thread): 
    def run(self): 
     if not(isinstance(self.myq, Queue)): 
      print "Queue not set to a Queue" 
      sys.exit(1) 
     h = open(self.f, 'r') 
     for l in h: 
      self.myq.put(l.strip()) # this will block if the queue is full 
     self.myq.put(THEEND) 

    def set_queue(self, q): 
     self.myq = q 

    def set_file(self, f): 
     self.f = f 

pomysł co nitki pracownik może być jak (tylko przykład)

class myWorker(threading.Thread): 
    def run(self): 
     while(running):   
      try: 
       data = self.q.get() # read from fifo 

       req = urllib2.Request("http://192.168.1.10/url/path") 
       req.add_data(urllib.urlencode(data)) 
       h1 = urllib2.urlopen(req, timeout=10) 
       res = h1.read() 
       assert(len(res) > 80) 

      except urllib2.HTTPError, e: 
       print e 

      except urllib2.URLError, e: 
       print "done %d reqs " % n 
       print e 
       sys.exit() 

Aby obiekty oparte na threading.Thread przejść, utworzyć obiekt następnie wywołać „start” na przykład

1

Będziesz musiał uruchomić go w różnych wątkach lub użyć wbudowanej biblioteki asyncore. Większość bibliotek utworzy wątki bez twojej wiedzy, lub będzie polegać na asyncore, który jest standardową częścią Pythona.

Oto kombinacja Threading i asyncore:

#!/usr/bin/python 
# -*- coding: iso-8859-15 -*- 
import asyncore, socket 
from threading import * 
from time import sleep 
from os import _exit 
from logger import * # <- Non-standard library containing a log function 
from config import * # <- Non-standard library containing settings such as "server" 

class logDispatcher(Thread, asyncore.dispatcher): 
    def __init__(self, config=None): 
     self.inbuffer = '' 
     self.buffer = '' 
     self.lockedbuffer = False 
     self.is_writable = False 

     self.is_connected = False 

     self.exit = False 
     self.initated = False 

     asyncore.dispatcher.__init__(self) 
     Thread.__init__(self) 

     self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 
     try: 
      self.connect((server, server_port)) 
     except: 
      log('Could not connect to ' + server, 'LOG_SOCK') 
      return None 

     self.start() 

    def handle_connect_event(self): 
     self.is_connected = True 

    def handle_connect(self): 
     self.is_connected = True 
     log('Connected to ' + str(server), 'LOG_SOCK') 

    def handle_close(self): 
     self.is_connected = False 
     self.close() 

    def handle_read(self): 
     data = self.recv(8192) 
     while self.lockedbuffer: 
      sleep(0.01) 

     self.inbuffer += data 


    def handle_write(self): 
     while self.is_writable: 
      sent = self.send(self.buffer) 
      sleep(1) 

      self.buffer = self.buffer[sent:] 
      if len(self.buffer) <= 0: 
       self.is_writable = False 
      sleep(0.01) 

    def _send(self, what): 
     self.buffer += what + '\r\n' 
     self.is_writable = True 

    def run(self): 
     self._send('GET/HTTP/1.1\r\n') 

while 1: 
    logDispatcher() # <- Initate one for each request. 
    asyncore.loop(0.1) 
    log('All threads are done, next loop in 10', 'CORE') 
    sleep(10) 

Albo można po prostu zrobić wątku, który spełnia swoje zadanie, a następnie umiera.

from threading import * 
class worker(Thread): 
    def __init__(self, host, postdata) 
     Thread.__init__(self) 
     self.host = host 
     self.postdata = postdata 
     self.start() 
    def run(self): 
     sock.send(self.postdata) #Pseudo, create the socket! 

for data in postDataObjects: 
    worker('example.com', data) 

Jeśli trzeba ograniczyć liczbę wątków (jeśli wysyłasz ponad 5k postów lub więc może się opodatkowania w systemie) po prostu zrobić while len(enumerate()) > 1000: sleep(0.1) i niech obiekt looper odczekać kilka wątków wymierać.

0

Zawiń swój URL i params na liście, a następnie puknij jedną parę raz do puli zadań (pula zadań tutaj ma jedno zadanie lub jest pusta), twórz wątki, odczytuj zadanie z puli zadań, kiedy jeden wątek dostaje zadanie i wysyła żądanie, a następnie wyskakuje kolejny z listy (tj. jest to w rzeczywistości lista kolejki).

1

Możesz użyć metody join zamiast sleep, a następnie sprawdzić status.Jeśli chcesz uruchomić jeden na raz, który rozwiąże problem. Modyfikowanie kodu w celu jego przetestowania wydaje się działać dobrze.

import gevent 
import requests 

def execute_task(worker, job): 

    print "About to spawn request" 
    greenlet = gevent.spawn(requests.get, 'http://example.com', params={}) 

    print "Request spawned, about to call sleep" 
    gevent.sleep() 

    print "Greenlet status: ", greenlet.ready() 
    print greenlet.get() 

execute_task(None, None) 

podaje wyniki:

About to spawn request 
Request spawned, about to call sleep 
Greenlet status: True 
<Response [200]> 

tam jest więcej dzieje się w tym procesie Pythona, który mógłby blokować Gevent z uruchomieniem tego Greenlet?

Powiązane problemy