-
Notifications
You must be signed in to change notification settings - Fork 1
/
caco-mela.py
executable file
·242 lines (209 loc) · 10.4 KB
/
caco-mela.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#!/usr/bin/env python
import argparse
import os
import pwd
from typing import Optional, Union
import ldap
from dotenv import load_dotenv, find_dotenv
def set_default(config: dict, var: str, default: str):
    """Store *default* under *var* when *config* has no usable value for it.

    A value counts as missing when the key is absent or when it is ``None``;
    any other value (including an empty string) is left untouched.
    """
    if config.get(var) is None:
        config[var] = default
def set_boolean(config: dict, var: str):
    """Convert a textual setting stored under *var* into a real boolean.

    Only string values are converted; values that are already booleans (or
    ``None``) are left as they are. After trimming surrounding whitespace and
    lower-casing, the strings ``""``, ``"0"``, ``"no"``, ``"false"``, ``"n"``
    and ``"off"`` become ``False``; every other string becomes ``True``.
    """
    value = config[var]
    if isinstance(value, str):
        # An empty assignment (e.g. "LDAP_STARTTLS=" in a .env file) must not
        # silently enable the option, so it is treated as false as well.
        falsy = ("", "0", "no", "false", "n", "off")
        config[var] = value.strip().lower() not in falsy
def set_comma(config: dict, var: str):
    """Normalise a list-like setting stored under *var* into a set of names.

    If the current value is a list, its items are used. Otherwise the
    environment variable named *var* is read and split on commas (an unset
    variable is treated as an empty string).

    Each item is stripped of surrounding whitespace and empty items are
    dropped, so ``"a, b,"`` yields ``{"a", "b"}`` and an unset or empty
    variable yields an empty set rather than ``{""}``.
    """
    if isinstance(config[var], list):
        raw_items = config[var]
    else:
        raw_items = os.environ.get(var, default="").split(",")
    stripped = (item.strip() for item in raw_items)
    config[var] = {item for item in stripped if item}
def parse_args(tests_env: Optional[str] = None) -> dict[str, Union[str, bool]]:
    """Build the runtime configuration from the command line and the environment.

    Every option is stored under its argparse ``dest`` name. An option that
    was not given on the command line (value ``None``, or ``False`` for a
    flag) falls back to the environment variable of the same name, after a
    dotenv file has been loaded into the environment.

    Args:
        tests_env: Path of a dotenv file. When given, the real command line
            is not read (an empty argument list is parsed instead) and the
            file is loaded with ``override=True``. When ``None``, the process
            arguments are parsed and ``load_dotenv()`` is called without
            arguments.

    Returns:
        The configuration dict, with attribute-name defaults filled in, the
        STARTTLS / user-owns-file settings passed through ``set_boolean`` and
        the ignored / shared account settings passed through ``set_comma``.
    """
    parser = argparse.ArgumentParser(
        description="Provision SSH keys from a LDAP server, without syncing UIDs.",
        prog="caco-mela",
    )
    option = parser.add_argument
    option("--version", action="version", version="%(prog)s 1.0.0")
    option("-l", "--ldap", dest="LDAP_BIND_SERVER", type=str, help="LDAP server address")
    option("-D", "--binddn", dest="LDAP_BIND_DN", type=str, help="LDAP bind DN")
    option("-w", "--bindpw", dest="LDAP_BIND_PASSWORD", type=str, help="LDAP bind password")
    option("-t", "--starttls", action="store_true", dest="LDAP_STARTTLS", help="Use LDAP_STARTTLS")
    option("-s", "--search", dest="LDAP_SEARCH_BASE", type=str, help="LDAP search base")
    option("-f", "--filter", dest="LDAP_FILTER", type=str, help="LDAP filter")
    option(
        "--key",
        dest="LDAP_SEARCH_SSH_KEY_ATTR",
        type=str,
        help="Attribute containing the SSH public key",
    )
    option(
        "--uid",
        dest="LDAP_SEARCH_SSH_UID_ATTR",
        type=str,
        help="Attribute containing the username",
    )
    option(
        "-a",
        "--authorized",
        dest="SSH_AUTHORIZED_KEYS_FILES",
        type=str,
        help="Value of sshd option AuthorizedKeysFile",
    )
    option(
        "--user-owns-file",
        dest="SSH_USER_OWNS_FILE",
        action="store_true",
        help="Users are set to owners of their authorized_keys file, if the file is created",
    )
    option("-i", "--ignored", dest="IGNORED_ACCOUNTS", nargs="+", type=str, help="Accounts to ignore")
    option(
        "--shared",
        dest="SHARED_ACCOUNTS",
        nargs="+",
        type=str,
        help="Shared accounts: add everyone's keys to them",
    )
    option("-v", "--verbose", action="store_true", help="Verbose output")
    option(
        "--dry-run",
        action="store_true",
        help="Do not write to authorized_keys file, just print what would have been written",
    )

    if tests_env is None:
        args = parser.parse_args()
        load_dotenv()
    else:
        # Test mode: ignore the real command line and force the test settings.
        args = parser.parse_args([])
        load_dotenv(tests_env, override=True)

    # A command-line value wins; None / False means "not given", so the
    # environment variable named like the dest is consulted instead.
    config = {
        name: (given if given is not None and given is not False else os.environ.get(name))
        for name, given in vars(args).items()
    }

    fallbacks = (
        ("LDAP_SEARCH_SSH_KEY_ATTR", "sshPublicKey"),
        ("LDAP_SEARCH_SSH_UID_ATTR", "uid"),
        ("SSH_AUTHORIZED_KEYS_FILES", ""),
    )
    for name, fallback in fallbacks:
        set_default(config, name, fallback)
    for name in ("LDAP_STARTTLS", "SSH_USER_OWNS_FILE"):
        set_boolean(config, name)
    for name in ("IGNORED_ACCOUNTS", "SHARED_ACCOUNTS"):
        set_comma(config, name)
    return config
