2012-05-07 31 views
7

Mój skrypt Perl potrzebuje do uruchomienia wielu wątków jednocześnie ...Jak zaimplementować komunikację wątków semaforów w Perlu?

use threads ('yield', 'exit' => 'threads_only'); 
use threads::shared; 
use strict; 
use warnings; 
no warnings 'threads'; 
use LWP::UserAgent; 
use HTTP::Request; 
use HTTP::Async; 
use ... 

... i takie wątki muszą uzyskać pewne informacje z sieci, więc HTTP::Async służy.

my $request = HTTP::Request->new; 
    $request->protocol('HTTP/1.1'); 
    $request->method('GET'); 
    $request->header('User-Agent' => '...'); 

my $async = HTTP::Async->new(slots   => 100, 
           timeout   => REQUEST_TIMEOUT, 
           max_request_time => REQUEST_TIMEOUT); 

Ale niektóre wątki potrzebują dostępu do sieci tylko, gdy inny wątek (s) tak mówi.

my $start = [Time::HiRes::gettimeofday()]; 
my @threads =(); 
foreach ... { 
    $thread = threads->create(
    sub { 
      local $SIG{KILL} = sub { threads->exit }; 
      my $url = shift; 
      if ($url ...) { 
      # wait for "go" signal from other threads 
      } 
      my ($response, $data); 
      $request->url($url); 
      $data = ''; 
      $async->add($request); 
      while ($response = $async->wait_for_next_response) { 
      threads->yield(); 
      $data .= $response->as_string; 
      } 
      if ($data ...) { 
      # send "go" signal to waiting threads 
      } 
     } 
     }, $_); 

    if (defined $thread) { 
    $thread->detach; 
    push (@threads, $thread); 
    } 
} 

Nie może być jeden lub więcej wątków czeka na sygnał „go” i nie może być jeden lub więcej wątków, że taki sygnał „go” może wysyłać. Na początku stan semafora to "wait", a gdy zmieni się na "przejdź do", pozostanie tak.

Wreszcie aplikacja sprawdza maksymalny czas pracy. Jeśli wątki działają zbyt długo, wysyłany jest sygnał samoczynnego zakończenia.

my $running; 
do { 
    $running = 0; 
    foreach my $thread (@threads) { 
    $running++ if $thread->is_running(); 
    } 
    threads->yield(); 
} until (($running == 0) || 
     (Time::HiRes::tv_interval($start) > MAX_RUN_TIME)); 
$running = 0; 
foreach my $thread (@threads) { 
    if ($thread->is_running()) { 
    $thread->kill('KILL'); 
    $running++; 
    } 
} 
threads->yield(); 

Teraz do rzeczy. Moje pytania to:

  1. Jak mogę najefektywniej kod oczekiwania „semafor” w skrypcie (patrz komentarze w skrypcie powyżej). Czy powinienem po prostu użyć właśnie udostępnionej zmiennej z jakąś obcą pętląsleep?

  2. Czy muszę dodać trochęsleeppętlę na końcu aplikacji, aby dać czas do gwintu do samozniszczenia?

+0

Czy dobrze rozumiem, że używasz oddzielnego HTTP :: obiektów asynchroniczny (kopiowane, nie jest dzielona przez nowych wątków), aby pobrać co najwyżej jeden adres URL w danym momencie na wątek? – pilcrow

+0

@pilcrow - Tak, to wygląda. Czy to marnotrawstwo zasobów? –

+0

Może być, ale nie musi, mniej pamięci lub czasu, ale jest to naprawdę drenaż w cyklach programatora. :) Projekt jest trudny do zrozumienia, a więc być może do zmiany/rozszerzenia bezpiecznie, ponieważ komponenty nie wydają się całkiem poprawne. – pilcrow

Odpowiedz

3

Można spojrzeć na Thread::Queue do wykonywania tej pracy. Można ustawić kolejkę, która będzie obsługiwać sygnalizację między wątkami oczekującymi na sygnał "go" a wątkami wysyłającymi sygnał "go". Oto krótki mock-up, że nie testowałem:

