-
Notifications
You must be signed in to change notification settings - Fork 0
/
mr.erl
180 lines (147 loc) · 4.71 KB
/
mr.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
%%%-------------------------------------------------------------------
%%% @author Tudor-Stefan Dragan <[email protected]>
%%% @copyright (C) 2014, Tudor-Stefan Dragan
%%% Created : Oct 2014 by Tudor-Stefan Dragan <[email protected]>
%%%-------------------------------------------------------------------
-module(mr).
-export([start/1, stop/1, job/5]).
%%%% Interface
%% Start a MapReduce engine with N mapper processes.
%% init/1 spawns the reducer and the mappers; an unlinked coordinator
%% process is then spawned to drive them.
%% Returns {ok, CoordinatorPid}.
start(N) ->
    {Reducer, Mappers} = init(N),
    Coordinator = spawn(fun() -> coordinator_loop(Reducer, Mappers) end),
    {ok, Coordinator}.
%% Stop the coordinator Pid (and, through it, the mappers and the reducer).
%% The coordinator answers a stop request with {CoordinatorPid, ok}; this
%% call waits for that acknowledgement so the reply does not linger in the
%% caller's mailbox.
%% A monitor guards the wait: if the coordinator is already dead (or dies
%% before answering) the call returns instead of blocking forever.
%% Returns ok.
stop(Pid) ->
    Ref = erlang:monitor(process, Pid),
    Pid ! {self(), stop},
    receive
        {Pid, Response} ->
            %% flush removes a 'DOWN' message that may already be queued
            erlang:demonitor(Ref, [flush]),
            Response;
        {'DOWN', Ref, process, Pid, _Reason} ->
            %% nothing left to stop
            ok
    end.
%% Run one MapReduce job on the coordinator CPid.
%% First the coordinator is asked to configure the reducer and the mappers
%% (the reply to this request is discarded), then it is asked to distribute
%% Data. The value returned is the reply to the second request.
job(CPid, MapFun, RedFun, RedInit, Data) ->
    InitRequest = {init, MapFun, RedFun, RedInit, Data},
    rpc(CPid, InitRequest),
    StartRequest = {start, Data},
    rpc(CPid, StartRequest).
%%%% Internal implementation
%% Spawn the reducer process, then delegate to init/3 to spawn N mappers.
%% Returns {ReducerPid, MapperPids}.
init(N) ->
    Reducer = spawn(fun reducer_loop/0),
    init(N, Reducer, []).
%% Spawn N mapper processes, accumulating their pids in Map.
%% Every mapper starts with a placeholder map function; the real one is
%% installed later through an {init, Fun} message.
%% Returns {Red, Map} once the counter reaches 0.
init(0, Red, Map) ->
    {Red, Map};
init(N, Red, Map) when N > 0 ->
    Mapper = spawn(fun() -> mapper_loop(Red, fun() -> mapper end) end),
    init(N - 1, Red, [Mapper | Map]).
%% synchronous communication
%% Synchronous call: send {CallerPid, Request} to Pid and block until a
%% message of the form {Pid, Reply} arrives. Returns Reply.
rpc(Pid, Request) ->
    Caller = self(),
    Pid ! {Caller, Request},
    receive
        {Pid, Reply} -> Reply
    end.
%% Send Msg to From, tagged with the sender's own pid.
%% Returns the message that was sent, i.e. {self(), Msg}.
reply(From, Msg) ->
    Sender = self(),
    erlang:send(From, {Sender, Msg}).
%% Acknowledge a request: send {self(), ok} to From.
%% Returns the message that was sent.
reply_ok(From) ->
    From ! {self(), ok}.
%% Answer a request with a payload: send {self(), {ok, Msg}} to From.
%% Returns the message that was sent.
reply_ok(From, Msg) ->
    From ! {self(), {ok, Msg}}.
%% asynchronous communication
%% Fire-and-forget send of Msg to Pid (no sender tag is added).
%% Returns Msg.
async(Pid, Msg) ->
    erlang:send(Pid, Msg).
%% Ask a worker process to terminate by sending it the bare atom stop.
%% Returns stop.
stop_async(Pid) ->
    Pid ! stop.
%% Hand one data item D to the process Pid as a {data, D} message.
%% Returns the message that was sent.
data_async(Pid, D) ->
    Pid ! {data, D}.
%%% Coordinator
%% Main loop of the coordinator process.
%% Reducer is the reducer pid, Mappers the list of mapper pids.
%% Handled messages:
%%   {From, stop}                  - stop all workers, acknowledge, terminate
%%   {From, {init, MapFun, RedFun, RedInit, Data}}
%%                                 - configure reducer and mappers for a job
%%   {_, {start, Data}}            - distribute Data over the mappers
%%   {_, {done, Result, Jid}}      - forward the final result to Jid
%% Anything else is logged and dropped.
coordinator_loop(Reducer, Mappers) ->
    receive
        {Client, stop} ->
            io:format("~p stopping~n", [self()]),
            lists:foreach(fun stop_async/1, Mappers),
            stop_async(Reducer),
            %% no recursive call: the coordinator ends here
            reply_ok(Client);
        {Client, {init, MapFun, RedFun, RedInit, Data}} ->
            %% the reducer is told how many data items the job has and who
            %% the job's client is
            ItemCount = length(Data),
            StartMsg = {start, RedFun, RedInit, ItemCount, Client},
            async(Reducer, {self(), StartMsg}),
            init_mappers(Mappers, MapFun),
            reply_ok(Client),
            coordinator_loop(Reducer, Mappers);
        {_, {start, Data}} ->
            send_data(Mappers, Data),
            coordinator_loop(Reducer, Mappers);
        {_, {done, Result, Jid}} ->
            %% Jid is the pid that issued the init request for this job
            reply_ok(Jid, Result),
            coordinator_loop(Reducer, Mappers);
        Unknown ->
            io:format("[CL] Unknown message: ~p~n", [Unknown]),
            coordinator_loop(Reducer, Mappers)
    end.
