From dd5744554d30a827f7530d9663407962db4973ed Mon Sep 17 00:00:00 2001 From: joknarf Date: Sat, 6 Jul 2024 15:49:14 +0200 Subject: [PATCH] interrupt result/hosts/command log file/-l --- ssh_para/ssh_para.py | 154 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 125 insertions(+), 29 deletions(-) diff --git a/ssh_para/ssh_para.py b/ssh_para/ssh_para.py index 97a8e73..9e92073 100644 --- a/ssh_para/ssh_para.py +++ b/ssh_para/ssh_para.py @@ -29,6 +29,7 @@ SYMBOL_RES = os.environ.get("SSHP_SYM_RES") or "\u25ba" # b6 ▶ DNS_DOMAINS = os.environ.get("SSHP_DOMAINS") or "" SSH_OPTS = os.environ.get("SSHP_OPTS") or "" +INTERRUPT = False jobq = queue.Queue() printq = queue.Queue() @@ -74,17 +75,21 @@ def parse_args(): action="store_true", help="verbose display (fqdn + line for last output)", ) + parser.add_argument("-l", "--ls", action="store_true", help="list ssh-para logs") parser.add_argument("ssh_args", nargs="*") return parser.parse_args() def sigint_handler(*args): """exit all threads if signal""" - try: - curses.endwin() - except curses.error: - pass - os._exit(1) + global INTERRUPT + INTERRUPT = True + + +def hometilde(dir): + """substitute home to tilde in dir""" + home = os.path.expanduser("~/") + return sub(rf"^{escape(home)}", "~/", dir) def resolve_hostname(host): @@ -301,12 +306,14 @@ def __init__( self.verbose = verbose self.maxhostlen = maxhostlen self.killedpid = {} - home = os.path.expanduser("~/") - self.pdirlog = sub(rf"^{escape(home)}", "~/", self.dirlog) + self.pdirlog = hometilde(dirlog) if sys.stdout.isatty(): self.init_curses() super().__init__() + def __del__(self): + self.print_summary() + def init_curses(self): """curses window init""" self.stdscr = curses.initscr() @@ -340,11 +347,36 @@ def init_curses(self): def join(self, *args): """returns nb failed""" + global INTERRUPT super().join(*args) + if INTERRUPT: + return 130 return self.nbfailed > 0 + def interrupt(self, jstatus): + """sigint handler to log/print summary""" + if jstatus and jstatus.status == "FAILED": + self.job_status[-1].status = "KILLED" + self.job_status[-1].exit = 256 + for jstatus in self.th_status: + if jstatus.status != "IDLE" and jstatus.exit == None: + if jstatus.fdlog: + jstatus.fdlog.close() + jstatus.status="KILLED" + jstatus.exit = 256 + self.nbfailed += 1 + self.job_status.append(jstatus) + try: + curses.endwin() + except curses.error: + pass + self.print_summary() + os._exit(1) + + def run(self): """get threads status change""" + global INTERRUPT jobsdur = 0 nbsshjobs = 0 while True: @@ -382,8 +414,10 @@ def run(self): self.check_timeouts() if len(self.job_status) == self.nbjobs: break + if INTERRUPT: + self.interrupt(jstatus) + return self.resume() - end = strftime("%X") if self.stdscr: addstrc(self.stdscr, curses.LINES - 1, 0, "All jobs finished") self.stdscr.refresh() @@ -391,7 +425,7 @@ def run(self): curses.endwin() curses.echo() curses.curs_set(1) - self.print_summary(end, total_dur) + self.print_summary() def check_timeout(self, th_id, duration): """kill ssh if duration exceeds timeout""" @@ -526,6 +560,7 @@ def kill(self, status="KILLED", th_kill=None): try: os.kill(self.th_status[th_kill].pid, 15) self.killedpid[self.th_status[th_kill].pid] = status + self.nbfailed += 1 except ProcessLookupError: pass @@ -581,18 +616,23 @@ def abort_jobs(self): self.aborted.append(job.host) self.resume() - def print_summary(self, end, total_dur): + def print_summary(self): """print/log summary of jobs""" + end = strftime("%X") + total_dur = tdelta(seconds=round(time() - self.startsec)) global_log = open(f"{self.dirlog}/ssh-para.log", "w", encoding="UTF-8") if self.aborted: print_tee( - "Cancelled hosts:", file=global_log, color=Style.BRIGHT + Fore.RED + "Cancelled hosts:", str(self.nbjobs), file=global_log, color=Style.BRIGHT + Fore.RED ) for host in self.aborted: print_tee(host, file=global_log) - self.nbjobs -= 1 + #self.nbjobs -= 1 print_tee("", file=global_log) + nbrun = 0 for jstatus in self.job_status: + if jstatus.status != "ABORTED": + nbrun += 1 if jstatus.exit != 0: color = Style.BRIGHT + Fore.RED else: @@ -608,11 +648,13 @@ def print_summary(self, end, total_dur): print_tee(" ", jstatus.log, file=global_log) print_tee("command:", self.command, file=global_log) print_tee("log directory:", self.pdirlog, file=global_log) + start = strftime('%X', datetime.fromtimestamp(self.startsec).timetuple()) print_tee( - f"{self.nbjobs} jobs run : Start: {strftime('%X', datetime.fromtimestamp(self.startsec).timetuple())}", + f"{nbrun}/{self.nbjobs} jobs run : Begin: {start}", f"End: {end} Duration: {total_dur}", file=global_log, ) + printfile(f"{self.dirlog}/ssh-para.result", f"Begin: {start} End: {end} Dur: {total_dur} Runs: {nbrun}/{self.nbjobs} Failed: {self.nbfailed}") if self.nbfailed == 0: print_tee("All Jobs with exit code 0", file=global_log) else: @@ -743,24 +785,73 @@ def get_hosts(hostsfile, hosts): sys.exit(1) return hosts +def tstodatetime(ts): + try: + tsi = int(ts) + except ValueError: + return None + return datetime.fromtimestamp(tsi).strftime("%Y-%m-%d %H:%M:%S") -def main(): - """argument read / read hosts file / prepare commands / launch jobs""" - init(autoreset=True) - args = parse_args() - dirlog = args.dirlog - if args.job: - dirlog += f"/{args.job}" +def printfile(file, text): + try: + with open(file, 'w', encoding="UTF-8") as fd: + print(text, file=fd) + except OSError: + return False + return True + +def readfile(file): + try: + with open(file, 'r', encoding="UTF-8") as fd: + text = fd.read() + except OSError: + return None + return text.strip() + +def log_ls(dirlog, job): + if job: + dirlog = f"{dirlog}/{job}" + try: + logdirs = os.listdir(dirlog) + except OSError: + print(f"no logs found in {dirlog}", file=sys.stderr) + sys.exit(1) + logdirs.sort() + for dir in logdirs: + result = readfile(f"{dirlog}/{dir}/ssh-para.result") + command = readfile(f"{dirlog}/{dir}/ssh-para.command") + if command: + print(f"{hometilde(dirlog)}/{dir:10}:", result, "Command:", command) + sys.exit(0) + + +def make_logdir(dirlog, job): + """create log directory""" + latest = f"{dirlog}/latest" + if job: + dirlog += f"/{job}" dirlog += "/" + str(int(time())) - if not os.path.isdir(dirlog): - os.makedirs(dirlog) - latest = f"{args.dirlog}/latest" - if os.path.exists(latest): - os.unlink(latest) try: + if not os.path.isdir(dirlog): + os.makedirs(dirlog) + except OSError: + print(f"Error: ssh-para: cannot create log directory: {dirlog}") + sys.exit(1) + try: + if os.path.exists(latest): + os.unlink(latest) os.symlink(dirlog, latest) except OSError: pass + return dirlog + + +def main(): + """argument read / read hosts file / prepare commands / launch jobs""" + init(autoreset=True) + args = parse_args() + if args.ls: + log_ls(args.dirlog, args.job) if args.script: args.ssh_args.append(script_command(args.script, args.args)) command = [args.script] @@ -768,17 +859,22 @@ def main(): command += args.args else: command = args.ssh_args + if not args.ssh_args: + print("ERROR: ssh-para: No ssh command supplied", file=sys.stderr) + sys.exit(1) hosts = get_hosts(args.hostsfile, args.hosts) - max_len = 0 + dirlog = make_logdir(args.dirlog, args.job) + printfile(f"{dirlog}/ssh-para.command", " ".join(command)) + printfile(f"{dirlog}/ssh-para.hosts", "\n".join(hosts)) if args.resolve: print("Notice: ssh-para: Resolving hosts...", file=sys.stderr) hosts = resolve_hosts(hosts, DNS_DOMAINS.split()) print("Notice: ssh-para: Resolve done", file=sys.stderr) + printfile(f"{dirlog}/ssh-para.hosts_resolved", "\n".join(hosts)) + max_len = 0 for host in hosts: max_len = max(max_len, len(short_host(host))) - if not args.ssh_args: - print("ERROR: ssh-para: No ssh command supplied", file=sys.stderr) - sys.exit(1) + for host in hosts: jobq.put(Job(host=host, command=args.ssh_args)) parallel = min(len(hosts), args.parallel)