diff --git a/async_api.py b/async_api.py index 1a1b739..4473d58 100644 --- a/async_api.py +++ b/async_api.py @@ -65,45 +65,47 @@ def slave_thread(self): #else: # log.debug(f"slave thread {self} received job") - hostobj = self.cluster.get_hostobj_byname(job.hostname) - try: - job.result = hostobj.call_api(job.method, job.parms) - job.exception = False - except wekalib.exceptions.HTTPError as exc: - if exc.code == 502: # Bad Gateway - a transient error - log.error(f"slave thread received Bad Gateway on host {job.hostname}") - if job.times_in_q <= 2: # lowered from 10 retries + for retries in range(1,3): + hostobj = self.cluster.get_hostobj_byname(job.hostname) + try: + job.result = hostobj.call_api(job.method, job.parms) + job.exception = False + break + except wekalib.exceptions.HTTPError as exc: + if exc.code == 502: # Bad Gateway - a transient error + log.error(f"slave thread {self} received Bad Gateway on host {job.hostname}") # retry a few times + log.debug(f"{self} retrying after Bad Gateway") job.times_in_q += 1 - self.submit(job) + #self.submit(job) + time.sleep(0.5) # make it yield so maybe the server will recover #self.inputq.task_done() + job.result = wekalib.exceptions.APIError(f"{exc.host}: ({exc.code}) {exc.message}") # send as APIError + job.exception = True continue # go back to the inputq.get() - # trex - take this out for now... extending scrape times too much - #elif job.times_in_q <= 12: # give it 2 more chances - # # then sleep to give the cluster a little time to recover - # time.sleep(0.5) # might be the only thing in the queue... - # job.times_in_q += 1 - # self.submit(job) - # self.inputq.task_done() - # continue # go back to the inputq.get() - - # else, give up and return the error - note: multiprocessing.queue module hates HTTPErrors - can't unpickle correctly - job.result = wekalib.exceptions.APIError(f"{exc.host}: ({exc.code}) {exc.message}") # send as APIError - job.exception = True - except wekalib.exceptions.TimeoutError as exc: - job.result = exc - job.exception = True - log.error(f"{exc}") - except Exception as exc: - job.result = exc - job.exception = True - log.info(f"Exception recieved on host {job.hostname}:{exc}") - log.info(traceback.format_exc()) + + except wekalib.exceptions.TimeoutError as exc: + job.result = exc + job.exception = True + log.error(f"{exc}") + break + except Exception as exc: + job.result = exc + job.exception = True + log.info(f"Exception recieved on host {job.hostname}:{exc}") + log.info(traceback.format_exc()) + break + + # else, give up and return the error - note: multiprocessing.queue module hates HTTPErrors - can't unpickle correctly + #job.result = wekalib.exceptions.APIError(f"{exc.host}: ({exc.code}) {exc.message}") # send as APIError + #job.exception = True + if job.exception and type(job.result) is wekalib.exceptions.APIError: + log.debug(f"{self} noting Bad Gateway exception, returning exception") # this will send back the above exeptions as well as good results #log.info(f"job.result={json.dumps(job.result, indent=2)}") - log.debug(f"slave thread {self} queuing output") + log.debug(f"slave thread {self} queuing output.. Is exception = {job.exception}") self.outputq.put(job) #self.inputq.task_done() @@ -330,7 +332,7 @@ def wait(self): #timeout_period = 5.0 # testing while self.num_outstanding > 0: try: - log.debug(f"outputq size is {self.outputq.qsize()}") + log.debug(f"outputq size is {self.outputq.qsize()}, num_outstanding is {self.num_outstanding}") result = self.outputq.get(True, timeout=timeout_period) # don't block because they should be dead except queue.Empty as exc: # timed out - if timeout is specified, it either returns an item or queue.Empty on timeout diff --git a/collector.py b/collector.py index 3e0ca26..b6e5fbf 100644 --- a/collector.py +++ b/collector.py @@ -249,8 +249,10 @@ def collect(self): return # raise? # yield for each metric + log.debug("Yielding metrics") for metric in metric_objs.values(): yield metric + log.debug("Yielding complete") # report time if we gathered, otherwise, it's meaningless if should_gather: @@ -258,9 +260,11 @@ def collect(self): else: elapsed = self.last_elapsed + log.debug("Yielding process metrics") yield GaugeMetricFamily('weka_collect_seconds', 'Total Time spent in Prometheus collect', value=self.last_elapsed) yield GaugeMetricFamily('weka_collect_apicalls', 'Total number of api calls', value=self.api_stats['num_calls']) + log.debug("Yielding process metrics complete") if not second_pass: log.info( @@ -651,8 +655,8 @@ def gather(self): # # yes, I know it's convoluted... it was hard to write, so it *should* be hard to read. ;) #print(json.dumps(wekadata, indent=2)) - log.debug(f"io stats cluster={cluster.name}") - log.debug(json.dumps(stats_data, indent=2)) + #log.debug(f"io stats cluster={cluster.name}") + #log.debug(json.dumps(stats_data, indent=2)) """ [{ @@ -671,7 +675,7 @@ def gather(self): }] """ - log.debug(json.dumps(weka_stat_list, indent=2)) + #log.debug(json.dumps(weka_stat_list, indent=2)) for statistic in stats_data: node = statistic['node'] diff --git a/docker_build b/docker_build index 3a6c9b0..d7efa2d 100755 --- a/docker_build +++ b/docker_build @@ -2,3 +2,4 @@ ./build.sh VERSION=$(./tarball/export/export --version | awk '{print $3}') docker build --tag wekasolutions/export:latest --tag wekasolutions/export:$VERSION . +#docker build --tag wekasolutions/export:$VERSION . diff --git a/export.py b/export.py index be52406..9fc3b4a 100644 --- a/export.py +++ b/export.py @@ -28,7 +28,7 @@ from wekalib.wekacluster import WekaCluster import wekalib.exceptions -VERSION = "1.5.1" +VERSION = "1.5.2" #VERSION = "experimental" # set the root log @@ -196,6 +196,7 @@ def configure_logging(logger, verbosity): # local modules logging.getLogger("collector").setLevel(loglevel) logging.getLogger("lokilogs").setLevel(loglevel) + logging.getLogger("async_api").setLevel(loglevel) def main():