%% Install MapFun in every mapper by sending each one
%% {self(), {init, MapFun}}. Returns the atom done.
init_mappers(Mappers, MapFun) ->
    lists:foreach(
        fun(Mid) -> async(Mid, {self(), {init, MapFun}}) end,
        Mappers
    ),
    done.
%% Distribute the items of Data over Mappers.
%% The mapper list is passed twice: once as the full roster and once as the
%% queue of mappers still to be served in the current round.
send_data(Mappers, Data) ->
    Roster = Mappers,
    recursive_send(Roster, Roster, Data).
%% Send the data items to the mappers in round-robin order.
%%   Mappers - the full list of mapper pids
%%   Queue   - mappers not yet served in the current round
%%   Data    - items still to be sent
%% Returns ok when all items have been sent.
%% Raises error no_mappers when there is data to send but the mapper list is
%% empty; refilling an empty queue from an empty roster would otherwise
%% recurse forever with identical arguments.
recursive_send(_Mappers, _Queue, []) ->
    %% checked first so that running out of data always terminates
    ok;
recursive_send([], _Queue, [_ | _]) ->
    error(no_mappers);
recursive_send(Mappers, [], Data) ->
    %% round finished: start over with the full roster
    recursive_send(Mappers, Mappers, Data);
recursive_send(Mappers, [Mid | Queue], [D | Data]) ->
    data_async(Mid, D),
    recursive_send(Mappers, Queue, Data).
%%% Reducer
%% Main loop of the reducer process.
%% Handled messages:
%%   stop                                   - terminate with ok
%%   {From, {start, RedFun, RedInit, Len, Jid}}
%%        - collect Len mapper results, fold them with RedFun starting from
%%          RedInit, and report {done, Result, Jid} back to From
%%   {stop_gather, _}                       - consumed and ignored
%% Anything else is logged and dropped.
reducer_loop() ->
    receive
        stop ->
            ok;
        {Coordinator, {start, RedFun, RedInit, Len, Jid}} ->
            {stop_gather, Total} = gather_data_from_mappers(RedFun, RedInit, Len),
            DoneMsg = {done, Total, Jid},
            async(Coordinator, {self(), DoneMsg}),
            reducer_loop();
        {stop_gather, _Res} ->
            reducer_loop();
        Unknown ->
            io:format("[RL] Unknown message: ~p~n", [Unknown]),
            reducer_loop()
    end.
%%% get data from mappers
%% Collect Missing result messages and fold them into the accumulator.
%%   Fun     - reduce function, applied as Fun(Element, Acc) by lists:foldl/3
%%   Acc     - accumulator so far (the initial value on the first call)
%%   Missing - number of {result, Chunk} messages still expected
%% Each message {_, {result, Chunk}} carries a list; its elements are folded
%% into the accumulator. Other messages are logged and dropped.
%% Returns {stop_gather, FinalAcc}; reducer_loop/0 matches on this tuple.
%% When nothing is expected (Missing =< 0, e.g. a job with empty data) the
%% accumulator is returned immediately instead of waiting for a message that
%% never comes.
gather_data_from_mappers(_Fun, Acc, Missing) when Missing =< 0 ->
    {stop_gather, Acc};
gather_data_from_mappers(Fun, Acc, Missing) ->
    receive
        {_, {result, ChunkOfData}} ->
            NewAcc = lists:foldl(Fun, Acc, ChunkOfData),
            gather_data_from_mappers(Fun, NewAcc, Missing - 1);
        Unknown ->
            io:format("[GDFM] Unknown message: ~p~n", [Unknown]),
            gather_data_from_mappers(Fun, Acc, Missing)
    end.
%%% Mapper
%% Main loop of a mapper process.
%% Reducer is the pid results are sent to, Fun the current map function.
%% Handled messages:
%%   stop                - terminate with ok
%%   {_, {init, NewFun}} - replace the map function with NewFun
%%   {data, Data}        - apply Fun to Data and send the one-element result
%%                         list to the reducer as {self(), {result, List}}
%% Anything else is logged and dropped.
mapper_loop(Reducer, Fun) ->
    receive
        stop ->
            ok;
        {_Sender, {init, NewFun}} ->
            mapper_loop(Reducer, NewFun);
        {data, Item} ->
            Mapped = lists:map(Fun, [Item]),
            ResultMsg = {result, Mapped},
            async(Reducer, {self(), ResultMsg}),
            mapper_loop(Reducer, Fun);
        Unknown ->
            io:format("[ML] Unknown message: ~p~n", [Unknown]),
            mapper_loop(Reducer, Fun)
    end.