def get_data_from_server(config):
    """Fetch user names and their SSH public keys from the LDAP server.

    Opens a connection to ``config["LDAP_BIND_SERVER"]``, negotiates STARTTLS
    when ``config["LDAP_STARTTLS"]`` is truthy, binds with the configured DN
    and password, and runs a subtree search below
    ``config["LDAP_SEARCH_BASE"]`` with ``config["LDAP_FILTER"]``, requesting
    only the key and UID attributes.

    Args:
        config: Configuration mapping; the keys read here are the ``LDAP_*``
            settings and ``verbose``.

    Returns:
        A dict mapping the first value of each entry's UID attribute to the
        list of values of its SSH key attribute. An entry without the key
        attribute maps to an empty list; an entry without the UID attribute
        is skipped.

    Raises:
        SystemExit: With status 1, after printing the error, when any LDAP
            operation on the connection fails.
    """
    uid_attr = config["LDAP_SEARCH_SSH_UID_ATTR"]
    key_attr = config["LDAP_SEARCH_SSH_KEY_ATTR"]
    add = {}

    conn = ldap.initialize(config["LDAP_BIND_SERVER"])
    try:
        conn.protocol_version = ldap.VERSION3
        # TLS has to be negotiated exactly once and *before* the bind,
        # otherwise the bind password crosses the wire unencrypted.
        if config["LDAP_STARTTLS"]:
            conn.start_tls_s()
        conn.simple_bind_s(config["LDAP_BIND_DN"], config["LDAP_BIND_PASSWORD"])

        ldap_result_id = conn.search(
            config["LDAP_SEARCH_BASE"],
            ldap.SCOPE_SUBTREE,
            config["LDAP_FILTER"],
            [key_attr, uid_attr],
        )
        while True:
            # all=0: fetch the results of the search one message at a time.
            result_type, result_data = conn.result(ldap_result_id, 0)
            if result_type == ldap.RES_SEARCH_REFERENCE:
                # A referral is not an entry, but it does not end the search
                # either; stopping here would drop the remaining users.
                continue
            if result_type != ldap.RES_SEARCH_ENTRY:
                break

            dn, attrs = result_data[0]
            if config["verbose"]:
                print(f"Parsing result {dn}")
            if uid_attr not in attrs:
                if config["verbose"]:
                    print(f"No attribute {uid_attr} for user {dn}, ignoring")
                continue

            # UTF-8 is a superset of ASCII, so plain values decode as before
            # while a non-ASCII key comment no longer aborts the whole run.
            username = attrs[uid_attr][0].decode("utf-8")
            if key_attr in attrs:
                add[username] = [raw.decode("utf-8") for raw in attrs[key_attr]]
            else:
                add[username] = []
                if config["verbose"]:
                    print(f"No attribute {key_attr} for user {dn}, SSH keys will be removed")
    except ldap.LDAPError as e:
        print(f"LDAP Error: {e}")
        # Same exit status as before, without relying on the site-provided
        # exit() helper.
        raise SystemExit(1)
    finally:
        # Single point of release: the connection stays open for the whole
        # search and is closed on success and on failure alike.
        conn.unbind_s()
    return add
