2009-09-30 25 views
8
 
(fileNameToCharStream "bigfile" 
|>> fuse [length; 
      splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length; 
      splitBy (fun x -> x = '\n') keepEmpty |>> length; 
     ]) 
    (*fuse "fuses" the three functions to run concurrently*) 
|> run 2 (*forces to run in parallel on two threads*) 
|> (fun [num_chars; num_words; num_lines] -> 
     printfn "%d %d %d" 
      num_chars num_words, num_lines)) 

Chcę, aby ten kod działał w następujący sposób: podzielić oryginalny strumień na dwa dokładnie w środku; następnie dla każdej połowy uruchomić oddzielne obliczenie, że oblicza 3 rzeczy: długość (tj. liczba znaków), liczbę słów, liczbę linii. Jednak nie chcę mieć problemu, jeśli błędnie podzieliłem słowo. To musi być pod opieką . Plik należy przeczytać tylko raz.Równoległe pipelining

Jak zaprogramować określone funkcje i operator | >>? Czy to możliwe?

+0

może się okazać, że USA nie ma jeszcze obudziła, ale do czasu, że warto zajrzeć do słowa kluczowego „async”, aby uzyskać lepsze wyobrażenie o tym, co jest możliwe. – Benjol

+0

Jakie podpisy wyobrażacie sobie bezpiecznik, biegnijcie i | >>? Na przykład, gdzie twoja lista trzech elementów zmienia się w 3-krotne? – Gabriel

+0

Dobrze, mam na myśli: |> (fun [num_chars; num_words; num_lines] -> –

Odpowiedz

8

Wygląda na to, że o to prosisz. Zostawię to do ciebie, aby dowiedzieć się manipulacji ciąg, ale pokażę ci, jak zdefiniować operatora, który wykonuje szereg operacji równolegle.

Krok 1: Napisz fuse funkcja

Twoja funkcja bezpiecznik wydaje się zmapować jedno wejście korzystania z wielu funkcji, które są na tyle łatwe do napisania, co następuje:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list 
let fuse functionList input = [ for f in functionList -> f input] 

pamiętać, że wszystkie funkcje mapowania muszą mieć ten sam typ.

Krok 2: Definiowanie operatora do wykonywania funkcji w równoległym

Standardowa funkcja map równolegle można zapisać następująco:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array 
let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

Według mojej wiedzy, Async.Parallel będzie wykonywał operacje async równolegle, gdzie liczba równoległych zadań wykonywanych w danym momencie jest równa liczbie rdzeni na komputerze (ktoś może mnie poprawić, jeśli się mylę). Tak więc na maszynie dwurdzeniowej powinniśmy mieć co najwyżej 2 wątki uruchomione na moim komputerze, gdy ta funkcja jest wywoływana. Jest to dobra rzecz, ponieważ nie spodziewamy się żadnego przyspieszenia, uruchamiając więcej niż jeden wątek na rdzeń (w rzeczywistości dodatkowe przełączanie kontekstów może spowolnić działanie).

Możemy zdefiniować operator |>> pod względem pmap i fuse:

//val (|>>) : seq<'a> -> seq<('a -> 'b)> -> 'b list array 
let (|>>) input functionList = pmap (fuse functionList) input 

Więc operator |>> zajmuje kilka wejść i mapuje je za pomocą wielu różnych wyjść. Do tej pory, jeśli kładziemy to wszystko razem, otrzymujemy następujące (w FSI):

> let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];; 

val countOccurrences : 'a -> seq<'a> -> int 
val length : string -> int 
val testData : string [] = 
    [|"Juliet is awesome"; "Someone should give her a medal"|] 
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|] 

testOutput zawiera dwa elementy, z których oba były obliczane równolegle.

Etap 3: agregatów elementy do jednego wyjścia

dobrze, więc teraz mają częściowe wyniki przedstawione każdego elementu w naszym tablicy i scalić nasze wyniki cząstkowe w jednym agregacie. Zakładam, że każdy element w tablicy powinien być połączony z tą samą funkcją, ponieważ każdy element na wejściu ma ten sam typ danych.

Oto naprawdę brzydki funkcja Napisałem do pracy:

> let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);; 

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list 

> reduceMany (+) testOutput;; 
val it : int list = [48; 1; 4] 

reduceMany wykonuje sekwencję sekwencji n długości, i zwraca tablicę n-length jako wyjście. Jeśli można myśleć o lepszym sposobem, aby napisać tę funkcję, być moim gościem :)

zdekodować wyjście powyżej:

  • 48 = suma długości moich dwóch ciągów wejściowych. Zauważ, że oryginalny ciąg znaków składał się z 49 znaków, ale dzielił go na "|" zjadłem jeden znak za "|".
  • 1 = suma wszystkich wystąpień "J" na moim wejściu
  • 4 = suma wszystkich wystąpień "O".

Krok 4: Włóż wszystko razem

let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

let fuse functionList input = [ for f in functionList -> f input] 

let (|>>) input functionList = pmap (fuse functionList) input 

let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]) 

let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'] 
    |> reduceMany (+)