Po zagraniu z agentami F # próbowałem zrobić mapę, korzystając z nich.Mapa Zredukuj z agentami F #
Podstawowa struktura używam to:
- mapa nadzorca, który kolejek całą pracę do zrobienia w swoim stanie i odbiera żądania pracy z mapą pracowników
- redukują przełożonego robi to samo, co mapy nadzorującą zredukuj pracę
- garść map i zredukuj pracowników, którzy mapują i redukują, jeśli ktoś nie wykona swojej pracy, odsyła go z powrotem do odpowiedniego opiekuna, który ma zostać ponownie przetworzony.
pytania zastanawiam się:
- ma to żadnego sensu w porównaniu do bardziej tradycyjnych (jeszcze bardzo ładne) jak zmniejszyć mapie (http://tomasp.net/blog/fsharp- parallel-aggregate.aspx), który używa PSeq?
- sposób, w jaki wdrożyłem mapę i zmniejszam liczbę pracowników, wydaje się brzydki, czy istnieje lepszy sposób?
- wydaje się, że mogę utworzyć 1000 000 pracowników map i 1000 0000 zmniejszyć liczbę pracowników Lol, jak mam wybrać te liczby, im więcej tym lepiej?
dziękuję,
type Agent<'T> = MailboxProcessor<'T>
//This is the response the supervisor
//gives to the worker request for work
type 'work SupervisorResponse =
| Work of 'work //a piece of work
| NoWork//no work left to do
//This is the message to the supervisor
type 'work WorkMsg =
| ToDo of 'work //piles up work in the Supervisor queue
| WorkReq of AsyncReplyChannel<SupervisorResponse<'work>> //'
//The supervisor agent can be interacted with
type AgentOperation =
| Stop //stop the agent
| Status //yield the current status of supervisor
type 'work SupervisorMsg =
| WorkRel of 'work WorkMsg
| Operation of AgentOperation
//Supervises Map and Reduce workers
module AgentSupervisor=
let getNew (name:string) =
new Agent<SupervisorMsg<'work>>(fun inbox -> //'
let rec loop state = async {
let! msg = inbox.Receive()
match msg with
| WorkRel(m) ->
match m with
| ToDo(work) ->
let newState = work:state
return! loop newState
| WorkReq(replyChannel) ->
match state with
| [] ->
replyChannel.Reply(NoWork)
return! loop []
| [item] ->
replyChannel.Reply(Work(item))
return! loop []
| (item::remaining) ->
replyChannel.Reply(Work(item))
return! loop remaining
| Operation(op) ->
match op with
| Status ->
Console.WriteLine(name+" current Work Queue "+
string (state.Length))
return! loop state
| Stop ->
Console.WriteLine("Stoppped SuperVisor Agent "+name)
return()
}
loop [])
let stop (agent:Agent<SupervisorMsg<'work>>) = agent.Post(Operation(Stop))
let status (agent:Agent<SupervisorMsg<'work>>) =agent.Post(Operation(Status))
//Code for the workers
type 'success WorkOutcome =
| Success of 'success
| Fail
type WorkerMsg =
| Start
| Stop
| Continue
module AgentWorker =
type WorkerSupervisors<'reduce,'work> =
{ Map:Agent<SupervisorMsg<'work>> ; Reduce:Agent<SupervisorMsg<'reduce>> }
let stop (agent:Agent<WorkerMsg>) = agent.Post(Stop)
let start (agent:Agent<WorkerMsg>) = agent.Start()
agent.Post(Start)
let getNewMapWorker(map, supervisors:WorkerSupervisors<'reduce,'work> ) =
new Agent<WorkerMsg>(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Start -> inbox.Post(Continue)
return! loop()
| Continue ->
let! supervisorOrder =
supervisors.Map.PostAndAsyncReply(
fun replyChannel ->
WorkRel(WorkReq(replyChannel)))
match supervisorOrder with
| Work(work) ->
let! res = map work
match res with
| Success(toReduce) ->
supervisors.Reduce
.Post(WorkRel(ToDo(toReduce)))
| Fail ->
Console.WriteLine("Map Fail")
supervisors.Map
.Post(WorkRel(ToDo(work)))
inbox.Post(Continue)
| NoWork ->
inbox.Post(Continue)
return! loop()
| Stop ->
Console.WriteLine("Map worker stopped")
return()
}
loop() )
let getNewReduceWorker(reduce,reduceSupervisor:Agent<SupervisorMsg<'work>>)=//'
new Agent<WorkerMsg>(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Start -> inbox.Post(Continue)
return! loop()
| Continue ->
let! supervisorOrder =
reduceSupervisor.PostAndAsyncReply(fun replyChannel ->
WorkRel(WorkReq(replyChannel)))
match supervisorOrder with
| Work(work) ->
let! res = reduce work
match res with
| Success(toReduce) -> inbox.Post(Continue)
| Fail ->
Console.WriteLine("ReduceFail")
reduceSupervisor.Post(WorkRel(ToDo(work)))
inbox.Post(Continue)
| NoWork -> inbox.Post(Continue)
return! loop()
|Stop ->Console.WriteLine("Reduce worker stopped"); return()
}
loop())
open AgentWorker
type MapReduce<'work,'reduce>(numberMap:int ,
numberReduce: int,
toProcess:'work list,
map:'work->Async<'reduce WorkOutcome>,
reduce:'reduce-> Async<unit WorkOutcome>) =
let mapSupervisor= AgentSupervisor.getNew("MapSupervisor")
let reduceSupervisor = AgentSupervisor.getNew("ReduceSupervisor")
let workerSupervisors = {Map = mapSupervisor ; Reduce = reduceSupervisor }
let mapWorkers =
[for i in 1..numberMap ->
AgentWorker.getNewMapWorker(map,workerSupervisors) ]
let reduceWorkers =
[for i in 1..numberReduce ->
AgentWorker.getNewReduceWorker(reduce,workerSupervisors.Reduce) ]
member this.Start() =
//Post work to do
toProcess
|>List.iter(fun elem -> mapSupervisor.Post(WorkRel(ToDo(elem))))
//Start supervisors
mapSupervisor.Start()
reduceSupervisor.Start()
//start workers
List.iter(fun mapper -> mapper |>start) mapWorkers
List.iter(fun reducer ->reducer|>start) reduceWorkers
member this.Status() = (mapSupervisor|>AgentSupervisor.status)
(reduceSupervisor|>AgentSupervisor.status)
member this.Stop() =
List.map2(fun mapper reducer ->
mapper |>stop; reducer|>stop) mapWorkers reduceWorkers
//Run some tests
let map = function (n:int64) -> async{ return Success(n) }
let reduce = function (toto: int64) -> async{ return Success() }
let mp = MapReduce<int64,int64>(1,1,[for i in 1L..1000000L->i],map,reduce)
mp.Start()
mp.Status()
mp.Stop()
FYI, nie czytam pytania, które zawiera więcej niż 120 wierszy (źle sformatowanego) kodu. – Brian
@Brian, przepraszam za bałagan, próbowałem trochę zmienić format, ale nadal mam problem z kolorem, który sprawia, że wszystko jest okropnie brzydkie. Uwaga Nie mam najmniejszej nadziei, że ktokolwiek przeczyta wszystkie 120 linii mojego kodu, po prostu umieściłem go na wszelki wypadek, aby mógł odpowiedzieć na moje pytanie. Dzięki – jlezard
Wprowadziłem kilka zmian, aby oczyścić go jeszcze bardziej. W szczególności używa mniej poziome i pionowe białe znaki (bez przewijania w prawo, bez wielu pustych linii w rzędzie). Zwróć też uwagę na użycie // 'jako sposobu na uniknięcie błędów związanych z kolorowaniem wielu linii. – Brian