2017-09-14 65 views
19

Próbuję napisać skrypt Pythona skanujący folder i zbierać zaktualizowany skrypt SQL, a następnie automatycznie pobierać dane dla skryptu SQL. W kodzie pętla while skanuje nowy plik SQL i wysyła do funkcji ściągania danych. Mam problem, aby zrozumieć, jak utworzyć dynamiczną kolejkę z pętlą while, ale także mieć wiele procesorów do uruchamiania zadań w kolejce.Używanie skryptów w języku Python w pętli do aktualizowania skryptów zadań i wieloprocesowych zadań w kolejce

Następujący kod ma problem z iteracją pętli while, która będzie działać na długim zadaniu, zanim przejdzie do następnej iteracji i zbierze inne zadania, aby wypełnić pusty procesor.

Aktualizacja:

  1. Dzięki @pbacterio do połowu błąd, a teraz komunikat o błędzie zniknął. Po zmianie kodu, kod Pythona może zająć wszystkie skrypty zadania podczas jednej iteracji i dystrybuować skrypty do czterech procesorów. Jednak zawiesi się po długim zadaniu, aby przejść do kolejnej iteracji, skanowania i wysyłania nowo dodanych skryptów zadań. Masz pomysł, jak zrekonstruować kod?

  2. W końcu wymyśliłem rozwiązanie, patrz odpowiedź poniżej. Okazało się to, czego szukasz jest

    the_queue = Kolejka()
    the_pool = Pool (4, worker_main (the_queue,))

  3. Dla tych natknąć się na podobnym pomysłem, po to cały architektura tego skryptu automatyzacji konwertująca udostępniony dysk na "serwer do ciągnięcia SQL" lub jakikolwiek inny "serwer" kolejki zadań.

    a. Skrypt python auto_data_pull.py, jak pokazano w odpowiedzi. Musisz dodać własną funkcję pracy.

    b. A 'skrypt partia' z następujących powodów:

    startu C: \ Anaconda2 \ python.exe C: \ Users \ bin \ auto_data_pull.py

    c. Dodaj zadanie uruchomione przez komputer startowy, uruchom "skrypt wsadowy" To wszystko. To działa.

kod Python:

from glob import glob 
import os, time 
import sys 
import CSV 
import re 
import subprocess 
import pandas as PD 
import pypyodbc 
from multiprocessing import Process, Queue, current_process, freeze_support 

# 
# Function run by worker processes 
# 

def worker(input, output): 
    for func, args in iter(input.get, 'STOP'): 
     result = compute(func, args) 
     output.put(result) 

# 
# Function used to compute result 
# 

def compute(func, args): 
    result = func(args) 
    return '%s says that %s%s = %s' % \ 
     (current_process().name, func.__name__, args, result) 


def query_sql(sql_file): #test func 
    #jsl file processing and SQL querying, data table will be saved to csv. 
    fo_name = os.path.splitext(sql_file)[0] + '.csv' 
    fo = open(fo_name, 'w') 
    print sql_file 
    fo.write("sql_file {0} is done\n".format(sql_file)) 
    return "Query is done for \n".format(sql_file) 


def check_files(path): 
    """ 
    arguments -- root path to monitor 
    returns -- dictionary of {file: timestamp, ...} 
    """ 
    sql_query_dirs = glob(path + "/*/IDABox/") 

    files_dict = {} 
    for sql_query_dir in sql_query_dirs: 
     for root, dirs, filenames in os.walk(sql_query_dir): 
      [files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for 
        filename in filenames if filename.endswith('.jsl')] 
    return files_dict 


##### working in single thread 
def single_thread(): 
    path = "Y:/" 

    before = check_files(path) 
    sql_queue = [] 

    while True: 
     time.sleep(3) 
     after = check_files(path) 
     added = [f for f in after if not f in before] 
     deleted = [f for f in before if not f in after] 
     overlapped = list(set(list(after)) & set(list(before))) 
     updated = [f for f in overlapped if before[f] < after[f]] 

     before = after 

     sql_queue = added + updated 
     # print sql_queue 
     for sql_file in sql_queue: 
      try: 
       query_sql(sql_file) 
      except: 
       pass 