def read_login_defs(config: dict):
    """Read the regular-user UID range from ``/etc/login.defs``.

    Looks for lines whose first whitespace-separated field is exactly
    ``UID_MIN`` or ``UID_MAX`` and whose second field is the integer value.
    Comment lines, blank lines and lines without a value are ignored.

    Args:
        config: Configuration mapping; only ``config["verbose"]`` is read.

    Returns:
        A ``(uid_min, uid_max)`` tuple. A limit that is not found keeps its
        default (1000 and 60000 respectively); the defaults are also used
        when the file does not exist.

    Raises:
        ValueError: If a ``UID_MIN`` / ``UID_MAX`` value is not an integer.
    """
    limits = {"UID_MIN": 1000, "UID_MAX": 60000}
    try:
        with open("/etc/login.defs", "r") as file:
            for line in file:
                fields = line.split()
                # Compare the whole key: a prefix test would also accept
                # unrelated settings such as "UID_MINX". Commented lines are
                # rejected here too, because their first field starts with "#".
                if len(fields) < 2 or fields[0] not in limits:
                    continue
                name = fields[0]
                limits[name] = int(fields[1])
                if config["verbose"]:
                    print(f"Obtained {name}: {limits[name]}")
    except FileNotFoundError:
        # Not every system ships this file; keep the built-in defaults.
        if config["verbose"]:
            print("File /etc/login.defs not found, using default UID_MIN and UID_MAX")
    return limits["UID_MIN"], limits["UID_MAX"]
def _expand_sshd_tokens(template: str, user: pwd.struct_passwd) -> str:
    """Expand the ``%u``, ``%h``, ``%U`` and ``%%`` tokens of *template* for *user*."""
    import re  # local import so this block does not depend on the file header

    values = {
        "%": "%",
        "u": user.pw_name,
        "h": user.pw_dir,
        "U": str(user.pw_uid),
    }
    # One left-to-right pass, so that "%%u" yields a literal "%u" instead of
    # being expanded a second time. Unknown tokens are left untouched.
    return re.sub(r"%([%uhU])", lambda match: values[match.group(1)], template)


def ssh_authorized_keys_file(config, user: pwd.struct_passwd, create: bool = True):
    """Return the path of the authorized_keys file for *user*, optionally creating it.

    When ``config["SSH_AUTHORIZED_KEYS_FILES"]`` is non-empty it is used as a
    path template in which ``%u`` (user name), ``%h`` (home directory),
    ``%U`` (numeric UID) and ``%%`` (a literal percent sign) are expanded.
    Otherwise the path is ``<home>/.ssh/authorized_keys``.

    Args:
        config: Configuration mapping; reads ``SSH_AUTHORIZED_KEYS_FILES``,
            ``SSH_USER_OWNS_FILE`` and ``verbose``.
        user: Password database entry of the account.
        create: When true, a missing ``<home>/.ssh`` directory (default
            location only) and a missing file are created. A newly created
            directory is chowned to the user. A newly created file gets mode
            600 and, if ``SSH_USER_OWNS_FILE`` is set, is chowned to the
            user. Existing files are never modified.

    Returns:
        The path of the authorized_keys file, whether or not it exists.
    """
    template = config["SSH_AUTHORIZED_KEYS_FILES"]
    if len(template) > 0:
        path = _expand_sshd_tokens(template, user)
    else:
        dotssh = os.path.join(user.pw_dir, ".ssh")
        if create and not os.path.exists(dotssh):
            os.mkdir(dotssh)
            os.chown(dotssh, user.pw_uid, user.pw_gid)
        path = os.path.join(dotssh, "authorized_keys")

    if config["verbose"]:
        print(f"User {user.pw_name} file is {path}")

    if create and not os.path.exists(path):
        if config["verbose"]:
            print(f"Creating {path}")
        # Create an empty file; the keys are written by a later step.
        with open(path, "w"):
            pass
        if config["SSH_USER_OWNS_FILE"]:
            if config["verbose"]:
                print(f"Setting owner to {user.pw_uid}:{user.pw_gid} and mode to 600 for file {path}")
            os.chown(path, user.pw_uid, user.pw_gid)
        else:
            if config["verbose"]:
                print(f"Setting mode to 600 for file {path}")
        os.chmod(path, 0o600)
    return path
