# Single-machine but multi-process MapReduce library using the Python
# multiprocessing Pool.
# http://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool
#
# Inspired by http://mikecvet.wordpress.com/2010/07/02/parallel-mapreduce-in-python, but
# this design follows the MapReduce paper more closely.
#
# The master (run()) creates a pool of processes and invokes maptask Map tasks.
# Each Map task applies Map() to 1/maptask-th of the input file and partitions its
# output into reducetask regions, for a total of maptask x reducetask Reduce
# regions (each stored in a separate file). The master then creates reducetask
# Reduce tasks. Each Reduce task reads maptask regions, sorts the keys, and
# applies Reduce to each key, producing one output file. The reducetask output
# files can be merged by invoking Merge().
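#
# For example, with maptask = 4 and reducetask = 2 on an input file "in.txt",
# Split() creates #split-in.txt-0 .. #split-in.txt-3, the four Map tasks write
# eight intermediate files #map-in.txt-<m>-<r> (m in 0..3, r in 0..1), and the
# two Reduce tasks each read four of those and write #reduce-in.txt-<r>.
# Merge() then combines the two reduce outputs.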
import sys
import os
import pickle
import string
from multiprocessing import Pool
# A class for the MapReduce framework
class MapReduce(object):
    def __init__(self, m, r, path):
        self.maptask = m
        self.reducetask = r
        self.path = path
        self.Split(path)
    # Splits the input into maptask temporary files, each with a keyvalue,
    # respecting word boundaries. The keyvalue is the byte offset of the chunk
    # in the input file. Users of the class can override the default.
    # Note: this assumes the input has enough whitespace past each chunk
    # boundary to produce all maptask splits; otherwise doMap will fail to
    # find its split file.
    def Split(self, path):
        size = os.stat(self.path).st_size
        chunk = size / self.maptask
        chunk += 1
        f = open(self.path, "r")
        buffer = f.read()
        f.close()
        f = open("#split-%s-%s" % (self.path, 0), "w+")
        f.write(str(0) + "\n")
        i = 0
        m = 1
        for c in buffer:
            f.write(c)
            i += 1
            if (c in string.whitespace) and (i > chunk * m):
                f.close()
                m += 1
                f = open("#split-%s-%s" % (self.path, m - 1), "w+")
                f.write(str(i) + "\n")
        f.close()
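    # For example, splitting "Hello World Foo Bar" with maptask = 2 yields two
    # files: #split-<path>-0 holding "0\nHello World " and #split-<path>-1
    # holding "12\nFoo Bar". The first line of each split is the byte offset
    # of its chunk in the input file.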
    # Maps value into a list of (key, value) pairs.
    # To be defined by the user of the class.
    def Map(self, keyvalue, value):
        pass

    # Determines the default reduce task that will receive (key, value).
    # Users of the class can override the default.
    def Partition(self, item):
        return hash(item[0]) % self.reducetask
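    # Because the partition depends only on the key, all pairs that share a
    # key land in the same reduce region, so each key is reduced exactly once.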
    # Reduces all pairs for one key: [(key, value), ...].
    # To be defined by the user of the class.
    def Reduce(self, key, keyvalues):
        pass

    # Optionally merge all reduce partitions into a single output file.
    # A better implementation would do a merge sort of the reduce partitions,
    # since each partition has been sorted by key.
    def Merge(self):
        out = []
        for r in xrange(0, self.reducetask):
            f = open("#reduce-%s-%d" % (self.path, r), "r")
            out = out + pickle.load(f)
            f.close()
            os.unlink("#reduce-%s-%d" % (self.path, r))
        out = sorted(out, key=lambda pair: pair[0])
        return out
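    # A minimal sketch of the merge-sort variant suggested above (this method
    # is an addition, not part of the original interface): each reduce
    # partition is already sorted by key, so heapq.merge can k-way merge them
    # without re-sorting. Tuples compare by key first, so no key function is
    # needed.
    def MergeSorted(self):
        import heapq
        parts = []
        for r in xrange(0, self.reducetask):
            f = open("#reduce-%s-%d" % (self.path, r), "r")
            parts.append(pickle.load(f))
            f.close()
            os.unlink("#reduce-%s-%d" % (self.path, r))
        return list(heapq.merge(*parts))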
    # Load a mapper's split and apply Map to it
    def doMap(self, i):
        f = open("#split-%s-%s" % (self.path, i), "r")
        keyvalue = f.readline()
        value = f.read()
        f.close()
        os.unlink("#split-%s-%s" % (self.path, i))
        keyvaluelist = self.Map(keyvalue, value)
        for r in range(0, self.reducetask):
            # print "map", i, "#map-%s-%s-%d" % (self.path, i, r)
            f = open("#map-%s-%s-%d" % (self.path, i, r), "w+")
            itemlist = [item for item in keyvaluelist if self.Partition(item) == r]
            pickle.dump(itemlist, f)
            f.close()
        return [(i, r) for r in range(0, self.reducetask)]
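    # For example, with reducetask = 2, doMap(1) consumes #split-<path>-1,
    # writes #map-<path>-1-0 and #map-<path>-1-1, and returns [(1, 0), (1, 1)].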
    # Get reduce regions from maptasks, sort by key, and apply Reduce for each key
    def doReduce(self, i):
        keys = {}
        out = []
        for m in range(0, self.maptask):
            # print "reduce", i, "#map-%s-%s-%d" % (self.path, m, i)
            f = open("#map-%s-%s-%d" % (self.path, m, i), "r")
            itemlist = pickle.load(f)
            for item in itemlist:
                if item[0] in keys:
                    keys[item[0]].append(item)
                else:
                    keys[item[0]] = [item]
            f.close()
            os.unlink("#map-%s-%s-%d" % (self.path, m, i))
        for k in sorted(keys.keys()):
            out.append(self.Reduce(k, keys[k]))
        f = open("#reduce-%s-%d" % (self.path, i), "w+")
        pickle.dump(out, f)
        f.close()
        return i
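    # For example, if the map regions for this reducer hold [("a", 1), ("b", 1)]
    # and [("a", 1)], then keys becomes {"a": [("a", 1), ("a", 1)], "b": [("b", 1)]}
    # and out holds one Reduce() result per key, pickled to #reduce-<path>-i.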
    # The master: runs all Map tasks, then all Reduce tasks. pool.map blocks
    # until its tasks complete, so every Map task finishes before any Reduce
    # task starts.
    def run(self):
        pool = Pool(processes=max(self.maptask, self.reducetask))
        regions = pool.map(self.doMap, range(0, self.maptask))
        partitions = pool.map(self.doReduce, range(0, self.reducetask))
# A specialization of the MapReduce framework that counts title-cased words.
class WordCount(MapReduce):
    def __init__(self, maptask, reducetask, path):
        MapReduce.__init__(self, maptask, reducetask, path)

    # Produce a (key, value) pair for each title-cased word in value
    def Map(self, keyvalue, value):
        results = []
        i = 0
        n = len(value)
        while i < n:
            # skip non-ascii letters in C/C++ style a la MapReduce paper:
            while i < n and value[i] not in string.ascii_letters:
                i += 1
            start = i
            while i < n and value[i] in string.ascii_letters:
                i += 1
            w = value[start:i]
            if start < i and w.istitle():
                results.append((w.lower(), 1))
        return results

    # Reduce [(key, value), ...] to (key, count)
    def Reduce(self, key, keyvalues):
        return (key, sum(pair[1] for pair in keyvalues))
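# For example, WordCount.Map("0\n", "The Cat sat on The Mat") returns
# [("the", 1), ("cat", 1), ("the", 1), ("mat", 1)] (only title-cased words
# count), and WordCount.Reduce("the", [("the", 1), ("the", 1)]) returns
# ("the", 2).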
# A specialization of the MapReduce framework that maps each title-cased word
# to the byte offsets at which it occurs in the input file.
class ReverseIndex(MapReduce):
    def __init__(self, maptask, reducetask, path):
        MapReduce.__init__(self, maptask, reducetask, path)

    # Produce a (key, value) pair for each title-cased word in value, where
    # value is the word's byte offset in the input file (the split's offset,
    # keyvalue, plus the word's position within the split).
    def Map(self, keyvalue, value):
        results = []
        i = 0
        n = len(value)
        while i < n:
            # skip non-ascii letters in C/C++ style a la MapReduce paper:
            while i < n and value[i] not in string.ascii_letters:
                i += 1
            start = i
            while i < n and value[i] in string.ascii_letters:
                i += 1
            w = value[start:i]
            if start < i and w.istitle():
                results.append((w.lower(), start + int(keyvalue)))
        return results

    # Reduce [(key, value), ...] to (key, sorted list of offsets)
    def Reduce(self, key, keyvalues):
        valuelist = [item[1] for item in keyvalues]
        return (key, sorted(valuelist))
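# For example, ReverseIndex.Map("0\n", "Hello World Hello") returns
# [("hello", 0), ("world", 6), ("hello", 12)], and
# ReverseIndex.Reduce("hello", [("hello", 0), ("hello", 12)]) returns
# ("hello", [0, 12]): each word maps to the sorted offsets where it appears.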
# Python doesn't pickle instance methods by default, so here you go:
def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
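# With this hook registered, bound methods like mr.doMap become picklable,
# which is what lets Pool.map ship them to the worker processes. A quick
# check (assuming an instance named mr already exists):
#   pickle.loads(pickle.dumps(mr.doMap))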
# Run a MapReduce program on the file named on the command line
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print "Program requires path to file for reading!"
        sys.exit(1)

    # Create a MapReduce program: WordCount or ReverseIndex
    #wc = WordCount(4, 2, sys.argv[1])
    wc = ReverseIndex(4, 2, sys.argv[1])
    # Run it
    wc.run()
    # Merge the output of the Reduce tasks:
    out = wc.Merge()
    if isinstance(wc, WordCount):
        # Sort by word count, most frequent first:
        out = sorted(out, key=lambda pair: pair[1], reverse=True)
        # Print top 20:
        print "WordCount:"
        for pair in out[0:20]:
            print pair[0], pair[1]
    else:
        # Sort by word, in reverse alphabetical order:
        out = sorted(out, key=lambda pair: pair[0], reverse=True)
        # Print first 20:
        print "ReverseIndex:"
        for pair in out[0:20]:
            print pair[0], pair[1]
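
# Typical invocation (note this file is Python 2 code), assuming some text
# file input.txt in the working directory:
#   python mapreduce.py input.txt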