##### not working in queue 
def multiple_thread(): 

    NUMBER_OF_PROCESSES = 4 
    path = "Y:/" 

    sql_queue = [] 
    before = check_files(path) # get the current dictionary of sql_files 
    task_queue = Queue() 
    done_queue = Queue() 

    while True:   #while loop to check the changes of the files 
     time.sleep(5) 
     after = check_files(path) 
     added = [f for f in after if not f in before] 
     deleted = [f for f in before if not f in after] 
     overlapped = list(set(list(after)) & set(list(before))) 
     updated = [f for f in overlapped if before[f] < after[f]] 

     before = after 
     sql_queue = added + updated 

     TASKS = [(query_sql, sql_file) for sql_file in sql_queue] 
     # Create queues 

     #submit task 
     for task in TASKS: 
      task_queue.put(task) 

     for i in range(NUMBER_OF_PROCESSES): 
       p = Process(target=worker, args=(task_queue, done_queue)).start()   
      # try: 
      #  p = Process(target=worker, args=(task_queue)) 
      #  p.start() 

      # except: 
      #  pass 

     # Get and print results 
     print 'Unordered results:' 
     for i in range(len(TASKS)): 
      print '\t', done_queue.get() 
     # Tell child processes to stop 
     for i in range(NUMBER_OF_PROCESSES): 
      task_queue.put('STOP')   

# single_thread() 
if __name__ == '__main__': 
    # freeze_support() 
    multiple_thread() 

referencyjny:

  1. zmiany plików Monitor z skrypt Pythona: http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
  2. Multiprocessing:
    https://docs.python.org/2/library/multiprocessing.html
+0

@bruno desthuilliers –

+0

podaj minimalny przykład roboczy. – obgnaw

+0

@ bgnaw Dodałem komunikat o błędzie z monitu. Zmodyfikowałem także kod, który teraz powinien zająć 'task_queue' na funkcję' worker() '. –

Odpowiedz

1

I zorientowali to. Dziękuję za odpowiedź, która zainspirowała tę myśl. Teraz skrypt może uruchomić pętlę while, aby monitorować folder dla nowego zaktualizowanego/dodanego skryptu SQL, a następnie rozpowszechniać dane ciągnące do wielu wątków. Rozwiązanie pochodzi z queue.get() i queue.put(). Zakładam, że obiekt kolejki sam zajmuje się komunikacją.

To jest ostatni kod -

from glob import glob 
import os, time 
import sys 
import pypyodbc 
from multiprocessing import Process, Queue, Event, Pool, current_process, freeze_support 

def query_sql(sql_file): #test func 
    #jsl file processing and SQL querying, data table will be saved to csv. 
    fo_name = os.path.splitext(sql_file)[0] + '.csv' 
    fo = open(fo_name, 'w') 
    print sql_file 
    fo.write("sql_file {0} is done\n".format(sql_file)) 
    return "Query is done for \n".format(sql_file) 


def check_files(path): 
    """ 
    arguments -- root path to monitor 
    returns -- dictionary of {file: timestamp, ...} 
    """ 
    sql_query_dirs = glob(path + "/*/IDABox/") 

    files_dict = {} 
    try: 
     for sql_query_dir in sql_query_dirs: 
      for root, dirs, filenames in os.walk(sql_query_dir): 
       [files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for 
         filename in filenames if filename.endswith('.jsl')] 
    except: 
     pass 

    return files_dict 


def worker_main(queue): 
    print os.getpid(),"working" 
    while True: 
     item = queue.get(True) 
     query_sql(item) 

def main(): 
    the_queue = Queue() 
    the_pool = Pool(4, worker_main,(the_queue,)) 

    path = "Y:/" 
    before = check_files(path) # get the current dictionary of sql_files 
    while True:   #while loop to check the changes of the files 
     time.sleep(5) 
     sql_queue = [] 
     after = check_files(path) 
     added = [f for f in after if not f in before] 
     deleted = [f for f in before if not f in after] 
     overlapped = list(set(list(after)) & set(list(before))) 
     updated = [f for f in overlapped if before[f] < after[f]] 

     before = after 
     sql_queue = added + updated 
     if sql_queue: 
      for jsl_file in sql_queue: 
       try: 
        the_queue.put(jsl_file) 
       except: 
        print "{0} failed with error {1}. \n".format(jsl_file, str(sys.exc_info()[0])) 
        pass 
     else: 
      pass 

if __name__ == "__main__": 
    main() 
2

Skąd określić sql_file w multiple_thread() w

multiprocessing.Process(target=query_sql, args=(sql_file)).start() 

Nie określono sql_file w metodzie a ponadto użyłeś tej zmiennej w pętli for. Zasięg zmiennej ogranicza się tylko do pętli for.

+0

Dzięki za ten połów. Zmodyfikowałem swój kod i dodałem funkcję worker, aby uruchomić task_queue w celu uruchomienia zadania. Jednak wciąż nie działa. Ale głosuję za twoją pomocą. –

2

spróbować wymienić to:

result = func(*args) 

przez to:

result = func(args) 
Powiązane problemy