forked from ria-ee/X-Road-scripts
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxrd_all_methods.py
executable file
·159 lines (133 loc) · 5.34 KB
/
xrd_all_methods.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/python3
"""X-Road listMethods request to all members."""
import argparse
import queue
import sys
from threading import Thread, Event
import xrdinfo
# Method requested from every subsystem unless --allowed is given
# (in which case 'allowedMethods' is requested instead)
DEFAULT_METHOD = 'listMethods'
# Timeout for HTTP requests, used when -t is not given
DEFAULT_TIMEOUT = 5.0
# Number of worker threads when --threads is not given or is not positive;
# with a single worker the subsystems are queried one at a time
DEFAULT_THREAD_COUNT = 1
def print_error(content):
    """Write ``content`` to stderr as a single 'ERROR: ...' line."""
    # One write() call per message, so the prefix, the text and the newline
    # are emitted together.
    sys.stderr.write('ERROR: {}\n'.format(content))
def worker(params):
    """Worker thread body: fetch and print the methods of queued subsystems.

    Subsystems are taken from ``params['work_queue']`` one at a time.  For
    each of them the method list is requested through ``xrdinfo`` (the REST
    variant when ``params['rest']`` is true, the SOAP variant otherwise) and
    the identifier of every returned method is written to stdout on its own
    line.  Any exception raised while handling a subsystem is reported with
    ``print_error`` and the thread moves on to the next item.

    The function returns once the queue is found empty while
    ``params['shutdown']`` is set.

    Keys read from ``params``: ``work_queue``, ``shutdown``, ``rest``,
    ``url``, ``client``, ``method``, ``timeout``, ``verify`` and ``cert``.
    """
    tasks = params['work_queue']

    while True:
        # Wait only briefly for work so that the shutdown flag is polled
        # regularly and the thread can stop gracefully.
        try:
            subsystem = tasks.get(block=True, timeout=0.1)
        except queue.Empty:
            if params['shutdown'].is_set():
                break
            continue

        try:
            # Both xrdinfo calls take the same arguments; only the function
            # differs between REST and SOAP.
            fetch = xrdinfo.methods_rest if params['rest'] else xrdinfo.methods
            found = fetch(
                addr=params['url'], client=params['client'], producer=subsystem,
                method=params['method'], timeout=params['timeout'],
                verify=params['verify'], cert=params['cert'])
            for method in found:
                # Using thread safe "write" instead of "print"
                sys.stdout.write(xrdinfo.identifier(method) + '\n')
        except Exception as err:
            # Thread boundary: report the failure for this subsystem and
            # keep the worker alive for the remaining items.
            print_error(f'{type(err).__name__}: {err}')
        finally:
            # Always acknowledge the item, otherwise Queue.join() in the
            # main thread would never return.
            tasks.task_done()
def _build_arg_parser():
    """Create the command line parser for this script."""
    parser = argparse.ArgumentParser(
        description='X-Road listMethods request to all members.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='By default peer TLS certificate is not validated.'
    )

    # Positional arguments
    parser.add_argument(
        'url',
        metavar='SERVER_URL',
        help='URL of local Security Server accepting X-Road requests.')
    parser.add_argument(
        'client',
        metavar='CLIENT',
        help='Client identifier consisting of slash separated Percent-Encoded parts (e.g. '
             '"INSTANCE/MEMBER_CLASS/MEMBER_CODE/SUBSYSTEM_CODE" '
             'or "INSTANCE/MEMBER_CLASS/MEMBER_CODE").')

    # Request options
    parser.add_argument('-t', metavar='TIMEOUT', type=float, help='timeout for HTTP query')
    parser.add_argument('--allowed', action='store_true', help='return only allowed methods')
    parser.add_argument(
        '--rest', action='store_true', help='return REST methods instead of SOAP')
    parser.add_argument(
        '--threads', metavar='THREADS', type=int, help='amount of threads to use')

    # TLS options
    parser.add_argument(
        '--verify',
        metavar='CERT_PATH',
        help='validate peer TLS certificate using CA certificate file.')
    parser.add_argument(
        '--cert', metavar='CERT_PATH', help='use TLS certificate for HTTPS requests.')
    parser.add_argument('--key', metavar='KEY_PATH', help='private key for TLS certificate.')

    parser.add_argument(
        '--instance',
        metavar='INSTANCE',
        help='use this instance instead of local X-Road instance.')
    return parser


def main():
    """Script entry point.

    Parses the command line, downloads the shared parameters through the
    given Security Server, starts the worker threads and feeds them every
    registered subsystem.  Returns after all queued subsystems have been
    processed and the workers have stopped.

    Exits with status 1 when the client identifier does not have 3 or 4
    parts, or when ``xrdinfo`` raises ``XrdInfoError`` while downloading the
    shared parameters or listing the registered subsystems.
    """
    args = _build_arg_parser().parse_args()

    client = xrdinfo.identifier_parts(args.client)
    if len(client) not in (3, 4):
        print_error(f'Client name is incorrect: "{args.client}"')
        sys.exit(1)

    # Settings shared with the worker threads.  Options that were not given
    # (or are empty/zero) fall back to their defaults.
    params = {
        'url': args.url,
        'client': client,
        'method': 'allowedMethods' if args.allowed else DEFAULT_METHOD,
        'instance': args.instance if args.instance else None,
        'timeout': args.t if args.t else DEFAULT_TIMEOUT,
        'verify': args.verify if args.verify else False,
        # A client certificate is used only when both the certificate and
        # its private key are supplied.
        'cert': (args.cert, args.key) if args.cert and args.key else None,
        'rest': args.rest,
        'thread_cnt': (
            args.threads if args.threads and args.threads > 0 else DEFAULT_THREAD_COUNT),
        'work_queue': queue.Queue(),
        'shutdown': Event()
    }

    try:
        shared_params = xrdinfo.shared_params_ss(
            addr=args.url,
            instance=params['instance'],
            timeout=params['timeout'],
            verify=params['verify'],
            cert=params['cert'])
    except xrdinfo.XrdInfoError as err:
        print_error(f'Cannot download Global Configuration: {err}')
        sys.exit(1)

    # Start the workers first so they can consume items while the queue is
    # still being filled.
    pool = []
    for _ in range(params['thread_cnt']):
        runner = Thread(target=worker, args=(params,), daemon=True)
        runner.start()
        pool.append(runner)

    # Fill the queue with every registered subsystem
    backlog = params['work_queue']
    try:
        for subsystem in xrdinfo.registered_subsystems(shared_params):
            backlog.put(subsystem)
    except xrdinfo.XrdInfoError as err:
        print_error(err)
        sys.exit(1)

    # Wait until every queued subsystem has been processed
    backlog.join()

    # Tell the workers to stop and wait for the daemon threads to finish
    params['shutdown'].set()
    for runner in pool:
        runner.join()
# Run main() only when this file is executed as a script, not on import
if __name__ == '__main__':
    main()