Skip to content

Commit

Permalink
latest version
Browse files Browse the repository at this point in the history
  • Loading branch information
vince-weka committed Sep 29, 2021
1 parent adab29d commit 59d386e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 36 deletions.
66 changes: 34 additions & 32 deletions async_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,45 +65,47 @@ def slave_thread(self):
#else:
# log.debug(f"slave thread {self} received job")

hostobj = self.cluster.get_hostobj_byname(job.hostname)
try:
job.result = hostobj.call_api(job.method, job.parms)
job.exception = False
except wekalib.exceptions.HTTPError as exc:
if exc.code == 502: # Bad Gateway - a transient error
log.error(f"slave thread received Bad Gateway on host {job.hostname}")
if job.times_in_q <= 2: # lowered from 10 retries
for retries in range(1,3):
hostobj = self.cluster.get_hostobj_byname(job.hostname)
try:
job.result = hostobj.call_api(job.method, job.parms)
job.exception = False
break
except wekalib.exceptions.HTTPError as exc:
if exc.code == 502: # Bad Gateway - a transient error
log.error(f"slave thread {self} received Bad Gateway on host {job.hostname}")
# retry a few times
log.debug(f"{self} retrying after Bad Gateway")
job.times_in_q += 1
self.submit(job)
#self.submit(job)
time.sleep(0.5) # make it yield so maybe the server will recover
#self.inputq.task_done()
job.result = wekalib.exceptions.APIError(f"{exc.host}: ({exc.code}) {exc.message}") # send as APIError
job.exception = True
continue # go back to the inputq.get()
# trex - take this out for now... extending scrape times too much
#elif job.times_in_q <= 12: # give it 2 more chances
# # then sleep to give the cluster a little time to recover
# time.sleep(0.5) # might be the only thing in the queue...
# job.times_in_q += 1
# self.submit(job)
# self.inputq.task_done()
# continue # go back to the inputq.get()

# else, give up and return the error - note: multiprocessing.queue module hates HTTPErrors - can't unpickle correctly
job.result = wekalib.exceptions.APIError(f"{exc.host}: ({exc.code}) {exc.message}") # send as APIError
job.exception = True
except wekalib.exceptions.TimeoutError as exc:
job.result = exc
job.exception = True
log.error(f"{exc}")
except Exception as exc:
job.result = exc
job.exception = True
log.info(f"Exception recieved on host {job.hostname}:{exc}")
log.info(traceback.format_exc())

except wekalib.exceptions.TimeoutError as exc:
job.result = exc
job.exception = True
log.error(f"{exc}")
break
except Exception as exc:
job.result = exc
job.exception = True
log.info(f"Exception recieved on host {job.hostname}:{exc}")
log.info(traceback.format_exc())
break

# else, give up and return the error - note: multiprocessing.queue module hates HTTPErrors - can't unpickle correctly
#job.result = wekalib.exceptions.APIError(f"{exc.host}: ({exc.code}) {exc.message}") # send as APIError
#job.exception = True
if job.exception and type(job.result) is wekalib.exceptions.APIError:
log.debug(f"{self} noting Bad Gateway exception, returning exception")


# this will send back the above exeptions as well as good results
#log.info(f"job.result={json.dumps(job.result, indent=2)}")
log.debug(f"slave thread {self} queuing output")
log.debug(f"slave thread {self} queuing output.. Is exception = {job.exception}")
self.outputq.put(job)
#self.inputq.task_done()

Expand Down Expand Up @@ -330,7 +332,7 @@ def wait(self):
#timeout_period = 5.0 # testing
while self.num_outstanding > 0:
try:
log.debug(f"outputq size is {self.outputq.qsize()}")
log.debug(f"outputq size is {self.outputq.qsize()}, num_outstanding is {self.num_outstanding}")
result = self.outputq.get(True, timeout=timeout_period) # don't block because they should be dead
except queue.Empty as exc:
# timed out - if timeout is specified, it either returns an item or queue.Empty on timeout
Expand Down
10 changes: 7 additions & 3 deletions collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,22 @@ def collect(self):
return # raise?

# yield for each metric
log.debug("Yielding metrics")
for metric in metric_objs.values():
yield metric
log.debug("Yielding complete")

# report time if we gathered, otherwise, it's meaningless
if should_gather:
self.last_elapsed = time.time() - start_time
else:
elapsed = self.last_elapsed

log.debug("Yielding process metrics")
yield GaugeMetricFamily('weka_collect_seconds', 'Total Time spent in Prometheus collect', value=self.last_elapsed)
yield GaugeMetricFamily('weka_collect_apicalls', 'Total number of api calls',
value=self.api_stats['num_calls'])
log.debug("Yielding process metrics complete")

if not second_pass:
log.info(
Expand Down Expand Up @@ -651,8 +655,8 @@ def gather(self):
#
# yes, I know it's convoluted... it was hard to write, so it *should* be hard to read. ;)
#print(json.dumps(wekadata, indent=2))
log.debug(f"io stats cluster={cluster.name}")
log.debug(json.dumps(stats_data, indent=2))
#log.debug(f"io stats cluster={cluster.name}")
#log.debug(json.dumps(stats_data, indent=2))

"""
[{
Expand All @@ -671,7 +675,7 @@ def gather(self):
}]
"""

log.debug(json.dumps(weka_stat_list, indent=2))
#log.debug(json.dumps(weka_stat_list, indent=2))

for statistic in stats_data:
node = statistic['node']
Expand Down
1 change: 1 addition & 0 deletions docker_build
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
./build.sh
VERSION=$(./tarball/export/export --version | awk '{print $3}')
docker build --tag wekasolutions/export:latest --tag wekasolutions/export:$VERSION .
#docker build --tag wekasolutions/export:$VERSION .
3 changes: 2 additions & 1 deletion export.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from wekalib.wekacluster import WekaCluster
import wekalib.exceptions

VERSION = "1.5.1"
VERSION = "1.5.2"
#VERSION = "experimental"

# set the root log
Expand Down Expand Up @@ -196,6 +196,7 @@ def configure_logging(logger, verbosity):
# local modules
logging.getLogger("collector").setLevel(loglevel)
logging.getLogger("lokilogs").setLevel(loglevel)
logging.getLogger("async_api").setLevel(loglevel)


def main():
Expand Down

0 comments on commit 59d386e

Please sign in to comment.