Skip to content

Commit

Permalink
Merge pull request #4 from joknarf/interrupt
Browse files Browse the repository at this point in the history
log/print summary on interrupt + log hosts/result/command + -l to list ssh-para results
  • Loading branch information
joknarf authored Jul 6, 2024
2 parents 2e3144b + dd57445 commit 2b11f75
Showing 1 changed file with 125 additions and 29 deletions.
154 changes: 125 additions & 29 deletions ssh_para/ssh_para.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
SYMBOL_RES = os.environ.get("SSHP_SYM_RES") or "\u25ba" # b6 ▶
DNS_DOMAINS = os.environ.get("SSHP_DOMAINS") or ""
SSH_OPTS = os.environ.get("SSHP_OPTS") or ""
INTERRUPT = False

jobq = queue.Queue()
printq = queue.Queue()
Expand Down Expand Up @@ -74,17 +75,21 @@ def parse_args():
action="store_true",
help="verbose display (fqdn + line for last output)",
)
parser.add_argument("-l", "--ls", action="store_true", help="list ssh-para logs")
parser.add_argument("ssh_args", nargs="*")
return parser.parse_args()


def sigint_handler(*args):
"""exit all threads if signal"""
try:
curses.endwin()
except curses.error:
pass
os._exit(1)
global INTERRUPT
INTERRUPT = True


def hometilde(dir):
"""substitute home to tilde in dir"""
home = os.path.expanduser("~/")
return sub(rf"^{escape(home)}", "~/", dir)


