2012-02-13 11 views
7

Próbuję wykonać następujące czynności:Perl Kolejki i Threading

  1. mają gwint, który odczytuje dane z bardzo dużego pliku powiedzieć o 10GB i odsyła je do kolejki. (Nie chcę za kolejce do uzyskać bardzo duże albo)

  2. Choć wątek buildQueue naciska dane do kolejki w tym samym czasie mają około 5 pracownika nici de-kolejkę i przetwarzania danych.

Zrobiłem próbę ale moje inne wątki są nieosiągalne z powodu pętli w moim buildQueue wątku.

Moje podejście może być całkowicie błędne. Dzięki za pomoc, jest to bardzo doceniane.

Oto kod dla buildQueue:

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open DICT_FILE, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<DICT_FILE>) { 
      if ($queue->pending() < 100) { 
       my $query = <DICT_FILE>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

A jak już spodziewać, kiedy ten wątek zostaje nic innego wykonywane po zostanie wykonany, ponieważ ten wątek nie zakończy.

my $builder = new Thread(&buildQueue); 

Ponieważ wątek budowniczego będzie działał przez długi czas, nigdy nie utworzę wątków roboczych.

Oto cały kod:

#!/usr/bin/perl -w 
use strict; 
use Thread; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
my @threads; 

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open dict_file, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<dict_file>) { 
      if ($queue->pending() < 100) { 
       my $query = <dict_file>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

sub processor { 
    my $query; 
    while (1) { 
     if ($query = $queue->dequeue) { 
      print "$query\n"; 
     } 
    } 
} 

my $builder = new Thread(&buildQueue); 
push @threads, new Thread(&processor) for 1..5; 
+0

Kilka pytań: Możesz wspomnieć, że gwint kolejka-budowniczy nie zakończy, ale to niczego w ogóle? Czy rozmiar kolejki spadł poniżej 100 lub przekroczył 0? Ponadto [nie jestem pewien, czy poprawnie tworzysz swoje wątki] (http://perldoc.perl.org/perlthrtut.html). Czy nie powinno to być 'my $ builder = threads-> create (\ & buildQueue);'? –

+0

Kreator kolejki jest dobrze zbudowany, ale ponieważ wątki robocze nie zostały utworzone, nie można usunąć niczego z kolejki, więc kolejka zatrzyma się na 100, podczas gdy kolejka kompilacji nadal działa z powodu ciągłej pętli. – Sinista

+0

Hmmm, będę potrzebował zobaczyć więcej kodu, aby ustalić kontekst, szczególnie tam, gdzie tworzysz wątki. Nie jesteś 'join'ing lub' odłączyć 'budowniczego kolejki przed utworzeniem wątków roboczych, prawda? –

Odpowiedz

10

Musisz zaznaczyć, gdy chcesz, aby gwinty, aby wyjść (przez obu joinor detach). Problem stanowi również fakt, że masz nieskończone pętle bez instrukcji last.

Edytuj: Zapomniałem również bardzo ważną część! Each worker thread will block, waiting for another item to process off of the queue until they get an undef in the queue. Z tego powodu po każdym budowaniu kolejki po każdym kolejnym wątku dodajemy do kolejnej instancji undef.

Spróbuj:

#!/usr/bin/perl -w 
use strict; 
use threads; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
our @threads; #Do you really need our instead of my? 

sub buildQueue 
{ 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 

    #Three-argument open, please! 
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!"); 
    while(my $query=<$dict_file>) 
    { 
     chomp($query); 
     while(1) 
     { #Wait to see if our queue has < 100 items... 
      if ($queue->pending() < 100) 
      { 
       $queue->enqueue($query); 
       print "Queue Size: " . $queue->pending . "\n"; 
       last; #This breaks out of the infinite loop 
      } 
     } 
    } 
    close($dict_file); 
    foreach(1..5) 
    { 
     $queue->enqueue(undef); 
    } 
} 

sub processor 
{ 
    my $query; 
    while ($query = $queue->dequeue) 
    { 
     print "Thread " . threads->tid . " got $query\n"; 
    } 
} 

my $builder=threads->create(\&buildQueue); 
push @threads,threads->create(\&process) for 1..5; 

#Waiting for our threads to finish. 
$builder->join; 
foreach(@threads) 
{ 
    $_->join; 
} 
+1

Wygląda na to, że problemem był nieaktualny moduł wątku, który przełączyłem na moduł wątków, a mój kod działa tak jak powinien. Dziękuję Jackowi Many za wskazanie mnie we właściwym kierunku. – Sinista

0

Inne podejście: Można również użyć user_tasks w MCE 1.2+ i utworzenie dwóch multi-robotnikatasks, jedno zadanie do czytania (ponieważ jest to duży plik, można również korzystać z czytania równoległego zachowując plik odczytać szukać) i jedno zadanie do przetworzenia itp.

Poniższy kod nadal używa Thread::Queue do zarządzania kolejką bufora.

Podsłówka ma kontrolę nad wielkością kolejki i przesyła dane bezpośrednio do procesu zarządzania "$ R_QUEUE, ponieważ używaliśmy wątków, więc ma dostęp do przestrzeni pamięci rodzica. Jeśli zamiast tego chcesz używać wideł, nadal możesz uzyskać dostęp do kolejki za pomocą funkcji oddzwaniania. Ale tutaj postanowiłem po prostu popchnąć do kolejki.

Podsłówka processQueue będzie po prostu usuwać kolejki wszystkich elementów znajdujących się w kolejce, dopóki nie będzie niczego więcej.

Pod każdą czynność jest uruchamiany tylko jeden raz przez proces menedżera na końcu każdego zadania, więc używamy go do sygnalizowania zatrzymania naszych procesów roboczych.

Oczywiście, istnieje wiele swobody w jaki sposób chcesz kawałek dane do robotników, aby można było podjąć decyzję dotyczącą wielkości fragmentu lub nawet Jak slurp swoje dane.

#!/usr/bin/env perl 
use strict; 
use warnings; 
use threads; 
use threads::shared; 
use Thread::Queue; 
use MCE; 

my $R_QUEUE = Thread::Queue->new; 
my $queue_workers = 8; 
my $process_workers = 8; 
my $chunk_size = 1; 

print "Enter a file name: "; 
my $input_file = <STDIN>; 
chomp($input_file); 

sub buildQueue { 
    my ($self, $chunk_ref, $chunk_id) = @_; 
    if ($R_QUEUE->pending() < 100) { 
     $R_QUEUE->enqueue($chunk_ref); 
     $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending ."\n"); 
    } 
} 

sub processQueue { 
    my $self = shift; 
    my $wid = $self->wid; 
    while (my $buff = $R_QUEUE->dequeue) { 
     $self->sendto('stdout', "Thread " . $wid . " got $$buff"); 
    } 
} 

my $mce = MCE->new(
    input_data => $input_file, # this could be a filepath or a file handle or even a scalar to treat like a file, check the documentation for more details. 
    chunk_size => $chunk_size, 
    use_slurpio => 1, 

    user_tasks => [ 
     { # queueing task 
      max_workers => $queue_workers, 
      user_func => \&buildQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory. 
      task_end => sub { $R_QUEUE->enqueue((undef) x $process_workers) } # signal stop to our process workers when they hit the end of the queue. Thanks > Jack Maney! 
     }, 
     { # process task 
      max_workers => $process_workers, 
      user_func => \&processQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory 
      task_end => sub { print "Finished processing!\n"; } 
     } 
    ] 
); 

$mce->run(); 

exit; 
3

Moduł MCE dla Perla uwielbia duże pliki. Za pomocą MCE można podzielić wiele linii naraz, wydzielać duży fragment jako łańcuch skalarny lub odczytywać po 1 linii naraz. Cięcie wielu linii naraz redukuje obciążenie dla IPC.

MCE 1.504 jest już dostępny. Zapewnia kolejkę MCE :: Queue z obsługą procesów podrzędnych, w tym wątków. Ponadto wersja 1.5 zawiera 5 modeli (MCE :: Flow, MCE :: Grep, MCE :: Loop, MCE :: Map i MCE :: Stream), które zajmują się instancją instancji MCE, a także automa- dostrajanie max_workers i chunk_size. Można zastąpić te opcje btw.

Poniżej, MCE :: Loop służy do demonstracji.

use MCE::Loop; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
    my ($mce, $chunk_ref, $chunk_id) = @_; 

    foreach my $line (@$chunk_ref) { 
     chomp $line; 
     ## add your code here to process $line 
    } 

} $dict_path; 

Jeśli chcesz określić liczbę pracowników i/lub chunk_size, istnieją 2 sposoby, aby to zrobić.

use MCE::Loop max_workers => 5, chunk_size => 300000; 

Albo ...

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 300000 
}; 

Chociaż wyrwy jest korzystne dla dużych plików, można porównać czas z wyrwy jednej linii na raz. Można pominąć pierwszą linię wewnątrz bloku (skomentował). Zwróć uwagę, że nie ma potrzeby wewnętrznej pętli for. $ chunk_ref jest nadal tablicą ref zawierającą 1 linię. Wejściowy skalar $ _ zawiera linię, gdy wielkość_karty równa się 1, w przeciwnym razie wskazuje na $ chunk_ref.

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 1 
}; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
# my ($mce, $chunk_ref, $chunk_id) = @_; 

    my $line = $_; 
    ## add your code here to process $line or $_ 

} $dict_path; 

Mam nadzieję, że ta demonstracja była pomocna dla osób, które chcą przetwarzać plik równolegle.

:) Mario