2011-07-02 16 views
25

Interesuje mnie wiedzieć, jaki byłby najlepszy sposób wdrożenia kolejki opartej na wątku.Wątek i kolejka

Na przykład:

mam 10 działań, które chcę wykonać tylko 4 wątkach. Chciałbym utworzyć kolejkę ze wszystkimi 10 akcjami umieszczonymi liniowo i rozpocząć pierwszą 4 akcję z 4 wątkami, gdy jeden z wątków zostanie wykonany, następny zacznie się itd. - Tak, po prostu liczba wątków jest albo 4 albo mniej niż 4.

Odpowiedz

28

Jest Queue klasa w thread w bibliotece standardowej. Korzystanie że można zrobić coś takiego:

require 'thread' 

queue = Queue.new 
threads = [] 

# add work to the queue 
queue << work_unit 

4.times do 
    threads << Thread.new do 
    # loop until there are no more things to do 
    until queue.empty? 
     # pop with the non-blocking flag set, this raises 
     # an exception if the queue is empty, in which case 
     # work_unit will be set to nil 
     work_unit = queue.pop(true) rescue nil 
     if work_unit 
     # do work 
     end 
    end 
    # when there is no more work, the thread will stop 
    end 
end 

# wait until all threads have completed processing 
threads.each { |t| t.join } 

Powodem pop z flagą non-blocking, że pomiędzy until queue.empty? i pop inny wątek mógł pop'ed kolejkę, więc chyba bez blokowania flaga jest ustawiona, możemy na zawsze utknąć w tej linii.

Jeśli używasz MRI, domyślnego interpretera Ruby, pamiętaj, że wątki nie będą absolutnie współbieżne. Jeśli twoja praca jest związana z procesorem, możesz równie dobrze uruchomić pojedynczy wątek. Jeśli masz operację, która blokuje IO, możesz uzyskać paralelizm, ale YMMV. Alternatywnie możesz użyć interpretera, który pozwala na pełną współbieżność, na przykład jRuby lub Rubinius.

+1

W oskard, to sugeruje, że na 4 ': END_OF_WORK'' work_unit's zamiast nieblokujące POP. Ostatnie oświadczenie o wątkach, które nie mają jednocześnie uruchomionych procesorów, odnosi się do YARV, ale nie do JRuby. –

+0

@AndrewGrimm, podoba mi się ta odpowiedź, ponieważ czasami chcesz mieć kolejkę roboczą i wątki wokół, aby pracować, gdy dodawany jest nowy element pracy. – akostadinov

7

Istnieje kilka klejnotów, które wykonują ten wzór dla ciebie; równoległe, brzoskwiniowe i moje nazywa się threach (lub jruby_threach pod jruby). Jest to zamiennik dla #each, ale pozwala ci określić, z ilu wątków ma działać, używając SizedQueue pod spodem, aby zapobiec wymykaniu się spod kontroli.

Więc ...

(1..10).threach(4) {|i| do_my_work(i) } 

Nie pchania moje własne rzeczy; istnieje wiele dobrych implementacji, które ułatwiają pracę.

Jeśli używasz JRuby, jruby_threach jest znacznie lepszą implementacją - Java oferuje po prostu znacznie bogatszy zestaw prymatów i struktur danych do użycia.

5

wykonywalny opisowy przykład:

require 'thread' 

p tasks = [ 
    {:file => 'task1'}, 
    {:file => 'task2'}, 
    {:file => 'task3'}, 
    {:file => 'task4'}, 
    {:file => 'task5'} 
] 

tasks_queue = Queue.new 
tasks.each {|task| tasks_queue << task} 

# run workers 
workers_count = 3 
workers = [] 
workers_count.times do |n| 
    workers << Thread.new(n+1) do |my_n| 
     while (task = tasks_queue.shift(true) rescue nil) do 
      delay = rand(0) 
      sleep delay 
      task[:result] = "done by worker ##{my_n} (in #{delay})" 
      p task 
     end 
    end 
end 

# wait for all threads 
workers.each(&:join) 

# output results 
puts "all done" 
p tasks 
1

Używam klejnotu o nazwie work_queue. To naprawdę praktyczne.

przykład:

require 'work_queue' 
wq = WorkQueue.new 4, 10 
(1..10).each do |number| 
    wq.enqueue_b("Thread#{number}") do |thread_name| 
     puts "Hello from the #{thread_name}" 
    end 
end 
wq.join