-
Notifications
You must be signed in to change notification settings - Fork 1
/
demo_multithreaded_raptor_encode.py
129 lines (114 loc) · 4.99 KB
/
demo_multithreaded_raptor_encode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/python
import os
import glob
import argparse
import progressbar
import multiprocessing
from norec4dna.Encoder import Encoder
from norec4dna.Packet import ParallelPacket
from norec4dna.RU10Encoder import RU10Encoder
from norec4dna.rules.FastDNARules import FastDNARules
from norec4dna.distributions.RaptorDistribution import RaptorDistribution
from norec4dna.ErrorCorrection import nocode, get_error_correction_encode
CHUNK_SIZE = 100  # size of each file chunk handed to the encoders (passed as chunk_size to RU10Encoder)
def create_progress_bar(max_value):
    """Build and start a progress bar up to *max_value*.

    Shows percentage, a bar, a "Correct" counter, a "Corrupt" variable,
    an adaptive ETA and the elapsed time; stdout is redirected so prints
    do not break the bar rendering.
    """
    layout = [progressbar.Percentage(), progressbar.Bar()]
    layout += [' Correct: ', progressbar.Counter(), ', ']
    layout += [progressbar.Variable('Corrupt'), ', ']
    layout += [progressbar.AdaptiveETA(), ' ', progressbar.Timer()]
    bar = progressbar.ProgressBar(max_value=max_value, widgets=layout,
                                  max_error=False, redirect_stdout=True)
    return bar.start()
def encode(p_output, file, as_dna=True, error_correction=nocode, insert_header=False,
           save_number_of_chunks_in_packet=False, overhead=6.0, clear_output=False, checksum_len_str=None):
    """Encode *file* into RU10 packets and stream the result through *p_output*.

    Sends one list of ParallelPackets over the pipe, then the sentinel
    string "DONE", then closes the sending end. Returns 0.
    """
    chunk_count = Encoder.get_number_of_chunks_for_file_with_chunk_size(file, CHUNK_SIZE)
    distribution = RaptorDistribution(chunk_count)
    dna_rules = FastDNARules()
    # DNA rules only apply when encoding for DNA storage; otherwise run rule-free.
    active_rules = dna_rules if as_dna else None
    encoder = RU10Encoder(file, chunk_count, distribution, chunk_size=CHUNK_SIZE,
                          insert_header=insert_header, rules=active_rules,
                          error_correction=error_correction, id_len_format="H",
                          number_of_chunks_len_format="B",
                          save_number_of_chunks_in_packet=save_number_of_chunks_in_packet,
                          checksum_len_str=checksum_len_str)
    encoder.set_overhead_limit(overhead)
    encoder.encode_to_packets()
    # Convert to ParallelPackets so the payload can be pickled across the pipe.
    p_output.send([ParallelPacket.from_packet(pkt) for pkt in encoder.encodedPackets])
    p_output.send("DONE")
    p_output.close()
    return 0
if __name__ == "__main__":
    # Parse CLI options, fan the encoding out to one worker per CPU core,
    # collect the packets from all workers and save the combined result.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--asdna",
        help="convert packets to dna and use dna rules",
        action="store_true",
        required=False,
    )
    parser.add_argument("filename", metavar="file", type=str, help="the file to Encode")
    parser.add_argument("--error_correction", metavar="error_correction", required=False, type=str, default="nocode",
                        help="Error Correction Method to use; possible values: \
                        nocode, crc, reedsolomon (default=nocode)")
    parser.add_argument("--repair_symbols", metavar="repair_symbols", required=False, type=int, default=2,
                        help="number of repair symbols for ReedSolomon (default=2)")
    # BUGFIX: "type=bool" treats ANY non-empty string (even "False") as True;
    # use store_true flags for real boolean behavior.
    parser.add_argument("--insert_header", required=False, action="store_true", default=False)
    parser.add_argument("--header_crc_str", metavar="header_crc_str", required=False, type=str, default="")
    parser.add_argument("--save_number_of_chunks", required=False, action="store_true", default=False)
    args = parser.parse_args()
    _overhead = 6.0  # total overhead budget, split evenly across the workers below
    _file = args.filename
    _as_dna = args.asdna
    _repair_symbols = args.repair_symbols
    _insert_header = args.insert_header
    _save_number_of_chunks = args.save_number_of_chunks
    _header_crc_str = args.header_crc_str
    _error_correction = get_error_correction_encode(args.error_correction, _repair_symbols)
    print("File to encode: " + str(_file))
    from multiprocessing import Process
    cores = multiprocessing.cpu_count()
    print("[i] Clearing output-folder...")
    fulldir, filename = os.path.split(os.path.realpath(_file))
    filename = "RU10_" + filename
    out_file = os.path.join(fulldir, filename)
    # Remove stale results of a previous run from the output location.
    if not out_file.endswith("/"):
        files = glob.glob(out_file + "/*")
    else:
        files = glob.glob(out_file + "*")
    for f in files:
        os.remove(f)
    print("[i] Spawning " + str(cores) + " processes:")
    processes = []
    for core in range(cores):
        _p_output, _p_input = multiprocessing.Pipe()
        # BUGFIX: forward the parsed CLI options (insert_header, save_number_of_chunks,
        # header_crc_str) to the workers; previously they were hard-coded False/default.
        # Positional order matches encode(): (p_output, file, as_dna, error_correction,
        # insert_header, save_number_of_chunks_in_packet, overhead, clear_output, checksum_len_str)
        p = Process(target=encode, args=(
            _p_output, _file, _as_dna, _error_correction, _insert_header, _save_number_of_chunks,
            -(1.0 - (1.0 / cores)) + 1.0 * _overhead / cores,  # per-worker share of the overhead
            False, _header_crc_str))
        p.start()
        print("[" + str(core + 1) + "] started")
        processes.append((p, _p_input))
    res = set()
    for process, _p_input in processes:
        done = False
        while not done:
            inp = _p_input.recv()
            if inp != "DONE":
                # Each worker sends one list of ParallelPackets; merge them all.
                res = res.union(inp)
            else:
                _p_input.close()
                done = True
        process.join()
    _number_of_chunks = Encoder.get_number_of_chunks_for_file_with_chunk_size(_file, CHUNK_SIZE)
    _dist = RaptorDistribution(_number_of_chunks)
    # BUGFIX: pass the number of chunks as the second argument (the original passed
    # the _as_dna bool here; encode() shows this slot is number_of_chunks).
    tmp = RU10Encoder(_file, _number_of_chunks, distribution=_dist, chunk_size=CHUNK_SIZE,
                      checksum_len_str=_header_crc_str)
    tmp.encodedPackets = res
    tmp.save_packets(True, save_as_dna=_as_dna, clear_output=True, seed_is_filename=True)
    # input("Press Enter to continue ...")