-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathword_counter.py
82 lines (69 loc) · 3.27 KB
/
word_counter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import re
import sys
from collections import defaultdict

import aiofiles
class WordCounter:
    """Count how often each word occurs in a text file.

    The input file is read with asynchronous file I/O (``aiofiles``).
    The result is either returned to the caller, written to an output
    file, or printed to standard output, depending on the constructor
    arguments.

    Attributes:
        input_file_name: Path of the file whose words are counted.
        output_file_name: Path the report is written to, or ``None`` to
            print the report on screen instead.
        return_output: When true, ``process_file`` returns the results
            instead of writing or printing them.
        words_count: ``defaultdict(int)`` mapping each lower-cased word to
            its number of occurrences (filled by ``count_words``).
        sorted_words: Report lines of the form ``"<word>: <count>\\n"``,
            ordered by descending count and then alphabetically (filled
            by ``count_words``).
    """

    # Matches every character that is neither a word character (letters,
    # digits and "_") nor an apostrophe. "_" is kept because it usually
    # behaves like a letter; the apostrophe is kept so that contractions
    # such as "it's" stay one word instead of producing a meaningless
    # stand-alone "s". A raw string is used so that "\w" reaches the regex
    # engine instead of being treated as an (invalid) string escape.
    pattern = re.compile(r"[^\w']")

    # "Magic" methods cannot be coroutines, but nothing asynchronous is
    # needed here: the constructor only stores the configuration.
    def __init__(self, input_file_name,
                 output_file_name=None,
                 return_output=False):
        """Store the configuration and start with empty results.

        Args:
            input_file_name: Path of the text file to analyse.
            output_file_name: Optional path for the report; when falsy the
                report is printed on screen.
            return_output: If true, ``process_file`` returns the results
                rather than emitting them.
        """
        # The settings live on the instance so they do not have to be
        # passed through every method call.
        self.input_file_name = input_file_name
        self.output_file_name = output_file_name
        self.return_output = return_output
        # Initialised here so the output methods are safe to call even
        # before ``count_words`` has run.
        self.words_count = defaultdict(int)
        self.sorted_words = []

    async def process_file(self):
        """Count the words and deliver the result.

        Returns:
            ``(words_count, sorted_words)`` when ``return_output`` is
            true; otherwise ``None`` after the report has been written to
            the output file or printed.
        """
        await self.count_words()
        if self.return_output:
            return (self.words_count, self.sorted_words)
        await self.process_output_data()
        return None

    @classmethod
    def _split_words(cls, line):
        """Return the lower-cased words contained in ``line``."""
        candidates = cls.pattern.sub(" ", line.lower()).split()
        # Apostrophes are only meaningful inside a word ("it's"). At the
        # edges they are quotation marks, so they are stripped; tokens
        # that consisted solely of apostrophes become empty and are
        # dropped instead of being counted as words.
        stripped = (candidate.strip("'") for candidate in candidates)
        return [word for word in stripped if word]

    def _format_counts(self):
        """Return report lines sorted by count (descending), then word."""
        ranked = sorted(self.words_count.items(),
                        key=lambda item: (-item[1], item[0]))
        return [f"{word}: {count}\n" for word, count in ranked]

    @staticmethod
    def _abort(exception, context):
        """Print an I/O error report and terminate with exit status 1."""
        # ``strerror`` may be None for some OSError instances; fall back
        # to the exception text so the message is never just "None".
        reason = exception.strerror or exception
        print(f"The program experienced an error: {reason}")
        print(context)
        # ``sys.exit`` instead of the ``exit`` builtin, which is injected
        # by the ``site`` module and is not guaranteed to exist.
        sys.exit(1)

    async def count_words(self):
        """Read the input file and fill ``words_count``/``sorted_words``.

        Raises:
            SystemExit: With status 1 if the input file cannot be opened
                or read (any ``OSError``), after printing an error report.
        """
        # defaultdict(int) lets unseen words start at zero without an
        # explicit membership check.
        self.words_count = defaultdict(int)
        try:
            async with aiofiles.open(self.input_file_name,
                                     encoding="utf8") as input_file:
                async for line in input_file:
                    for word in self._split_words(line):
                        self.words_count[word] += 1
        except OSError as exception:
            # OSError (not only FileNotFoundError) so that permission
            # problems, directories, etc. are reported the same way as
            # failures on the output file.
            self._abort(
                exception,
                f"While trying to open the file \"{self.input_file_name}\""
            )
        self.sorted_words = self._format_counts()

    async def process_output_data(self):
        """Write the report to the output file, or print it if none is set."""
        if self.output_file_name:
            await self.write_in_output_file()
        else:
            self.print_on_screen()

    def print_on_screen(self):
        """Print the report lines to standard output."""
        # Every line already ends with "\n", so no extra newline is added.
        print("".join(self.sorted_words), end="")

    async def write_in_output_file(self):
        """Write the report lines to ``output_file_name`` asynchronously.

        Raises:
            SystemExit: With status 1 if the file cannot be written (any
                ``OSError``), after printing an error report.
        """
        try:
            # Plain "w" is enough: the file is only written, never read.
            async with aiofiles.open(self.output_file_name, "w",
                                     encoding="utf8") as output_file:
                await output_file.writelines(self.sorted_words)
        except OSError as exception:
            self._abort(
                exception,
                f"While writing to the file \"{self.output_file_name}\""
            )