2010-06-23 10 views
5

Wyobraź sobie odwrócone drzewo binarne z węzłami A, B, C, D, E, F na poziomie 0. węzły G, H, I na poziomie 1, węzeł J na poziomie 2, a węzłem K w poziomie 3.Implementacja specjalnego typu kolejki do przetwarzania wieloprocesowego w Pythonie

Poziom 1: G = func (a, B), H = func (C, D), i = func (E, F)

Poziom 2: J = func (G, H)

Poziom 3: K = func (J, I).

Każda para węzłów na poziomie 0 musi być przetwarzana w kolejności, każda para węzłów na poziomie 1 może być przetwarzana w dowolnej kolejności, ale wynik musi być na następnym poziomie musi być przetwarzany, jak pokazano, i tak dalej, aż do końca z końcowym wynikiem, K.

Rzeczywistym problemem jest problem geometrii obliczeniowej, w którym sekwencja brył jest połączona ze sobą. A sąsiaduje z B, który sąsiaduje z C, i tak dalej. Powstały bezpiecznik A i B (G) sąsiaduje z bezpiecznikiem C i D (H). Wynikowy bezpiecznik J i I (K) jest wynikiem końcowym. W ten sposób nie można połączyć G i I, ponieważ nie sąsiadują ze sobą. Jeśli liczba węzłów na poziomie nie jest potęgą 2, kończy się na zwisającej jednostce, która musi zostać przetworzona o jeden poziom dalej.

Ponieważ proces bezpiecznika jest kosztowny pod względem obliczeniowym i wymaga dużej ilości pamięci, ale jest bardzo równoległy, chciałbym użyć pakietu wieloprocesorowego Python i pewnej formy kolejki. Po obliczeniu G = func (A, B), chciałbym wypchnąć wynik G do kolejki dla kolejnych obliczeń J = func (G, H). Gdy kolejka jest pusta, ostatnim wynikiem jest wynik końcowy. Należy pamiętać, że parametr mp.queue niekoniecznie musi dawać wyniki FIFO, ponieważ I = func (E, F) może kończyć się przed H = func (C, D)

Wymyśliłem kilka (złych) rozwiązań ale jestem pewien, że istnieje eleganckie rozwiązanie, które leży poza moim zasięgiem. Propozycje?

+0

Dlaczego poziom = 0 musi być przetwarzany w kolejności, ale poziom = 1 może być przetwarzany w dowolnej kolejności? Czy nie wystarczy wybrać dwa znane liście i połączyć je w jeden węzeł? – Stephen

+0

Jestem niepoprawny, mówiąc, że węzły muszą być przetwarzane w kolejności. Muszą być przetwarzane pod kątem dopasowania. A sąsiaduje z B sąsiaduje z C i tak dalej dla poziomu 0. Możesz zrobić func (A, B) lub func (B, C), ale nie func (A, C). Podobnie na poziomie 1, G sąsiaduje z H sąsiaduje z I. Możesz zrobić func (G, H) lub func (H, I), ale nie func (G, I). – user90855

Odpowiedz

0

Nie mogłem wymyślić inteligentnego projektu dla kolejki, ale można łatwo zastąpić kolejkę jeszcze jednym procesem, który w moim przykładzie nazwałam WorkerManager. Proces ten zbiera wyniki ze wszystkich procesów Worker i uruchamia nowych pracowników tylko wtedy, gdy czekają na przetwarzanie dwa sąsiednie pakiety danych. W ten sposób nigdy nie spróbujesz dołączyć do nie sąsiadujących wyników, więc możesz zignorować "poziomy" i wystrzelić obliczenia następnej pary, gdy tylko będzie ona gotowa.

from multiprocessing import Process, Queue 

class Result(object): 
    '''Result from start to end.''' 
    def __init__(self, start, end, data): 
     self.start = start 
     self.end = end 
     self.data = data 


class Worker(Process): 
    '''Joins two results into one result.''' 
    def __init__(self, result_queue, pair): 
     self.result_queue = result_queue 
     self.pair = pair 
     super(Worker, self).__init__() 

    def run(self): 
     left, right = self.pair 
     result = Result(left.start, right.end, 
         '(%s, %s)' % (left.data, right.data)) 
     self.result_queue.put(result) 


class WorkerManager(Process): 
    ''' 
    Takes results from result_queue, pairs them 
    and assigns workers to process them. 
    Returns final result into final_queue. 
    ''' 
    def __init__(self, result_queue, final_queue, start, end): 
     self._result_queue = result_queue 
     self._final_queue = final_queue 
     self._start = start 
     self._end = end 
     self._results = [] 
     super(WorkerManager, self).__init__() 

    def run(self): 
     while True: 
      result = self._result_queue.get() 
      self._add_result(result) 
      if self._has_final_result(): 
       self._final_queue.put(self._get_final_result()) 
       return 
      pair = self._find_adjacent_pair() 
      if pair: 
       self._start_worker(pair) 

    def _add_result(self, result): 
     self._results.append(result) 
     self._results.sort(key=lambda result: result.start) 

    def _has_final_result(self): 
     return (len(self._results) == 1 
       and self._results[0].start == self._start 
       and self._results[0].end == self._end) 

    def _get_final_result(self): 
     return self._results[0] 

    def _find_adjacent_pair(self): 
     for i in xrange(len(self._results) - 1): 
      left, right = self._results[i], self._results[i + 1] 
      if left.end == right.start: 
       self._results = self._results[:i] + self._results[i + 2:] 
       return left, right 

    def _start_worker(self, pair): 
     worker = Worker(self._result_queue, pair) 
     worker.start() 

if __name__ == '__main__': 
    DATA = [Result(i, i + 1, str(i)) for i in xrange(6)] 
    result_queue = Queue() 
    final_queue = Queue() 
    start = 0 
    end = len(DATA) 
    man = WorkerManager(result_queue, final_queue, start, end) 
    man.start() 
    for res in DATA: 
     result_queue.put(res) 
    final = final_queue.get() 
    print final.start 
    # 0 
    print final.end 
    # 6 
    print final.data 
    # For example: 
    # (((0, 1), (2, 3)), (4, 5)) 

Na moim przykładzie użyłem prostego Worker która zwraca dane podane w nawiasach, oddzielone przecinkiem, ale można umieścić dowolny obliczenia tam. W moim przypadku końcowy wynik to (((0, 1), (2, 3)), (4, 5)), co oznacza, że ​​algorytm obliczony (0, 1) i (2, 3) przed obliczeniem ((0, 1), (2, 3)), a następnie dołączył do wyniku z (4, 5). Mam nadzieję, że tego właśnie szukałeś.

+0

że pojawił się z roztworem, który wygląda następująco: def nagrzewnicy (kształty): shape1_id, shape1 kształty = [0] shape2_id, shape2 = kształty [1] skondensowany = OCC.BRepAlgoAPI.BRepAlgoAPI_Fuse (shape1, shape2).Shape() return ((shape1_id, shape2_id), fused) wyniki = [(i, a) dla i, a w wyliczeniu (plasterki)] podczas len (wyniki)> 1: P = processing.Pool (7 wyniki = P.map (utrwalacz, [(a, b) dla a, bw zip (wyniki [:: 2], wyniki [1 :: 2])]) results.sort (klucz = wynik lambda: wynik [0]) – user90855

Powiązane problemy