MapReduce z agentami F#: po zabawie z agentami F# spróbowałem zaimplementować za ich pomocą algorytm MapReduce.

Podstawowa struktura, której używam, wygląda następująco:

  • nadzorca mapowania (map supervisor), który kolejkuje w swoim stanie całą pracę do wykonania i odbiera żądania pracy od pracowników mapujących
  • nadzorca redukcji (reduce supervisor), który robi to samo, co nadzorca mapowania, ale dla pracy redukcji
  • grupa pracowników mapujących i redukujących, którzy wykonują mapowanie i redukcję; jeśli któryś z nich nie wykona swojej pracy, odsyła ją z powrotem do odpowiedniego nadzorcy w celu ponownego przetworzenia.

Pytania, nad którymi się zastanawiam:

  • czy ma to jakikolwiek sens w porównaniu z bardziej tradycyjną (a mimo to bardzo ładną) implementacją MapReduce (http://tomasp.net/blog/fsharp-parallel-aggregate.aspx), która używa PSeq?
  • sposób, w jaki zaimplementowałem pracowników mapujących i redukujących, wydaje się brzydki; czy istnieje lepszy sposób?
  • wygląda na to, że mogę utworzyć 1000 000 pracowników mapujących i 1000 0000 pracowników redukujących (lol); jak powinienem dobrać te liczby — czy im więcej, tym lepiej?


//Short alias: the agents below are all written as Agent<'T> instead of MailboxProcessor<'T>
type Agent<'T> = MailboxProcessor<'T> 

//Answer a supervisor sends back to a worker
//that has asked it for something to do
type SupervisorResponse<'work> =
    | Work of 'work   //a piece of work
    | NoWork          //no work left to do

//Work-related messages a supervisor accepts
type WorkMsg<'work> =
    //piles up one more item of work in the supervisor queue
    | ToDo of 'work
    //a worker asks for work; the answer travels back on the reply channel
    | WorkReq of AsyncReplyChannel<SupervisorResponse<'work>>

//Control operations that can be sent to a supervisor agent
type AgentOperation =
    | Stop    //stop the agent
    | Status  //yield the current status of supervisor

//Everything a supervisor can receive: either a work-related
//message or a control operation
type SupervisorMsg<'work> =
    | WorkRel of WorkMsg<'work>
    | Operation of AgentOperation

//Supervises Map and Reduce workers
//
//A supervisor is an agent whose state is the list of pending work items.
//ToDo pushes an item on the front of that list, WorkReq pops the front item
//and sends it back to the asking worker (or NoWork when the list is empty).
module AgentSupervisor =
    /// Creates a new supervisor agent. The agent is NOT started here;
    /// the caller has to call Start() on it.
    /// name is only used to label the console output of Status and Stop.
    let getNew (name:string) =
        new Agent<SupervisorMsg<'work>>(fun inbox ->
            let rec loop state = async {
                let! msg = inbox.Receive()
                match msg with
                | WorkRel(ToDo(work)) ->
                    //newest work goes on the front, so it is handed out first
                    return! loop (work :: state)
                | WorkRel(WorkReq(replyChannel)) ->
                    match state with
                    | [] ->
                        //always answer: the worker is waiting in PostAndAsyncReply
                        replyChannel.Reply(NoWork)
                        return! loop []
                    | item :: remaining ->
                        replyChannel.Reply(Work(item))
                        return! loop remaining
                | Operation(Status) ->
                    //fully qualified so the module does not depend on an `open System`
                    System.Console.WriteLine(name + " current Work Queue " +
                                             string (List.length state))
                    return! loop state
                | Operation(Stop) ->
                    System.Console.WriteLine("Stoppped SuperVisor Agent " + name)
                    //no recursive call: the message loop ends here
                    return ()
            }
            loop [])

    /// Asks the supervisor to leave its message loop.
    let stop (agent:Agent<SupervisorMsg<'work>>) = agent.Post(Operation(Stop))

    /// Asks the supervisor to print the current length of its work queue.
    let status (agent:Agent<SupervisorMsg<'work>>) = agent.Post(Operation(Status))

//Code for the workers

//Result of running a map or reduce function on one work item
type WorkOutcome<'success> =
    | Success of 'success  //the function produced a value
    | Fail                 //the function could not process the item

//Messages driving a worker's loop
type WorkerMsg =
    | Start     //first message: makes the worker begin asking for work
    | Stop      //makes the worker leave its message loop
    | Continue  //the worker posts this to itself to ask for the next item

//Map and reduce workers. A worker is an agent that keeps posting Continue to
//itself; on every Continue it asks its supervisor for one item and processes it.
//Note that an idle worker keeps polling its supervisor (NoWork also re-posts Continue).
module AgentWorker =
    /// The two supervisors a map worker talks to: Map hands out the input
    /// items, Reduce receives the mapped results.
    type WorkerSupervisors<'reduce,'work> =
        { Map:Agent<SupervisorMsg<'work>> ; Reduce:Agent<SupervisorMsg<'reduce>> }

    /// Asks a worker to leave its message loop.
    let stop (agent:Agent<WorkerMsg>) = agent.Post(Stop)

    /// Starts the worker's mailbox and posts the initial Start message;
    /// without that message the loop would wait in Receive() forever.
    let start (agent:Agent<WorkerMsg>) =
        agent.Start()
        agent.Post(Start)

    /// Creates (but does not start) a map worker.
    /// map        : async function applied to every item taken from supervisors.Map
    /// supervisors: where work comes from (Map) and where results go to (Reduce)
    let getNewMapWorker(map, supervisors:WorkerSupervisors<'reduce,'work> ) =
        new Agent<WorkerMsg>(fun inbox ->
            let rec loop() = async {
                let! msg = inbox.Receive()
                match msg with
                | Start ->
                    inbox.Post(Continue)
                    return! loop()
                | Continue ->
                    let! supervisorOrder =
                        supervisors.Map.PostAndAsyncReply(fun replyChannel ->
                            WorkRel(WorkReq(replyChannel)))
                    match supervisorOrder with
                    | Work(work) ->
                        let! res = map work
                        match res with
                        | Success(toReduce) ->
                            //hand the mapped value over to the reduce stage
                            supervisors.Reduce.Post(WorkRel(ToDo(toReduce)))
                        | Fail ->
                            System.Console.WriteLine("Map Fail")
                            //give the item back so that it gets processed again
                            supervisors.Map.Post(WorkRel(ToDo(work)))
                    | NoWork -> ()
                    //whatever happened, ask for the next item
                    inbox.Post(Continue)
                    return! loop()
                | Stop ->
                    System.Console.WriteLine("Map worker stopped")
                    return ()
            }
            loop() )

    /// Creates (but does not start) a reduce worker.
    /// reduce          : async function applied to every item taken from the supervisor
    /// reduceSupervisor: supervisor holding the values produced by the map workers
    let getNewReduceWorker(reduce,reduceSupervisor:Agent<SupervisorMsg<'work>>)=
        new Agent<WorkerMsg>(fun inbox ->
            let rec loop() = async {
                let! msg = inbox.Receive()
                match msg with
                | Start ->
                    inbox.Post(Continue)
                    return! loop()
                | Continue ->
                    let! supervisorOrder =
                        reduceSupervisor.PostAndAsyncReply(fun replyChannel ->
                            WorkRel(WorkReq(replyChannel)))
                    match supervisorOrder with
                    | Work(work) ->
                        let! res = reduce work
                        match res with
                        | Success(_) -> ()
                        | Fail ->
                            //give the item back so that it gets processed again
                            reduceSupervisor.Post(WorkRel(ToDo(work)))
                    | NoWork -> ()
                    //whatever happened, ask for the next item
                    inbox.Post(Continue)
                    return! loop()
                | Stop ->
                    System.Console.WriteLine("Reduce worker stopped")
                    return ()
            }
            loop() )

open AgentWorker 

/// Wires two supervisors and a configurable number of map and reduce workers together.
/// numberMap    : how many map workers to create
/// numberReduce : how many reduce workers to create
/// toProcess    : the input items, queued on the map supervisor by Start()
/// map          : async function run by the map workers
/// reduce       : async function run by the reduce workers
type MapReduce<'work,'reduce>(numberMap:int ,
                              numberReduce: int,
                              toProcess:'work list,
                              map:'work->Async<'reduce WorkOutcome>,
                              reduce:'reduce-> Async<unit WorkOutcome>) =

    let mapSupervisor = AgentSupervisor.getNew("MapSupervisor")
    let reduceSupervisor = AgentSupervisor.getNew("ReduceSupervisor")

    let workerSupervisors = {Map = mapSupervisor ; Reduce = reduceSupervisor }

    let mapWorkers =
        [for i in 1..numberMap ->
            AgentWorker.getNewMapWorker(map,workerSupervisors) ]
    let reduceWorkers =
        [for i in 1..numberReduce ->
            AgentWorker.getNewReduceWorker(reduce,workerSupervisors.Reduce) ]

    /// Queues all the input on the map supervisor, then starts the supervisors and the workers.
    member this.Start() =
        //Post work to do
        toProcess
        |> List.iter(fun elem -> mapSupervisor.Post(WorkRel(ToDo(elem))))
        //Start supervisors
        mapSupervisor.Start()
        reduceSupervisor.Start()
        //start workers
        List.iter start mapWorkers
        List.iter start reduceWorkers

    /// Asks both supervisors to print the length of their work queue.
    member this.Status() =
        AgentSupervisor.status mapSupervisor
        AgentSupervisor.status reduceSupervisor

    /// Asks every worker to stop. The two lists are walked independently
    /// because numberMap and numberReduce are allowed to differ.
    member this.Stop() =
        List.iter stop mapWorkers
        List.iter stop reduceWorkers

//Run some tests

//Mapper used for the test: passes every number through as a success
let map (n:int64) = async { return Success n }

//Reducer used for the test: ignores its input and reports success
let reduce (toto:int64) = async { return Success () }

//One map worker and one reduce worker over the numbers 1 to 1000000
let mp = new MapReduce<int64,int64>(1, 1, [1L..1000000L], map, reduce)


Lubię używać MailboxProcessor dla redukującej części algorytmu oraz bloku asynchronicznego, wywoływanego przez Async.Parallel, dla części mapującej. Dzięki temu kod jest bardziej jednoznaczny, co daje lepszą kontrolę nad obsługą wyjątków, limitów czasu i anulowania.

Poniższy kod został zaprojektowany z pomocą Briana oraz jego doskonałej wtyczki do VS2010 „F# Depth Colorizer", która podświetla bloki kodu F#.

Ten kod służy do pobierania kanałów RSS z serwera pogodowego Yahoo według schematu MapReduce. Pokazuje, w jaki sposób możemy sterować przepływem wykonania spoza właściwego algorytmu.

Funkcja fetchWeather jest mapującą częścią algorytmu, a funkcja mailboxLoop — jego częścią redukującą.

#r "System.Xml.Linq.dll" 

#r "FSharp.PowerPack.dll" 

open System 
open System.Diagnostics 
open System.IO 
open System.Linq 
open System.Net 
open System.Xml.Linq 

open Microsoft.FSharp.Control.WebExtensions 

/// Immutable weather reading for one place.
/// ToString() renders it as "City, Region: <temperature> F".
type Weather (city : string, region : string, temperature : int) =
    member x.City = city
    member x.Region = region
    member x.Temperature : int = temperature

    override this.ToString() =
        sprintf "%s, %s: %d F" this.City this.Region this.Temperature

/// Messages accepted by the aggregating mailbox agent.
type MessageForActor =
    /// one successfully parsed weather reading
    | ProcessWeather of Weather
    /// a lookup that failed; carries the woeid that was requested
    | ProcessError of int
    /// request for the aggregated result; the reply is a triple of two
    /// Weather values and a Weather list
    | GetResults of AsyncReplyChannel<Weather * Weather * Weather list>

/// Parses one RSS document read from rssStream into a message for the actor.
/// woeid     : the id the feed was requested for; reported back in ProcessError
/// rssStream : stream holding the RSS XML
/// Returns ProcessWeather when the location and condition elements are present,
/// ProcessError woeid otherwise.
/// Note: still throws if the document has no <channel> element or if the
/// "temp" attribute is missing or is not an integer.
let parseRss woeid (rssStream : Stream) =
    let xn str = XName.Get str
    let yweather elementName = XName.Get(elementName, "http://xml.weather.yahoo.com/ns/rss/1.0")

    let channel = (XDocument.Load rssStream).Descendants(xn "channel").First()
    let location = channel.Element(yweather "location")
    let item = channel.Element(xn "item")
    // Element returns null when the child does not exist, so guard before going deeper.
    let condition = if item = null then null else item.Element(yweather "condition")

    // If the RSS server returns error, condition XML element won't be available.
    if location = null || condition = null then
        ProcessError(woeid)
    else
        let temperature = Int32.Parse(condition.Attribute(xn "temp").Value)
        ProcessWeather(new Weather(
                        location.Attribute(xn "city").Value,
                        location.Attribute(xn "region").Value,
                        temperature))

/// Builds the async computation that downloads the forecast RSS for one woeid,
/// parses it with parseRss and posts the resulting message to actor.
/// Nothing runs until the returned computation is started by the caller.
let fetchWeather (actor : MessageForActor MailboxProcessor) woeid =
    async {
        let rssAddress = sprintf "http://weather.yahooapis.com/forecastrss?w=%d&u=f" woeid
        let webRequest = WebRequest.Create rssAddress
        // `use` disposes the response and its stream once parsing is done
        use! response = webRequest.AsyncGetResponse()
        use responseStream = response.GetResponseStream()
        let weather = parseRss woeid responseStream
        //do! Async.Sleep 1000 // enable this line to see amplified timing that proves concurrent flow
        actor.Post(weather)
    }

/// Starts the aggregating agent and returns it.
/// The agent keeps the coldest reading, the warmest reading and the list of all
/// readings received so far, and replies with them on GetResults.
/// initialCount seeds a counter that is decremented on every message; nothing in
/// this function reads that counter.
let mailboxLoop initialCount =
    // Picks x when `op x.Temperature y.Temperature` holds, y otherwise.
    let chooseCityByTemperature op (x : Weather) (y : Weather) =
        if op x.Temperature y.Temperature then x else y

    // List.sortWith is stable, so sorting by city first and by region second
    // yields a list ordered by region, then by city within a region.
    let sortWeatherByCityAndState (weatherList : Weather list) =
        weatherList
        |> List.sortWith (fun x y -> x.City.CompareTo(y.City))
        |> List.sortWith (fun x y -> x.Region.CompareTo(y.Region))

    MailboxProcessor.Start(fun inbox ->
        let rec loop minAcc maxAcc weatherList remaining =
            async {
                let! message = inbox.Receive()
                let remaining = remaining - 1

                match message with
                | ProcessWeather weather ->
                    let colderCity = chooseCityByTemperature (<) minAcc weather
                    let warmerCity = chooseCityByTemperature (>) maxAcc weather
                    return! loop colderCity warmerCity (weather :: weatherList) remaining
                | ProcessError woeid ->
                    // errors are listed too, but do not take part in the min/max
                    let errorWeather = new Weather(sprintf "Error with woeid=%d" woeid, "ZZ", 99999)
                    return! loop minAcc maxAcc (errorWeather :: weatherList) remaining
                | GetResults replyChannel ->
                    // no recursive call: the agent ends after handing out the results
                    replyChannel.Reply(minAcc, maxAcc, sortWeatherByCityAndState weatherList)
            }

        // any real temperature is lower than MaxValue / higher than MinValue
        let minValueInitial = new Weather("", "", Int32.MaxValue)
        let maxValueInitial = new Weather("", "", Int32.MinValue)
        loop minValueInitial maxValueInitial [] initialCount)

/// Runs computation synchronously with a 30 second timeout and discards its result.
/// An exception raised inside the computation is printed (message and stack
/// trace) and the process exits with code -4; a timeout is reported and the
/// process exits with code -5.
let RunSynchronouslyWithExceptionAndTimeoutHandlers computation =
    let timeout = 30000
    try
        // Async.Catch turns an exception of the computation into Choice2Of2
        match Async.RunSynchronously(Async.Catch(computation), timeout) with
        | Choice1Of2 answer -> answer |> ignore
        | Choice2Of2 (except : Exception) ->
            printfn "%s" except.Message
            printfn "%s" except.StackTrace
            exit -4
    with
    // raised by RunSynchronously itself when the timeout elapses
    | :? System.TimeoutException ->
        printfn "Timed out waiting for results for %d seconds!" (timeout/1000)
        exit -5

/// Script entry point (evaluated when the script is loaded).
/// Expected arguments: script name, "C" (concurrent) or "S" (synchronous),
/// then one or more integer woeids.
/// Exit codes used here: -1 too few arguments, -2 a woeid is not an integer,
/// -3 unknown run option.
let main =
    // Should have script name, sync/async select, and at least one woeid
    if fsi.CommandLineArgs.Length < 3 then
        printfn "Expecting at least two arguments!"
        printfn "There were %d arguments" (fsi.CommandLineArgs.Length - 1)
        exit -1

    let woeids =
        try
            fsi.CommandLineArgs
            |> Seq.skip 2 // skip the script name and sync/async select
            |> Seq.map Int32.Parse
            |> Seq.toList // forces the parsing while still inside the try
        with
        | except -> printfn "One of supplied arguments was not an integer: %s" except.Message; exit -2

    let actor = mailboxLoop woeids.Length

    // All the downloads are combined into one parallel computation.
    let processWeatherItemsConcurrently woeids =
        woeids
        |> Seq.map (fetchWeather actor)
        |> Async.Parallel
        |> RunSynchronouslyWithExceptionAndTimeoutHandlers

    // One download, run to completion before returning.
    let processOneWeatherItem woeid =
        woeid
        |> fetchWeather actor
        |> RunSynchronouslyWithExceptionAndTimeoutHandlers

    // StartNew: a Stopwatch created with `new` stays at 0 until it is started
    let stopWatch = Stopwatch.StartNew()
    match fsi.CommandLineArgs.[1].ToUpper() with
    | "C" -> printfn "Concurrent execution: "; processWeatherItemsConcurrently woeids
    | "S" -> printfn "Synchronous execution: "; woeids |> Seq.iter processOneWeatherItem
    | _ -> printfn "Unexpected run options!"; exit -3

    let (min, max, weatherList) = actor.PostAndReply GetResults
    assert (weatherList.Length = woeids.Length)

    printfn "{"
    weatherList |> List.iter (printfn " %O")
    printfn "}"
    printfn "Coldest place: %O" min
    printfn "Hottest place: %O" max
    printfn "Completed in %d millisec" stopWatch.ElapsedMilliseconds
