-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathunix-socket-links
executable file
·142 lines (123 loc) · 6.24 KB
/
unix-socket-links
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python
import os, sys, re, subprocess as sp, collections as cs
# One parsed "ss -xp" line. Fields, as filled in by parse_ss_conns:
#  src/dst - "Local Address" / "Peer Address" column values, with "*" replaced by ''
#  srcn/dstn - integer values from the "Port" column following each address
#  st - "State" column string, rx/tx - "Recv-Q" / "Send-Q" column values as ints
#  procs - list of proc_t tuples, as returned by parse_procs for the "Process" column
conn_t = cs.namedtuple('Conn', 'src srcn dst dstn st rx tx procs')
# Process name and pid from ss "users:(...)" info.
# parse_procs uses pid=0 with the raw string as a name when that info cannot be parsed.
proc_t = cs.namedtuple('Proc', 'name pid')
def parse_procs(procs) -> [proc_t]:
    'Turn the "users:(...)" string printed by ss into a list of proc_t tuples'
    # Anything that does not fit the expected syntax from start to end
    #  is passed through as a single pseudo-process with the raw text and pid=0
    unparsed = [proc_t(procs, 0)]
    wrapper = re.fullmatch(r'users:\((.+)\)', procs)
    if wrapper is None:
        return unparsed
    entry_re = re.compile(r',*\("([^"]+)",pid=(\d+),fd=\d+\)')
    found, rest = [], wrapper[1]
    while rest:
        # Consume one ("name",pid=N,fd=N) entry at a time from the front
        entry = entry_re.match(rest)
        if entry is None:
            return unparsed
        found.append(proc_t(entry[1], int(entry[2])))
        rest = rest[entry.end():]
    return found
def parse_ss_conns(out) -> [conn_t]:
    'Convert text printed by "ss -xp" into a list of conn_t tuples, one per socket line'

    def squeeze(text):
        'Collapse all whitespace runs into single spaces'
        return ' '.join(text.split())

    def mismatch(actual, expected):
        return ' Actual: {}\n Expected: {}'.format(actual, expected)

    lines = out.strip('\r\n').splitlines()
    header_raw, rows = lines[0], lines[1:]

    header = squeeze(header_raw)
    header_expected = (
        'Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port Process' )
    if header != header_expected:
        raise ValueError( 'Unrecognized "ss" output'
            f' format:\n{mismatch(header, header_expected)}' )

    # Column boundaries are taken from positions of the two ":" in the header line,
    #  and data lines are expected to have a space at exactly those offsets
    before_colon, _, after_colon = header_raw.partition(':')
    n_src = len(before_colon)
    n_dst = len(after_colon.partition(':')[0])

    conns = []
    # Data lines are processed last-to-first, so the result is in that order too
    for line in reversed(rows):
        left, sep_a, remainder = line[:n_src], line[n_src], line[n_src+1:]
        middle, sep_b, right = remainder[:n_dst], remainder[n_dst], remainder[n_dst+1:]
        if sep_a != ' ' or sep_b != ' ':
            raise ValueError(f'ss line format error:\n{mismatch(line, header_raw)}')

        netid, st, q_rx, q_tx, src = left.split(None, 4)
        if netid not in ('u_str', 'u_dgr', 'u_seq'):
            raise ValueError(f'ss socket type error:\n {squeeze(line)}')

        srcn, dst = middle.split(None, 1)
        if dst != '*':
            raise ValueError(f'Unexpected "Peer Address" value: {dst!r}\n {squeeze(line)}')

        # Peer port is optionally followed by the process info column
        tail = right.strip().split(None, 1)
        if len(tail) == 1:
            dstn, procs = tail[0], ''
        else:
            dstn, procs = tail

        conns.append(conn_t(
            '' if src == '*' else src, int(srcn),
            '' if dst == '*' else dst, int(dstn),
            st, int(q_rx), int(q_tx), parse_procs(procs) ))
    return conns
# Processes on two ends of a socket link, as built by parse_sock_procs:
#  c - procs of the peer connection (printed last, as "client-proc" in main)
#  s - procs of the connection that has socket path (printed as "listening-process")
#  st - status suffix string from format_status, or '' for merged entries
conn_procs_t = cs.namedtuple('ConnProcs', 'c s st')
def format_status(conn, peer) -> str:
    'Return " [ ... ]" suffix for unusual state or non-empty queues, empty str if there is none'
    # Both states are shown if they differ, and plain ESTAB is not worth mentioning
    state = conn.st if peer.st == conn.st else f'{conn.st}-{peer.st}'
    parts = ['' if state == 'ESTAB' else state]
    # Non-zero rx/tx counters get "s" prefix for conn side and "c" for the peer
    for queue in ('rx', 'tx'):
        for prefix, side in (('s', conn), ('c', peer)):
            value = getattr(side, queue)
            if value:
                parts.append(f'{prefix}{queue}={value}')
    summary = ' '.join(parts).strip()
    return f' [ {summary} ]' if summary else summary
