2014-05-23 15 views
8

Próbuję wdrożyć pulę pracowników w Go. Sekcja go-wiki (i Efektywne przejście w sekcji Kanały) oferuje doskonałe przykłady ograniczania wykorzystania zasobów. Po prostu stwórz kanał z buforem, który jest tak duży, jak pula pracownicza. Następnie wypełnij ten kanał pracownikami i po zakończeniu ich prześlij z powrotem do kanału. Odbieranie z bloków kanałów, dopóki pracownik nie jest dostępny. Tak więc kanał i pętla to cała implementacja - bardzo fajnie!Pula zmiennych o rozmiarach idiomatycznych w Go

Alternatywnie można zablokować wysyłanie do kanału, ale sam pomysł.

Moje pytanie dotyczy zmiany rozmiaru puli pracowników podczas jej działania. Nie wierzę, że istnieje sposób na zmianę rozmiaru kanału. Mam pewne pomysły, ale większość z nich wydaje się zbyt skomplikowana. This page faktycznie implementuje semafor przy użyciu kanału i pustych struktur w taki sam sposób, ale ma ten sam problem (te rzeczy pojawiają się cały czas podczas wyszukiwania w Google dla "semafora golang".)

+0

dlaczego chcesz mieć pulę pracowników o zmiennych rozmiarach? – fabrizioM

+0

@fabrizioM Pracownicy robią bardzo mało - kontrolują tylko kilka procesów zewnętrznych. Liczba pracowników powinna zależeć od obciążenia maszyn procesami zewnętrznymi i innymi czynnikami (np. Priorytetem jednego rodzaju pracy względem drugiego). – Hut8

+0

W takim przypadku najlepiej jest ustawić inny mechanizm niż puli o regulowanej wielkości. Można na przykład uruchomić goryntynę, która sprawdza RAM lub ładunek komputera docelowego lub uruchamia zadanie, jeśli maszyna jest gotowa i planuje kolejną kontrolę, jeśli nie. Oczywiście nie wiem, co działa, ponieważ tak naprawdę nie wiem, co budujesz. – twotwotwo

Odpowiedz

17

Zrobiłbym to na odwrót. Zamiast odradzać wiele goroutinów (które wciąż wymagają znacznej ilości pamięci) i używać kanału do ich blokowania, modelowałbym robotników jako goroutines i używałbym kanału do dystrybucji pracy. Coś takiego:

package main 

import (
    "fmt" 
    "sync" 
) 

type Task string 

func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) { 
    defer wg.Done() 
    for { 
     select { 
     case task, ok := <-tasks: 
      if !ok { 
       return 
      } 
      fmt.Println("processing task", task) 
     case <-quit: 
      return 
     } 
    } 
} 

func main() { 
    tasks := make(chan Task, 128) 
    quit := make(chan bool) 
    var wg sync.WaitGroup 

    // spawn 5 workers 
    for i := 0; i < 5; i++ { 
     wg.Add(1) 
     go worker(tasks, quit, &wg) 
    } 

    // distribute some tasks 
    tasks <- Task("foo") 
    tasks <- Task("bar") 

    // remove two workers 
    quit <- true 
    quit <- true 

    // add three more workers 
    for i := 0; i < 3; i++ { 
     wg.Add(1) 
     go worker(tasks, quit, &wg) 
    } 

    // distribute more tasks 
    for i := 0; i < 20; i++ { 
     tasks <- Task(fmt.Sprintf("additional_%d", i+1)) 
    } 

    // end of tasks. the workers should quit afterwards 
    close(tasks) 
    // use "close(quit)", if you do not want to wait for the remaining tasks 

    // wait for all workers to shut down properly 
    wg.Wait() 
} 

Dobrym pomysłem może być utworzenie osobnego typu WorkerPool za pomocą wygodnych metod. Ponadto, zamiast type Task string, dość powszechne jest użycie struktury, która zawiera także kanał done, który jest używany do sygnalizowania, że ​​zadanie zostało wykonane pomyślnie.

Edytuj: Zrobiłem trochę więcej i wymyśliłem: http://play.golang.org/p/VlEirPRk8V. Jest to w zasadzie ten sam przykład, z ładniejszym interfejsem API.

+1

Dzięki za to! Wygląda świetnie. –

+1

W tym przypadku możliwe jest, że wszyscy pracownicy otrzymają rezygnację przed przykładami: http://play.golang.org/p/19DKaA4Q6G – Oleg

2

Prosta zmiana, która może się wydawać mieć kanał, który kontroluje, jak duży jest semafor, Odpowiednią częścią są instrukcje select .Jeśli jest więcej pracy z kolejki, przetwarzaj ją z bieżącym semaforem .Jeśli istnieje żądanie zmiany rozmiaru semafora, zmień go i kontynuować przetwarzanie kolejki req z nowym semafora. Zauważ, że stary będzie śmieci zebrane.

package main 

import "time" 
import "fmt" 

type Request struct{ num int } 
var quit chan struct{} = make(chan struct{}) 

func Serve(queue chan *Request, resize chan int, semsize int) { 
    for { 
     sem := make(chan struct{}, semsize) 
     var req *Request 
     select { 
     case semsize = <-resize: 
      { 
       sem = make(chan struct{}, semsize) 
       fmt.Println("changing semaphore size to ", semsize) 
      } 
     case req = <-queue: 
      { 
       sem <- struct{}{} // Block until there's capacity to process a request. 
       go handle(req, sem) // Don't wait for handle to finish. 
      } 
       case <-quit: 
        return 
     } 

    } 
} 

func process(r *Request) { 
    fmt.Println("Handled Request", r.num) 
} 

func handle(r *Request, sem chan struct{}) { 
    process(r) // May take a long time & use a lot of memory or CPU 
    <-sem  // Done; enable next request to run. 
} 

func main() { 
    workq := make(chan *Request, 1) 
    ctrlq := make(chan int) 
    go func() { 
     for i := 0; i < 20; i += 1 { 
      <-time.After(100 * time.Millisecond) 
      workq <- &Request{i} 
     } 
     <-time.After(500 * time.Millisecond) 
      quit <- struct{}{} 
    }() 
    go func() { 
     <-time.After(500 * time.Millisecond) 
     ctrlq <- 10 
    }() 
    Serve(workq, ctrlq, 1) 
} 

http://play.golang.org/p/AHOLlAv2LH

Powiązane problemy