#!/usr/bin/env python3
import subprocess as sp, contextlib as cl, datetime as dt, functools as ft
import os, sys, re, tempfile, logging, pathlib as pl
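
# Collects ZNC "log" module output files from the various per-user/per-network
#  locations under the ZNC home dir and aggregates them into monthly
#  xz-compressed files in one destination dir - see -h/--help output for details.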
def rel_path(p):
    p0, p = pl.Path('.').resolve(), p.resolve()
    return p.relative_to(p0) if p.is_relative_to(p0) else p
def sort_logfiles_readline(p):
    'First timestamp in a file can be used for ordering'
    with p.open() as src:
        for line in src:
            if s := line.strip(): return s
    return '' # empty/blank-only file - sorts first
def sort_logfiles(files):
    'Order files with possibly-duplicate date'
    files_dup = dict()
    for ts, p in files:
        if ts not in files_dup: files_dup[ts] = list()
        files_dup[ts].append(p)
    for ts, ps in sorted(files_dup.items()):
        if len(ps) == 1: yield ts, ps
        else: yield ts, sorted(ps, key=sort_logfiles_readline)
def agg_logs(ctx, p_logs, logs, fn_tpl):
    'Read/compress all data to .xz tempfiles'
    ts_now, chan_agg = dt.date.today(), dict() # {p_xz: [(ts, p), ...]}
    log = logging.getLogger('zla.agg')
    for (net, chan), files in sorted(logs.items()):
        if chat := chan[0] in '#&': # selects chan-type as well
            # Strip/replace chan prefixes - don't work well with shells
            if chan.startswith('#'): chan = chan[1:]
            while chan.startswith('#'): chan = '_' + chan[1:]
            while chan.startswith('&'): chan = '+' + chan[1:]
        agg_by_fn = dict() # {dst filename: [(ts, p), ...]}
        for ts, ps in sort_logfiles(files):
            if ts == ts_now: continue # skip current log(s)
            fn = fn_tpl.format(
                net=net, type='chat' if chat else 'priv',
                chan=chan, yy=ts.year % 100, mm=ts.month )
            agg_by_fn.setdefault(fn, list()).extend((ts, p) for p in ps)
        for fn, agg_files in agg_by_fn.items():
            if (p_xz := p_logs / fn) not in chan_agg: chan_agg[p_xz] = agg_files; continue
            log.warning(f'Different nets/chans with same dst file [ {p_xz.name} ]')
            chan_agg[p_xz].extend(agg_files); chan_agg[p_xz].sort()
    xz_tuples = list() # (sources, tmp_xz, dst_xz) for final mv+rm
    for p_xz, agg_files in sorted(chan_agg.items()):
        log.debug(f'Aggregating channel [+ {len(agg_files)} log(s)]: {p_xz.name}')
        if not p_xz.parent.is_dir(): p_xz.parent.mkdir(parents=True)
        with tempfile.NamedTemporaryFile(
                dir=p_xz.parent, prefix=p_xz.name+'.', delete=False ) as tmp:
            ctx.callback((p_tmp := pl.Path(tmp.name)).unlink, missing_ok=True)
            agg_logs_chan(tmp, p_xz, agg_files)
        agg_files = sorted(set(p for ts, p in agg_files))
        xz_tuples.append((agg_files, p_tmp, p_xz))
    return xz_tuples
def agg_logs_chan(tmp, p_xz, agg_files, xz_bs=2 * 2**20):
    '''Aggregate logs for one channel into monthly
        xz file, reusing pre-existing part of it, if any.'''
    if p_xz.exists(): # copy old part first
        with p_xz.open('rb') as src:
            for chunk in iter(ft.partial(src.read, xz_bs), b''): tmp.write(chunk)
        tmp.flush()
    xz = sp.Popen( # append compressed new files
        ['xz', '--compress', '--format=xz', '--check=sha256'],
        stdin=sp.PIPE, stdout=tmp )
    line_pat = re.compile(br'^(\s*\[)(\d{2}:\d{2}(:\d{2})?\]\s+)')
    for ts, p in agg_files:
        line, line_sub = b'', ( fr'\g<1>{ts.year%100:02d}'
            fr'-{ts.month:02d}-{ts.day:02d} \g<2>' ).encode()
        with p.open('rb') as src:
            for line in iter(src.readline, b''):
                xz.stdin.write(line_pat.sub(line_sub, line))
        if line and not line.endswith(b'\n'): xz.stdin.write(b'\n')
    xz.stdin.flush(); xz.stdin.close()
    if xz.wait(): raise AssertionError(f'xz failed for [ {p_xz} ]')
    tmp.seek(0)
    sp.run( # make sure decompression of the whole thing works
        ['xz', '--test', '--format=xz', '--check=sha256'],
        check=True, stdin=tmp )
def find_logs():
    p_znc, logs = pl.Path('.').resolve(), dict()
    def logs_append(user, net, chan, ts_str, p):
        if net == 'default' and user: net = user
        ts = dt.date(*map(int, [ts_str[:4], ts_str[4:6], ts_str[6:8]]))
        if not (len(ts_str) == 8 and 2050 > ts.year >= 2000):
            raise AssertionError(ts_str)
        logs.setdefault((net, chan), list()).append((ts, p.resolve()))
    def re_chk(re_str, p, *groups):
        if not (m := re.search(re_str, str(p))): raise AssertionError(re_str, p)
        if groups: return (m.group(k) for k in groups)
        return m
    # Prehistoric logs
    for p in p_znc.glob('users/*/moddata/log/*.log'):
        net, chan, ts = re_chk(
            r'/users/(?P<net>[^/]+)/moddata/'
            r'log/(?P<chan>[^/]+)_(?P<ts>\d{8})\.log$',
            p, 'net', 'chan', 'ts' )
        logs_append(None, net, chan, ts, p)
    # Logs for old-style setup with user=network, multi-network users
    for p in p_znc.glob('moddata/log/*.log'):
        user, net, chan, ts = re_chk(
            r'/moddata/log/(?P<user>[^/]+?)'
            r'_(?P<net>[^/]+?)_(?P<chan>[^/]+)_(?P<ts>\d{8})\.log$',
            p, 'user', 'net', 'chan', 'ts' )
        if '_' in f'{user}{net}': raise AssertionError(net, p)
        logs_append(user, net, chan, ts, p)
    # Modern logs enabled globally for multi-network users
    # Can also be enabled by user: users/*/moddata/log/*/*/*.log
    # Can also be enabled by network: users/*/networks/*/moddata/log/*/*.log
    # All these are redundant with globally-enabled module, so not used here
    for p in p_znc.glob('moddata/log/*/*/*/*.log'):
        user, net, chan, ts = re_chk(
            r'/moddata/log/(?P<user>[^/]+?)/'
            r'(?P<net>[^/]+?)/(?P<chan>[^/]+)/(?P<ts>\d{4}-\d{2}-\d{2})\.log$',
            p, 'user', 'net', 'chan', 'ts' )
        if '_' in f'{user}{net}': raise AssertionError(net, p)
        logs_append(user, net, chan, ts.replace('-', ''), p)
    return logs
def main(args=None):
    import argparse, textwrap
    dd = lambda text: re.sub( r' \t+', ' ',
        textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter, description=dd('''
            Collect ZNC logs into one dir (-d/--log-dir),
            prefixing, aggregating and compressing them as necessary.
            Aggregates and creates all files with tmp-suffixes first,
            then renames them all to final names, then removes all temp files,
            then removes all processed src logs - strictly in that order,
            with any error stopping the process before source logs are removed.
            Dry-run mode only disables the "rename" and "remove src logs" steps,
            but still does all the work exactly as it would otherwise.
            Temp files are always created and cleaned-up, even if not used.
            Never overwrites or updates any files in-place.
            Lines in aggregated files are processed to have a full date/time prefix,
            not just time-of-day or similar truncated variants.'''))
    parser.add_argument('-s', '--znc-home', metavar='dir', default='~znc',
        help='Path to ZNC home directory (default: %(default)s).')
    parser.add_argument('-d', '--log-dir', metavar='dir', default='~znc/logs',
        help='Path to destination directory'
            ' to store aggregated logs to (default: %(default)s).')
    parser.add_argument('-n', '--log-name', metavar='fn-tpl',
        default='{net}/{type}/{chan}__{yy:02d}-{mm:02d}.log.xz', help=dd('''
            Name template for xz-compressed log files created under -d/--log-dir.
            Uses python str.format formatting - https://pyformat.info/
            Available templating keys: net, type ("chat" or "priv"), chan, yy, mm.
            Default: %(default)s'''))
    parser.add_argument('--dry-run', action='store_true',
        help="Do all the work, but don't actually create any non-tmp files.")
    parser.add_argument('--debug', action='store_true', help='Verbose operation mode.')
    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    logging.basicConfig(
        format='%(asctime)s :: %(name)s :: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=logging.DEBUG if opts.debug else logging.INFO )
    log = logging.getLogger('zla.main')

    p_logs = pl.Path(opts.log_dir).expanduser().resolve()
    p_logs.mkdir(exist_ok=True)
    os.chdir(pl.Path(opts.znc_home).expanduser()) # strips it for rel_path logging
    dry_run = ' [DRY-RUN]' if opts.dry_run else ''

    log.debug(f'Finding and normalizing log sources [ {p_logs} ]')
    logs = find_logs()

    agg_files_set = set()
    with cl.ExitStack() as ctx:
        log.debug(f'Aggregating log data [ {p_logs} ]')
        xz_tuples = agg_logs(ctx, p_logs, logs, opts.log_name)

        log.debug(f'Moving new xz files into place ({len(xz_tuples):,d}){dry_run}')
        for agg_files, p_xz_tmp, p_xz in xz_tuples:
            if log.isEnabledFor(logging.DEBUG):
                sz1, sz0 = ( p_xz_tmp.stat().st_size,
                    p_xz.stat().st_size if p_xz.exists() else 0 )
                log.debug(f' {rel_path(p_xz)} [ {sz0:,d} -> {sz1:,d} B]')
                agg_lines, agg_sz = list(), 0
                for p in agg_files:
                    agg_sz += (sz := p.stat().st_size)
                    agg_lines.append(f' {rel_path(p)} [ {sz:,d} B]')
                log.debug( f' <- {p_xz_tmp.name}'
                    f' [+ {agg_sz:,d} -> {sz1-sz0:,d} B] [+ {len(agg_files)} log(s)]' )
                for msg in agg_lines: log.debug(msg)
            if not dry_run: p_xz_tmp.rename(p_xz)
            agg_files_set.update(agg_files)

        log.debug(f'Removing aggregated source files ({len(agg_files_set):,d}){dry_run}')
        for p in agg_files_set:
            if not dry_run: p.unlink()
    log.debug('Finished')

if __name__ == '__main__': sys.exit(main())
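
# Example invocations, with illustrative paths (defaults assume a "znc" user
#  whose home dir is ~znc, which may differ from the actual setup):
#   ./znc-log-aggregator --dry-run --debug   # log what would be done, change nothing final
#   ./znc-log-aggregator -s ~znc -d ~znc/logs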