... 
use Thread::Queue; 
... 
# In main body 
my $q = Thread::Queue->new(); 
... 
$thread = threads->create(
    sub { 
      local $SIG{KILL} = sub { threads->exit }; 
      my $url = shift; 
      if ($url ...) { 
      # wait for "go" signal from other threads 
      my $mesg = $q->dequeue(); 
      # you could put in some termination code if the $mesg isn't 'go' 
      if ($mesg ne 'go') { ... } 
      } 
      ... 
      if ($data ...) { 
      # send "go" signal to waiting threads 
      $q->enqueue('go'); 
      } 
     } 
     }, $_); 
... 

Nici, które muszą czekać na sygnał „go” będzie czekać na metodzie rozkolejkowania aż coś wchodzi do kolejki. Gdy wiadomość wejdzie do kolejki jeden wątek i tylko jeden wątek pobierze wiadomość i przetworzy ją.

Jeśli chcesz zatrzymać wątki, aby nie były uruchamiane, możesz wstawić komunikat zatrzymania do nagłówka kolejki

$q->insert(0, 'stop') foreach (@threads); 

Istnieją przykłady w temat :: Queue i threads CPAN dystrybucje, które pokazują to bardziej szczegółowo.

W odpowiedzi na twoje drugie pytanie, odpowiedź brzmi, niestety, to zależy. Kiedy chcesz przerwać wątki, jaki rodzaj czyszczenia jest wymagany do czystego wyłączenia? Jaki jest najgorszy scenariusz, który mógłby wystąpić, gdyby dywan został wyrwany spod nici? W każdym momencie chciałbyś zaplanować oczyszczenie. Inną opcją, którą możesz zrobić, to poczekać na zakończenie każdego wątku.

Powodem mojego komentarza z pytaniem, czy można usunąć wywołanie detach, jest to, że ta metoda pozwala na wyjście głównego wątku i nie obchodzi go, co działo się z wątkami potomnymi. Zamiast tego, jeśli usunąć tę rozmowę, i dodać:

$_->join() foreach threads->list(); 

do końca głównego bloku, będzie to wymagało główną aplikację czekać na każdego wątku faktycznie zakończona.

Jeśli zostawisz metodę detach na swoim miejscu, będziesz musiał spać na końcu kodu, jeśli chcesz, aby nici wykonały jakiekolwiek czyszczenie. Kiedy zadzwonisz na numer detach, Perl mówi, że nie obchodzi cię, co robi wątek po wyjściu głównego wątku. Jeśli główny wątek zostanie zamknięty i istnieją wątki, które nadal działają, które zostały odłączone, program zakończy się bez ostrzeżeń. Jeśli jednak nie wymagają czyszczenia, a nadal dzwonisz pod numer detach, możesz wyjść z dowolnego miejsca.

+0

To pytanie ma otwartą nagrodę o wartości +50 reputacji. Proszę ** poprawić swoją odpowiedź **. Zauważyłem, że twój post jest interesujący, jednak nie odpowiedziałeś na drugie pytanie w moim poście (jeśli/jak ** czekać ** na autodestrukcję wątków) –

+0

@ user1215106 Zauważyłem w twoim kodzie, że robisz ' $ thread-> unpach; '. Zwykle używa się tego, aby zignorować wątek i nie przejmować się, jeśli to się zakończy, czy nie. Czy masz powód, żeby to tam było, czy może być usunięte? – Joel

+0

Wierzę, że można go usunąć –

-1

Wypróbuj coś takiego ....

#!/usr/bin/perl 

use threads; 
use threads::shared; 

$|=1; 

my ($global):shared; 
my (@threads); 

push(@threads, threads->new(\&mySub,1)); 
push(@threads, threads->new(\&mySub,2)); 
push(@threads, threads->new(\&mySub,3)); 

$i = 0; 

foreach my $myThread(@threads) 

{ 
    my @ReturnData = $myTread->join ; 
    print "Thread $i returned: @ReturnData\n"; 
    $i++; 
} 

sub mySub 
{ 
    my ($threadID) = @_; 

    for(0..1000) 
    { 
     $global++; 
     print "Thread ID: $threadID >> $_ >> GLB: $global\n"; 
     sleep(1); 
    } 
    return($id); 
} 
Powiązane problemy