def parse_sock_procs(conns) -> {str: [conn_procs_t]}:
    'Group connections by socket path, pairing up process lists from both ends of each link'
    # Index of connections by their own port number, to find peers via dstn
    by_port = cs.defaultdict(list)
    for c in conns:
        by_port[c.srcn].append(c)

    links = cs.defaultdict(list)
    for conn in sorted(conns, key=lambda c: c.src):
        nameless = not conn.src
        idle_listener = conn.st == 'LISTEN' and not conn.dstn
        if nameless or idle_listener:
            continue
        # Links can go in a cycle, e.g. @xxx/bus/systemd/bus-system [10896 -> 8141]
        #  being connected to /run/dbus/system_bus_socket [8141 -> 10896]
        peers = list(by_port.get(conn.dstn, ()))
        if not peers:
            # No info on the other end - use a placeholder with same state and no procs
            peers = [conn_t('', conn.dstn, '', 0, conn.st, 0, 0, [])]
        for peer in peers:
            if peer.procs or conn.procs:
                links[conn.src].append(conn_procs_t(
                    peer.procs, conn.procs, format_status(conn, peer) ))
    return links
def main(args=None):
    'Script entry point: parse command-line options, run "ss -xp" and print linked socket info'
    import argparse, textwrap

    def dd(text):
        'Cleanup for help-text literals - common indentation is stripped, tabs become spaces'
        text = textwrap.dedent(text).strip('\n') + '\n'
        return re.sub(r' \t+', ' ', text).replace('\t', ' ')

    # Text in the literals below is indented to match the code, dd() removes that margin
    parser = argparse.ArgumentParser(
        usage='%(prog)s [options] [socket]',
        formatter_class=argparse.RawTextHelpFormatter,
        description=dd('''
            List unix socket (aka "named pipe") connections with processes on both ends.
            Output format: <socket-path> :: <listening-process> :: <client-proc> ...
            It's based on "ss -xp", but links src-dst inodes to print both peers and dedup stuff.'''))
    parser.add_argument('socket', nargs='?', help=dd('''
        Specific socket path to lookup all connections to/from. Can be a symlink.
        "ss -xp src <socket>" can also do similar thing, but somewhat more verbosely.'''))
    parser.add_argument('-c', '--conns', action='store_true', help=dd('''
        Print separate "src -> dst" line for each opened socket connection,
        instead of printing only a single line for each socket by default.
        Default is to print src/dst processes for sockets,
        merging and deduplicating process info from connections.
        Multiple processes can still be printed on one line, if connection fd is shared.'''))
    parser.add_argument('-p', '--pretty', action='store_true', help=dd('''
        Listed aggregated/deduplicated processes one-per-line,
        to hopefully make it a bit more human-readable in the terminal.
        Also separates pids from process names by spaces, so they'd be easier to copy.'''))
    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    # Existing path is resolved to compare against names in ss output, other values are used as-is
    sock_filter = opts.socket
    if sock_filter and os.path.exists(sock_filter):
        sock_filter = os.path.realpath(sock_filter, strict=True)

    ss = sp.run(['ss', '-xp'], check=True, stdout=sp.PIPE)
    conns = parse_ss_conns(ss.stdout.decode('utf-8', 'backslashreplace'))
    sock_procs = parse_sock_procs(conns)

    # With --pretty, one-liners are collected separately, to be printed before multiline blocks
    blocks = []
    oneliners = [] if opts.pretty else blocks
    nx = '?'

    def ns(s):
        'Make sure that "::" field separator does not appear within a printed value'
        while '::' in s:
            s = s.replace('::', ':').replace(' ', '_')
        return s

    def procs_inline(procs):
        return ns(' '.join(f'{p.name}[{p.pid}]' for p in sorted(set(procs))))

    for sock, proc_tuples in sock_procs.items():
        if sock_filter and sock != sock_filter:
            continue
        if not opts.conns:
            # Merge processes from all connections of this socket into one entry
            clients, servers = set(), set()
            for procs in proc_tuples:
                clients.update(procs.c)
                servers.update(procs.s)
            proc_tuples = [conn_procs_t(clients, servers, '')]

        for procs in proc_tuples:
            if not opts.pretty or len(procs.c) == 1:
                pc, ps = procs_inline(procs.c), procs_inline(procs.s)
                oneliners.append(f'{sock} :: {ps or nx}{procs.st} :: {pc or nx}')
                continue

            ps = ns(' + '.join(f'{p.name} {p.pid}' for p in sorted(set(procs.s))))
            head = f'{sock} :: {ps or nx}{procs.st}'
            if not procs.c:
                oneliners.append(head)
                continue

            # Socket line followed by one line per client process name with all its pids
            pids_by_name = cs.defaultdict(set)
            for p in procs.c:
                pids_by_name[p.name].add(p.pid)
            block = [head]
            for name, pids in sorted(pids_by_name.items()):
                block.append(f' {name} ' + ' '.join(map(str, pids)))
            blocks.append('\n'.join(block))

    if not opts.pretty:
        print('\n'.join(blocks))
        return
    if oneliners:
        print('\n'.join(oneliners), end='\n\n')
    print('\n\n'.join(blocks))
if __name__ == '__main__':
    try:
        sys.exit(main())
    except BrokenPipeError:
        # Reader of the stdout pipe is gone - point stdout fd to devnull,
        #  so that nothing fails on writing to it again while exiting
        devnull_fd = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull_fd, sys.stdout.fileno())
        sys.exit(1)