forked from apprell/proxmox-autosnap
-
Notifications
You must be signed in to change notification settings - Fork 0
/
proxmox-autosnap.py
executable file
·239 lines (198 loc) · 9.39 KB
/
proxmox-autosnap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
#!/usr/bin/env python3
import os
import re
import json
import socket
import argparse
import functools
import subprocess
from datetime import datetime, timedelta
MUTE = False
DRY_RUN = False
USE_SUDO = False
ONLY_ON_RUNNING = False
DATE_ISO_FORMAT = False
DATE_TRUENAS_FORMAT = False
INCLUDE_VM_STATE = False
def running(func):
@functools.wraps(func)
def create_pid(*args, **kwargs):
pid = str(os.getpid())
location = os.path.dirname(os.path.realpath(__file__))
location_pid = os.path.join(location, 'running.pid')
if os.path.isfile(location_pid):
with open(location_pid) as f:
print('Script already running under PID {0}, skipping execution.'.format(f.read()))
raise SystemExit(1)
try:
with open(location_pid, 'w') as f:
f.write(pid)
return func(*args, **kwargs)
finally:
os.unlink(location_pid)
return create_pid
def run_command(command: list, force_no_sudo: bool = False) -> dict:
if USE_SUDO and not force_no_sudo:
command.insert(0, 'sudo')
run = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = run.communicate()
if run.returncode == 0:
return {'status': True, 'message': out.decode('utf-8', 'replace').rstrip()}
else:
return {'status': False, 'message': err.decode('utf-8', 'replace').rstrip()}
def vm_is_stopped(vmid: str, virtualization: str) -> bool:
run = run_command([virtualization, 'status', vmid])
if run['status']:
if 'stopped' in run['message'].lower():
return True
return False
def get_pve_config(vmid: str, virtualization: str) -> dict:
run = run_command([virtualization, 'config', vmid])
if not run['status']:
raise SystemExit(run['message'])
cfg = {}
for line in run["message"].splitlines():
if ":" in line:
(k, v) = line.split(": ")
cfg[k.strip()] = v.strip()
return cfg
def get_zfs_volume(proxmox_fs: str) -> str:
run = run_command(['pvesm', 'path', proxmox_fs])
if not run['status']:
raise SystemExit(run['message'])
zfsvol = run['message'].strip().split(",")[0]
if zfsvol[0] == "/":
return zfsvol[1:]
else:
return zfsvol
def vmid_list(exclude: list, vmlist_path: str = '/etc/pve/.vmlist') -> dict:
vm_id = {}
node = socket.gethostname().split('.')[0]
run = run_command(['cat', vmlist_path])
if not run['status']:
raise SystemExit(run['message'])
data = json.loads(run['message'])
for key, value in data['ids'].items():
if value['type'] == 'lxc' and value['node'] == node and key not in exclude:
vm_id[key] = 'pct'
elif value['type'] == 'qemu' and value['node'] == node and key not in exclude:
vm_id[key] = 'qm'
return vm_id
def create_snapshot(vmid: str, virtualization: str, label: str = 'daily') -> None:
if ONLY_ON_RUNNING and vm_is_stopped(vmid, virtualization):
print('VM {0} - status is stopped, skipping...'.format(vmid)) if not MUTE else None
return
labels = {'minute': 'autominute', 'hourly': 'autohourly', 'daily': 'autodaily', 'weekly': 'autoweekly',
'monthly': 'automonthly'}
suffix_datetime = datetime.now() + timedelta(seconds=1)
if DATE_ISO_FORMAT:
suffix = "_" + suffix_datetime.isoformat(timespec="seconds").replace("-", "_").replace(":", "_")
elif DATE_TRUENAS_FORMAT:
suffix = suffix_datetime.strftime('%Y%m%d%H%M%S')
else:
suffix = suffix_datetime.strftime('%y%m%d%H%M%S')
snapshot_name = labels[label] + suffix
params = [virtualization, 'snapshot', vmid, snapshot_name, '--description', 'autosnap']
if virtualization == 'qm' and INCLUDE_VM_STATE:
params.append('--vmstate')
if DRY_RUN:
params.insert(0, 'sudo') if USE_SUDO else None
print(' '.join(params))
else:
run = run_command(params)
if run['status']:
print('VM {0} - Creating snapshot {1}'.format(vmid, snapshot_name)) if not MUTE else None
else:
print('VM {0} - {1}'.format(vmid, run['message']))
def remove_snapshot(vmid: str, virtualization: str, label: str = 'daily', keep: int = 30) -> None:
if ONLY_ON_RUNNING and vm_is_stopped(vmid, virtualization):
print('VM {0} - status is stopped, skipping...'.format(vmid)) if not MUTE else None
return
listsnapshot = []
snapshots = run_command([virtualization, 'listsnapshot', vmid])
for snapshot in snapshots['message'].splitlines():
snapshot = re.search(r'auto{0}([_0-9T]+$)'.format(label), snapshot.replace('`->', '').split()[0])
if snapshot is not None:
listsnapshot.append(snapshot.group(0))
if listsnapshot and len(listsnapshot) > keep:
old_snapshots = [snap for num, snap in enumerate(sorted(listsnapshot, reverse=True), 1) if num > keep]
if old_snapshots:
for old_snapshot in old_snapshots:
params = [virtualization, 'delsnapshot', vmid, old_snapshot]
if DRY_RUN:
params.insert(0, 'sudo') if USE_SUDO else None
print(' '.join(params))
else:
run = run_command(params)
if run['status']:
print('VM {0} - Removing snapshot {1}'.format(vmid, old_snapshot)) if not MUTE else None
else:
print('VM {0} - {1}'.format(vmid, run['message']))
def zfs_send(vmid: str, virtualization: list[str], zfs_send_to: str):
cfg = get_pve_config(vmid, virtualization)
localzfs = get_zfs_volume(cfg["rootfs"])
remotezfs = os.path.join(zfs_send_to, cfg["hostname"])
params = ("/usr/sbin/syncoid", localzfs, remotezfs, "--identifier=autosnap", "--no-privilege-elevation")
if DRY_RUN:
print(' '.join(params))
else:
run = run_command(params, force_no_sudo=True)
if run['status']:
print('VM {0} - syncoid to {1}'.format(vmid, zfs_send_to)) if not MUTE else None
else:
print('VM {0} - syncoid FAIL: {1}'.format(vmid, run['message']))
@running
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-a', '--autosnap', action='store_true', help='Create a snapshot and delete the old one.')
parser.add_argument('-s', '--snap', action='store_true', help='Create a snapshot but do not delete anything.')
parser.add_argument('-v', '--vmid', nargs='+', required=True,
help='Space separated list of CT/VM ID or all for all CT/VM in node.')
parser.add_argument('-c', '--clean', action='store_true', help='Delete all or selected autosnapshots.')
parser.add_argument('-k', '--keep', type=int, default=30, help='The number of snapshots which should will keep.')
parser.add_argument('-l', '--label', choices=['minute', 'hourly', 'daily', 'weekly', 'monthly'], default='daily',
help='One of minute, hourly, daily, weekly or monthly.')
parser.add_argument('--date-iso-format', action='store_true', help='Store snapshots in ISO 8601 format.')
parser.add_argument('--date-truenas-format', action='store_true', help='Store snapshots in TrueNAS format.')
parser.add_argument('-e', '--exclude', nargs='+', default=[],
help='Space separated list of CT/VM ID to exclude from processing.')
parser.add_argument('-m', '--mute', action='store_true', help='Output only errors.')
parser.add_argument('-r', '--running', action='store_true', help='Run only on running vm, skip on stopped')
parser.add_argument('-i', '--includevmstate', action='store_true', help='Include the VM state in snapshots.')
parser.add_argument('--zfs-send-to', metavar='[USER@]HOST:ZFSDIR', help='Send zfs snapshot to USER@HOST on ZFSDIR hierarchy - USER@ is optional with syncoid > 2:1')
parser.add_argument('-d', '--dryrun', action='store_true',
help='Do not create or delete snapshots, just print the commands.')
parser.add_argument('--sudo', action='store_true', help='Launch commands through sudo.')
argp = parser.parse_args()
global MUTE, DRY_RUN, USE_SUDO, ONLY_ON_RUNNING, INCLUDE_VM_STATE, DATE_ISO_FORMAT, DATE_TRUENAS_FORMAT
MUTE = argp.mute
DRY_RUN = argp.dryrun
USE_SUDO = argp.sudo
ONLY_ON_RUNNING = argp.running
DATE_ISO_FORMAT = argp.date_iso_format
DATE_TRUENAS_FORMAT = argp.date_truenas_format
INCLUDE_VM_STATE = argp.includevmstate
all_vmid = vmid_list(exclude=argp.exclude)
if 'all' in argp.vmid:
picked_vmid = dict(sorted(all_vmid.items()))
else:
picked_vmid = {}
for vm in argp.vmid:
picked_vmid[vm] = all_vmid[vm]
if argp.snap:
for k, v in picked_vmid.items():
create_snapshot(vmid=k, virtualization=v, label=argp.label)
elif argp.clean:
for k, v in picked_vmid.items():
remove_snapshot(vmid=k, virtualization=v, label=argp.label, keep=argp.keep)
elif argp.autosnap:
for k, v in picked_vmid.items():
create_snapshot(vmid=k, virtualization=v, label=argp.label)
remove_snapshot(vmid=k, virtualization=v, label=argp.label, keep=argp.keep)
elif argp.zfs_send_to:
for k, v in picked_vmid.items():
zfs_send(vmid=k, virtualization=v, zfs_send_to=argp.zfs_send_to)
else:
parser.print_help()
if __name__ == '__main__':
main()