Próbuję napisać skrypt Pythona skanujący folder i zbierać zaktualizowany skrypt SQL, a następnie automatycznie pobierać dane dla skryptu SQL. W kodzie pętla while skanuje nowy plik SQL i wysyła do funkcji ściągania danych. Mam problem, aby zrozumieć, jak utworzyć dynamiczną kolejkę z pętlą while, ale także mieć wiele procesorów do uruchamiania zadań w kolejce.Używanie skryptów w języku Python w pętli do aktualizowania skryptów zadań i wieloprocesowych zadań w kolejce
Następujący kod ma problem z iteracją pętli while, która będzie działać na długim zadaniu, zanim przejdzie do następnej iteracji i zbierze inne zadania, aby wypełnić pusty procesor.
Aktualizacja:
Dzięki @pbacterio do połowu błąd, a teraz komunikat o błędzie zniknął. Po zmianie kodu, kod Pythona może zająć wszystkie skrypty zadania podczas jednej iteracji i dystrybuować skrypty do czterech procesorów. Jednak zawiesi się po długim zadaniu, aby przejść do kolejnej iteracji, skanowania i wysyłania nowo dodanych skryptów zadań. Masz pomysł, jak zrekonstruować kod?
W końcu wymyśliłem rozwiązanie, patrz odpowiedź poniżej. Okazało się to, czego szukasz jest
the_queue = Kolejka()
the_pool = Pool (4, worker_main (the_queue,))Dla tych natknąć się na podobnym pomysłem, po to cały architektura tego skryptu automatyzacji konwertująca udostępniony dysk na "serwer do ciągnięcia SQL" lub jakikolwiek inny "serwer" kolejki zadań.
a. Skrypt python
auto_data_pull.py
, jak pokazano w odpowiedzi. Musisz dodać własną funkcję pracy.b. A 'skrypt partia' z następujących powodów:
startu C: \ Anaconda2 \ python.exe C: \ Users \ bin \ auto_data_pull.py
c. Dodaj zadanie uruchomione przez komputer startowy, uruchom "skrypt wsadowy" To wszystko. To działa.
kod Python:
from glob import glob
import os, time
import sys
import CSV
import re
import subprocess
import pandas as PD
import pypyodbc
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = compute(func, args)
output.put(result)
#
# Function used to compute result
#
def compute(func, args):
result = func(args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
def query_sql(sql_file): #test func
#jsl file processing and SQL querying, data table will be saved to csv.
fo_name = os.path.splitext(sql_file)[0] + '.csv'
fo = open(fo_name, 'w')
print sql_file
fo.write("sql_file {0} is done\n".format(sql_file))
return "Query is done for \n".format(sql_file)
def check_files(path):
"""
arguments -- root path to monitor
returns -- dictionary of {file: timestamp, ...}
"""
sql_query_dirs = glob(path + "/*/IDABox/")
files_dict = {}
for sql_query_dir in sql_query_dirs:
for root, dirs, filenames in os.walk(sql_query_dir):
[files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for
filename in filenames if filename.endswith('.jsl')]
return files_dict
##### working in single thread
def single_thread():
path = "Y:/"
before = check_files(path)
sql_queue = []
while True:
time.sleep(3)
after = check_files(path)
added = [f for f in after if not f in before]
deleted = [f for f in before if not f in after]
overlapped = list(set(list(after)) & set(list(before)))
updated = [f for f in overlapped if before[f] < after[f]]
before = after
sql_queue = added + updated
# print sql_queue
for sql_file in sql_queue:
try:
query_sql(sql_file)
except:
pass
##### not working in queue
def multiple_thread():
NUMBER_OF_PROCESSES = 4
path = "Y:/"
sql_queue = []
before = check_files(path) # get the current dictionary of sql_files
task_queue = Queue()
done_queue = Queue()
while True: #while loop to check the changes of the files
time.sleep(5)
after = check_files(path)
added = [f for f in after if not f in before]
deleted = [f for f in before if not f in after]
overlapped = list(set(list(after)) & set(list(before)))
updated = [f for f in overlapped if before[f] < after[f]]
before = after
sql_queue = added + updated
TASKS = [(query_sql, sql_file) for sql_file in sql_queue]
# Create queues
#submit task
for task in TASKS:
task_queue.put(task)
for i in range(NUMBER_OF_PROCESSES):
p = Process(target=worker, args=(task_queue, done_queue)).start()
# try:
# p = Process(target=worker, args=(task_queue))
# p.start()
# except:
# pass
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
# single_thread()
if __name__ == '__main__':
# freeze_support()
multiple_thread()
referencyjny:
- zmiany plików Monitor z skrypt Pythona: http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
- Multiprocessing:
https://docs.python.org/2/library/multiprocessing.html
@bruno desthuilliers –
podaj minimalny przykład roboczy. – obgnaw
@ bgnaw Dodałem komunikat o błędzie z monitu. Zmodyfikowałem także kod, który teraz powinien zająć 'task_queue' na funkcję' worker() '. –