Skip to content

Commit

Permalink
Per-service metrics http server
Browse files Browse the repository at this point in the history
* Organize metrics into their respective service
* Serve per-service metrics on a per-service http server
* Increase prometheus client usage over our custom metrics fields
  • Loading branch information
chrismeyersfsu committed Feb 5, 2024
1 parent 6dcaa09 commit 8a902de
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 71 deletions.
5 changes: 3 additions & 2 deletions awx/main/analytics/analytics_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

# AWX
from awx.main.analytics.subsystem_metrics import Metrics
from awx.main.analytics.subsystem_metrics import DispatcherMetrics, CallbackReceiverMetrics
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename

Expand All @@ -11,4 +11,5 @@

@task(queue=get_task_queuename)
def send_subsystem_metrics():
Metrics().send_metrics()
DispatcherMetrics().send_metrics()
CallbackReceiverMetrics().send_metrics()
228 changes: 168 additions & 60 deletions awx/main/analytics/subsystem_metrics.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion awx/main/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def receive_json(self, data):
if group == "metrics":
message = json.loads(message['text'])
conn = redis.Redis.from_url(settings.BROKER_URL)
conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics'])
conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "-" + message['metrics_namespace'] + "_instance_" + message['instance'], message['metrics'])
else:
await self.channel_layer.group_send(group, message)

Expand Down
2 changes: 1 addition & 1 deletion awx/main/dispatch/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def __init__(self, *args, schedule=None, **kwargs):
init_time = time.time()
self.pg_down_time = init_time - self.pg_max_wait # allow no grace period
self.last_cleanup = init_time
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False)
self.last_metrics_gather = init_time
self.listen_cumulative_time = 0.0
if schedule:
Expand Down
2 changes: 1 addition & 1 deletion awx/main/dispatch/worker/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class CallbackBrokerWorker(BaseWorker):
def __init__(self):
self.buff = {}
self.redis = redis.Redis.from_url(settings.BROKER_URL)
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
self.subsystem_metrics = s_metrics.CallbackReceiverMetrics(auto_pipe_execute=False)
self.queue_pop = 0
self.queue_name = settings.CALLBACK_QUEUE
self.prof = AWXProfiler("CallbackBrokerWorker")
Expand Down
4 changes: 4 additions & 0 deletions awx/main/management/commands/run_callback_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from django.conf import settings
from django.core.management.base import BaseCommand
from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer

from awx.main.dispatch.control import Control
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
Expand All @@ -25,6 +26,9 @@ def handle(self, *arg, **options):
print(Control('callback_receiver').status())
return
consumer = None

CallbackReceiverMetricsServer().start()

try:
consumer = AWXConsumerRedis(
'callback_receiver',
Expand Down
3 changes: 3 additions & 0 deletions awx/main/management/commands/run_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from awx.main.dispatch.control import Control
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
from awx.main.analytics.subsystem_metrics import DispatcherMetricsServer

logger = logging.getLogger('awx.main.dispatch')

Expand Down Expand Up @@ -62,6 +63,8 @@ def handle(self, *arg, **options):

consumer = None

DispatcherMetricsServer().start()

try:
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
Expand Down
3 changes: 3 additions & 0 deletions awx/main/management/commands/run_wsrelay.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
RelayWebsocketStatsManager,
safe_name,
)
from awx.main.analytics.subsystem_metrics import WebsocketsMetricsServer
from awx.main.wsrelay import WebSocketRelayManager


Expand Down Expand Up @@ -91,6 +92,8 @@ def get_connection_stats(cls, hostnames, data):
return host_stats

def handle(self, *arg, **options):
WebsocketsMetricsServer().start()

# it's necessary to delay this import in case
# database migrations are still running
from awx.main.models.ha import Instance
Expand Down
4 changes: 2 additions & 2 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __init__(self, prefix=""):
# initialize each metric to 0 and force metric_has_changed to true. This
# ensures each task manager metric will be overridden when pipe_execute
# is called later.
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False)
self.start_time = time.time()

# We want to avoid calling settings in loops, so cache these settings at init time
Expand Down Expand Up @@ -105,7 +105,7 @@ def record_aggregate_metrics(self, *args):
try:
# increment task_manager_schedule_calls regardless if the other
# metrics are recorded
s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
s_metrics.DispatcherMetrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
# Only record metrics if the last time recording was more
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
# Prevents a short-duration task manager that runs directly after a
Expand Down
4 changes: 2 additions & 2 deletions awx/main/tasks/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
from awx.main.analytics.subsystem_metrics import Metrics
from awx.main.analytics.subsystem_metrics import DispatcherMetrics

from rest_framework.exceptions import PermissionDenied

Expand Down Expand Up @@ -113,7 +113,7 @@ def dispatch_startup():
cluster_node_heartbeat()
reaper.startup_reaping()
reaper.reap_waiting(grace_period=0)
m = Metrics()
m = DispatcherMetrics()
m.reset_values()


Expand Down
2 changes: 0 additions & 2 deletions awx/main/wsrelay.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
RelayWebsocketStats,
RelayWebsocketStatsManager,
)
import awx.main.analytics.subsystem_metrics as s_metrics

logger = logging.getLogger('awx.main.wsrelay')

Expand Down Expand Up @@ -54,7 +53,6 @@ def __init__(
self.protocol = protocol
self.verify_ssl = verify_ssl
self.channel_layer = None
self.subsystem_metrics = s_metrics.Metrics(instance_name=name)
self.producers = dict()
self.connected = False

Expand Down
29 changes: 29 additions & 0 deletions awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,35 @@
HOST_METRIC_SUMMARY_TASK_INTERVAL = 7 # days


# TODO: cmeyers, replace with a register pattern
# The register pattern is particularly nice for this because we need
# to know the process to start the thread that will be the server.
# The registration location should be the same location as we would
# call MetricsServer.start()
# Note: if we don't get to this TODO, then at least create constants
# for the services strings below.
# TODO: cmeyers, break this out into a separate django app so other
# projects can take advantage.

# Identifiers for the services that expose metrics. They are used as the keys
# of METRICS_SUBSYSTEM_CONFIG['server'] below.
METRICS_SERVICE_CALLBACK_RECEIVER = 'callback_receiver'
METRICS_SERVICE_DISPATCHER = 'dispatcher'
METRICS_SERVICE_WEBSOCKETS = 'websockets'

# Per-service metrics server settings: one entry per service, each with its
# own distinct port.
# NOTE(review): presumably read by the *MetricsServer classes in
# awx.main.analytics.subsystem_metrics (not visible here) — confirm.
METRICS_SUBSYSTEM_CONFIG = {
'server': {
METRICS_SERVICE_CALLBACK_RECEIVER: {
'port': 8014,
},
METRICS_SERVICE_DISPATCHER: {
'port': 8015,
},
METRICS_SERVICE_WEBSOCKETS: {
'port': 8016,
},
}
}


# django-ansible-base
ANSIBLE_BASE_TEAM_MODEL = 'main.Team'
ANSIBLE_BASE_ORGANIZATION_MODEL = 'main.Organization'
Expand Down

0 comments on commit 8a902de

Please sign in to comment.