2013-04-29 13 views
15

Chcę czegoś podobnego do executor.map, z wyjątkiem sytuacji, gdy sprawdzam wyniki, chcę je powtórzyć zgodnie z kolejnością wypełniania, np. element pracy, który został ukończony jako pierwszy, powinien pojawić się jako pierwszy w iteracji itp. Jest tak, że iteracja zablokuje iff każdy pojedynczy element pracy w sekwencji jeszcze się nie zakończył.Python `concurrent.futures`: Iterowanie na futures zgodnie z kolejnością wypełniania

Wiem, jak zaimplementować to samodzielnie za pomocą kolejki, ale zastanawiam się, czy jest to możliwe przy użyciu ramy futures.

(I najczęściej stosowane wykonawcy gwint oparte, więc chciałbym odpowiedzieć, że stosuje się do nich, ale ogólną odpowiedź będzie mile widziane, jak również.)

UPDATE: Dzięki za odpowiedzi! Czy możesz wyjaśnić, jak mogę użyć as_completed z executor.map? executor.map jest najbardziej przydatnym i zwięzłym narzędziem dla mnie przy korzystaniu z kontraktów terminowych, a ja niechętnie zacznę ręcznie używać obiektów Future.

+0

Masz szczęście! – damzam

Odpowiedz

25

executor.map(), jak wbudowanym map(), tylko zwraca wyniki w kolejności z iterable, więc niestety nie można go używać do określenia kolejności realizacji. concurrent.futures.as_completed() jest to, czego szukasz - oto przykład:

import time 
import concurrent.futures 

times = [3, 1, 2] 

def sleeper(secs): 
    time.sleep(secs) 
    print('I slept for {} seconds'.format(secs)) 
    return secs 

# returns in the order given 
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    print(list(executor.map(sleeper, times))) 

# I slept for 1 seconds 
# I slept for 2 seconds 
# I slept for 3 seconds 
# [3, 1, 2] 

# returns in the order completed 
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    futs = [executor.submit(sleeper, secs) for secs in times] 
    print([fut.result() for fut in concurrent.futures.as_completed(futs)]) 

# I slept for 1 seconds 
# I slept for 2 seconds 
# I slept for 3 seconds 
# [1, 2, 3] 

Oczywiście, jeśli są wymagane do korzystania z interfejsu mapy, można utworzyć własny map_as_completed() funkcję, która obudowuje powyżej (może dodać ją do podklasy Executor()), ale myślę, że tworzenie instancji futures poprzez executor.submit() jest prostszym/czystszym sposobem na przejście (pozwala również na zapewnienie no-args, kwargs).

0

From python doc

concurrent.futures.as_completed(fs, timeout=None)¶ 

Zwraca iterator nad przyszłością przypadkach (ewentualnie tworzonych przez różne instancje Wykonawcy ) podanych przez FS, że plony przyszłości, ponieważ kompletne (wykończone lub zostały anulowane). Wszystkie transakcje futures, które zakończyły się przed zakończeniem as_completed() , zostaną przekazane jako pierwsze. Zwracany iterator wywołuje TimeoutError, jeśli wywoływana jest funkcja , następnie(), a wynik nie jest dostępny po upływie limitu czasu od pierwotnego wywołania funkcji as_completed(). timeout może być int lub float. Jeśli timeout nie jest określony lub None, , nie ma limitu czasu oczekiwania.

Trzeba by zrozumieć różnicę między executor.map() i executor.submit(). Pierwszy odwzorowuje funkcję na wektor argumentów. Jest dość podobny do map, ale uruchamia zadania asynchronicznie. submit(func, arg) uruchamia jedno zadanie przy każdym połączeniu. W tym zadaniu func jest stosowane do arg.

Oto przykład użycia as_completed() z submit(), który mógłbym uruchomić na pythonie 3.0

from concurrent import futures 
import urllib.request 

URLS = ['http://www.foxnews.com/', 
     'http://www.cnn.com/', 
     'http://europe.wsj.com/', 
     'http://www.bbc.co.uk/', 
     'http://some-made-up-domain.com/'] 

def load_url(url, timeout): 
    return urllib.request.urlopen(url, timeout=timeout).read() 

def main(): 
    with futures.ThreadPoolExecutor(max_workers=5) as executor: 
     future_to_url = dict(
      (executor.submit(load_url, url, 60), url) 
      for url in URLS) 

     for future in futures.as_completed(future_to_url): 
      url = future_to_url[future] 
      try: 
       print('%r page is %d bytes' % (
          url, len(future.result()))) 
      except Exception as e: 
       print('%r generated an exception: %s' % (
          url, e)) 

if __name__ == '__main__': 
    main() 

nie map() jest tutaj stosowane, zadania są uruchamiane z submit i as_completed()

zwraca iterator nad przyszłością przypadkach podanych przez FS, że plony przyszłość, gdyż pełna (gotowych lub zostały anulowane) .

Powiązane problemy