def update_file(ssh_file, text: str, dry_run: bool) -> bool:
    """Bring *ssh_file* in line with *text* and report whether it differed.

    The file is read first. If its content already equals *text*, nothing
    happens. Otherwise the file is overwritten with *text*, or, when
    *dry_run* is truthy, the would-be content is printed and the file is
    left alone.

    Returns:
        ``True`` if the file content differed from *text* (even in a dry
        run), ``False`` if it was already up to date.
    """
    with open(ssh_file, "r") as existing:
        on_disk = existing.read()

    if text == on_disk:
        return False

    if not dry_run:
        with open(ssh_file, "w") as target:
            target.write(text)
    else:
        print(f"Dry run, would have written this to {ssh_file}:")
        print(text)
    return True
def _keys_to_text(keys):
    """Render *keys* one per line, or a placeholder comment when there are none."""
    if len(keys) == 0:
        return "# No SSH keys for this user"
    return "\n".join(keys)
def _warning_text():
    """Return the comment banner placed at the top of every managed file.

    The banner names this script's path (``__file__``) and has no trailing
    newline.
    """
    banner_lines = (
        "#",
        f"# This file is managed by Caco mela ({__file__})",
        "# All manual changes will be overwritten.",
        "#",
    )
    return "\n".join(banner_lines)
def generate_text(keys: list[str]):
    """Return the full authorized_keys content for one user.

    The content is the warning banner, then the rendered *keys* (or the
    "no keys" placeholder), and ends with a newline.
    """
    sections = (_warning_text(), _keys_to_text(keys))
    return "\n".join(sections) + "\n"
def generate_text_shared(results: dict[str, list[str]]):
    """Return the authorized_keys content for a shared account.

    After the warning banner, every user in *results* contributes a
    ``# Keys for <user>:`` comment line followed by that user's rendered
    keys. Sections are joined with newlines; no trailing newline is added.
    """
    sections = [_warning_text()]
    for owner, owner_keys in results.items():
        sections += [f"# Keys for {owner}:", _keys_to_text(owner_keys)]
    return "\n".join(sections)
def main(tests_env: Optional[str] = None):
    """Synchronise the authorized_keys files of local users with LDAP.

    Loads the configuration, fetches the keys from the LDAP server and walks
    the local password database. Only accounts whose UID lies between
    UID_MIN and UID_MAX (inclusive, as read from login.defs) are considered:

    * accounts listed in ``IGNORED_ACCOUNTS`` are skipped;
    * accounts listed in ``SHARED_ACCOUNTS`` receive the keys of every user
      returned by LDAP;
    * accounts found in LDAP receive their own keys;
    * any other account has its keys removed, but only if its
      authorized_keys file already exists.

    Args:
        tests_env: Optional dotenv path, forwarded to ``parse_args``.
    """
    config = parse_args(tests_env)
    results = get_data_from_server(config)
    uid_min, uid_max = read_login_defs(config)

    for user in pwd.getpwall():
        name = user.pw_name

        # System accounts and anything else outside the range are not managed.
        if not (uid_min <= user.pw_uid <= uid_max):
            continue

        if name in config["IGNORED_ACCOUNTS"]:
            if config["verbose"]:
                print(f"Ignoring user {name} due to IGNORED_ACCOUNTS")
            continue

        if name in config["SHARED_ACCOUNTS"]:
            if config["verbose"]:
                print(f"User {name} is a shared account")
            text = generate_text_shared(results)
            ssh_file = ssh_authorized_keys_file(config, user)
            if update_file(ssh_file, text, config["dry_run"]):
                print(f"Updated user {name} with all available SSH keys")
            elif config["verbose"]:
                print(f"No change for user {name} with all SSH keys")
            continue

        if name in results:
            own_keys = results[name]
            text = generate_text(own_keys)
            ssh_file = ssh_authorized_keys_file(config, user)
            if update_file(ssh_file, text, config["dry_run"]):
                print(f"Updated user {name} with {str(len(own_keys))} SSH keys")
            elif config["verbose"]:
                print(f"No change for user {name} with {str(len(own_keys))} SSH keys")
            continue

        # Unknown to LDAP: never create a file, only empty one that exists.
        ssh_file = ssh_authorized_keys_file(config, user, False)
        if not os.path.exists(ssh_file):
            continue
        if config["verbose"]:
            print(f"User {name} not found in LDAP server, removing keys")
        text = generate_text([])
        if update_file(ssh_file, text, config["dry_run"]):
            print(f"Updated user {name} by removing all SSH keys")
        elif config["verbose"]:
            print(f"No change for user {name} with 0 SSH keys")
# Script entry point: run the synchronisation only when executed directly,
# not when the module is imported (e.g. by tests calling main() themselves).
if __name__ == "__main__":
    main()
    # Explicit success status. SystemExit is raised directly because the
    # exit() helper is injected by the site module for interactive use and is
    # not available when Python runs without it.
    raise SystemExit(0)