def resolve_hostname(host):
Expand Down Expand Up @@ -303,12 +308,14 @@ def __init__(
self.verbose = verbose
self.maxhostlen = maxhostlen
self.killedpid = {}
home = os.path.expanduser("~/")
self.pdirlog = sub(rf"^{escape(home)}", "~/", self.dirlog)
self.pdirlog = hometilde(dirlog)
if sys.stdout.isatty():
self.init_curses()
super().__init__()

def __del__(self):
self.print_summary()

def init_curses(self):
"""curses window init"""
self.stdscr = curses.initscr()
Expand Down Expand Up @@ -342,11 +349,36 @@ def init_curses(self):

def join(self, *args):
"""returns nb failed"""
global INTERRUPT
super().join(*args)
if INTERRUPT:
return 130
return self.nbfailed > 0

def interrupt(self, jstatus):
"""sigint handler to log/print summary"""
if jstatus and jstatus.status == "FAILED":
self.job_status[-1].status = "KILLED"
self.job_status[-1].exit = 256
for jstatus in self.th_status:
if jstatus.status != "IDLE" and jstatus.exit == None:
if jstatus.fdlog:
jstatus.fdlog.close()
jstatus.status="KILLED"
jstatus.exit = 256
self.nbfailed += 1
self.job_status.append(jstatus)
try:
curses.endwin()
except curses.error:
pass
self.print_summary()
os._exit(1)


def run(self):
"""get threads status change"""
global INTERRUPT
jobsdur = 0
nbsshjobs = 0
while True:
Expand Down Expand Up @@ -384,16 +416,18 @@ def run(self):
self.check_timeouts()
if len(self.job_status) == self.nbjobs:
break
if INTERRUPT:
self.interrupt(jstatus)
return
self.resume()
end = strftime("%X")
if self.stdscr:
addstrc(self.stdscr, curses.LINES - 1, 0, "All jobs finished")
self.stdscr.refresh()
self.stdscr.getch()
curses.endwin()
curses.echo()
curses.curs_set(1)
self.print_summary(end, total_dur)
self.print_summary()

def check_timeout(self, th_id, duration):
"""kill ssh if duration exceeds timeout"""
Expand Down Expand Up @@ -528,6 +562,7 @@ def kill(self, status="KILLED", th_kill=None):
try:
os.kill(self.th_status[th_kill].pid, 15)
self.killedpid[self.th_status[th_kill].pid] = status
self.nbfailed += 1
except ProcessLookupError:
pass

Expand Down Expand Up @@ -583,18 +618,23 @@ def abort_jobs(self):
self.aborted.append(job.host)
self.resume()

def print_summary(self, end, total_dur):
def print_summary(self):
"""print/log summary of jobs"""
end = strftime("%X")
total_dur = tdelta(seconds=round(time() - self.startsec))
global_log = open(f"{self.dirlog}/ssh-para.log", "w", encoding="UTF-8")
if self.aborted:
print_tee(
"Cancelled hosts:", file=global_log, color=Style.BRIGHT + Fore.RED
"Cancelled hosts:", str(self.nbjobs), file=global_log, color=Style.BRIGHT + Fore.RED
)
for host in self.aborted:
print_tee(host, file=global_log)
self.nbjobs -= 1
#self.nbjobs -= 1
print_tee("", file=global_log)
nbrun = 0
for jstatus in self.job_status:
if jstatus.status != "ABORTED":
nbrun += 1
if jstatus.exit != 0:
color = Style.BRIGHT + Fore.RED
else:
Expand All @@ -610,11 +650,13 @@ def print_summary(self, end, total_dur):
print_tee(" ", jstatus.log, file=global_log)
print_tee("command:", self.command, file=global_log)
print_tee("log directory:", self.pdirlog, file=global_log)
start = strftime('%X', datetime.fromtimestamp(self.startsec).timetuple())
print_tee(
f"{self.nbjobs} jobs run : Start: {strftime('%X', datetime.fromtimestamp(self.startsec).timetuple())}",
f"{nbrun}/{self.nbjobs} jobs run : Begin: {start}",
f"End: {end} Duration: {total_dur}",
file=global_log,
)
printfile(f"{self.dirlog}/ssh-para.result", f"Begin: {start} End: {end} Dur: {total_dur} Runs: {nbrun}/{self.nbjobs} Failed: {self.nbfailed}")
if self.nbfailed == 0:
print_tee("All Jobs with exit code 0", file=global_log)
else:
Expand Down Expand Up @@ -745,42 +787,96 @@ def get_hosts(hostsfile, hosts):
sys.exit(1)
return hosts

def tstodatetime(ts):
try:
tsi = int(ts)
except ValueError:
return None
return datetime.fromtimestamp(tsi).strftime("%Y-%m-%d %H:%M:%S")

def main():
"""argument read / read hosts file / prepare commands / launch jobs"""
init(autoreset=True)
args = parse_args()
dirlog = args.dirlog
if args.job:
dirlog += f"/{args.job}"
def printfile(file, text):
try:
with open(file, 'w', encoding="UTF-8") as fd:
print(text, file=fd)
except OSError:
return False
return True

def readfile(file):
try:
with open(file, 'r', encoding="UTF-8") as fd:
text = fd.read()
except OSError:
return None
return text.strip()

def log_ls(dirlog, job):
if job:
dirlog = f"{dirlog}/{job}"
try:
logdirs = os.listdir(dirlog)
except OSError:
print(f"no logs found in {dirlog}", file=sys.stderr)
sys.exit(1)
logdirs.sort()
for dir in logdirs:
result = readfile(f"{dirlog}/{dir}/ssh-para.result")
command = readfile(f"{dirlog}/{dir}/ssh-para.command")
if command:
print(f"{hometilde(dirlog)}/{dir:10}:", result, "Command:", command)
sys.exit(0)


def make_logdir(dirlog, job):
"""create log directory"""
latest = f"{dirlog}/latest"
if job:
dirlog += f"/{job}"
dirlog += "/" + str(int(time()))
if not os.path.isdir(dirlog):
os.makedirs(dirlog)
latest = f"{args.dirlog}/latest"
if os.path.exists(latest):
os.unlink(latest)
try:
if not os.path.isdir(dirlog):
os.makedirs(dirlog)
except OSError:
print(f"Error: ssh-para: cannot create log directory: {dirlog}")
sys.exit(1)
try:
if os.path.exists(latest):
os.unlink(latest)
os.symlink(dirlog, latest)
except OSError:
pass
return dirlog


def main():
"""argument read / read hosts file / prepare commands / launch jobs"""
init(autoreset=True)
args = parse_args()
if args.ls:
log_ls(args.dirlog, args.job)
if args.script:
args.ssh_args.append(script_command(args.script, args.args))
command = [args.script]
if args.args:
command += args.args
else:
command = args.ssh_args
if not args.ssh_args:
print("ERROR: ssh-para: No ssh command supplied", file=sys.stderr)
sys.exit(1)
hosts = get_hosts(args.hostsfile, args.hosts)
max_len = 0
dirlog = make_logdir(args.dirlog, args.job)
printfile(f"{dirlog}/ssh-para.command", " ".join(command))
printfile(f"{dirlog}/ssh-para.hosts", "\n".join(hosts))
if args.resolve:
print("Notice: ssh-para: Resolving hosts...", file=sys.stderr)
hosts = resolve_hosts(hosts, DNS_DOMAINS.split())
print("Notice: ssh-para: Resolve done", file=sys.stderr)
printfile(f"{dirlog}/ssh-para.hosts_resolved", "\n".join(hosts))
max_len = 0
for host in hosts:
max_len = max(max_len, len(short_host(host)))
if not args.ssh_args:
print("ERROR: ssh-para: No ssh command supplied", file=sys.stderr)
sys.exit(1)

for host in hosts:
jobq.put(Job(host=host, command=args.ssh_args))
parallel = min(len(hosts), args.parallel)
Expand Down

0 comments on commit 2b11f75

Please sign in to comment.