Skip to content

Commit

Permalink
支持按网关拉取请求量数据 (#372)
Browse files Browse the repository at this point in the history
* 支持按网关拉取请求量数据
  • Loading branch information
alex-smile authored Nov 27, 2023
1 parent e7bcd6e commit 406e8a2
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,45 @@
# We undertake not to change the open source license (MIT license) applicable
# to the current version of the project delivered to anyone in the future.
#
import logging
import random
from abc import abstractmethod
from time import sleep
from typing import Optional

from django.conf import settings

from apigateway.components.exceptions import RemoteAPIResultError, RemoteRequestError
from apigateway.components.prometheus import prometheus_component

from .base import BasePrometheusMetrics

logger = logging.getLogger(__name__)


class BaseStatisticsMetrics(BasePrometheusMetrics):
@abstractmethod
def _get_query_promql(self, step: str):
def _get_query_promql(self, step: str, gateway_name: Optional[str] = None):
pass

def query(self, time_: int, step: str):
return prometheus_component.query(
bk_biz_id=getattr(settings, "BCS_CLUSTER_BK_BIZ_ID", ""),
promql=self._get_query_promql(step),
time_=time_,
)
def query(self, time_: int, step: str, gateway_name: Optional[str] = None):
    """Run this metric's PromQL query through the Prometheus component.

    The PromQL comes from ``_get_query_promql(step, gateway_name)`` and the
    business ID from the ``BCS_CLUSTER_BK_BIZ_ID`` setting (empty string
    when the setting is absent).

    If the first request raises ``RemoteRequestError`` or
    ``RemoteAPIResultError``, a warning is logged and the request is sent
    exactly once more after a short random pause. An error raised by the
    second attempt propagates to the caller.

    Returns whatever ``prometheus_component.query`` returns.
    """
    request_kwargs = {
        "bk_biz_id": getattr(settings, "BCS_CLUSTER_BK_BIZ_ID", ""),
        "promql": self._get_query_promql(step, gateway_name),
        "time_": time_,
    }

    try:
        return prometheus_component.query(**request_kwargs)
    except (RemoteRequestError, RemoteAPIResultError) as err:
        logger.warning("fetch statistics metrics data error: %s, will retry.", err)
        # This call backs the scheduled pull of gateway request-volume data,
        # so retry once to improve the success rate.
        pause_seconds = random.uniform(0.2, 1)
        sleep(pause_seconds)
        return prometheus_component.query(**request_kwargs)


class StatisticsAPIRequestMetrics(BaseStatisticsMetrics):
def _get_query_promql(self, step):
def _get_query_promql(self, step: str, gateway_name: Optional[str] = None):
labels = self._get_labels_expression(
[
("api_name", "=", gateway_name),
*self.default_labels,
]
)
Expand All @@ -53,9 +66,10 @@ def _get_query_promql(self, step):


class StatisticsAPIRequestDurationMetrics(BaseStatisticsMetrics):
def _get_query_promql(self, step):
def _get_query_promql(self, step: str, gateway_name: Optional[str] = None):
labels = self._get_labels_expression(
[
("api_name", "=", gateway_name),
*self.default_labels,
]
)
Expand All @@ -71,9 +85,10 @@ class StatisticsAppRequestMetrics(BaseStatisticsMetrics):
根据网关、环境、资源,统计应用请求量
"""

def _get_query_promql(self, step):
def _get_query_promql(self, step: str, gateway_name: Optional[str] = None):
labels = self._get_labels_expression(
[
("api_name", "=", gateway_name),
*self.default_labels,
]
)
Expand All @@ -89,9 +104,10 @@ class StatisticsAppRequestByResourceMetrics(BaseStatisticsMetrics):
根据网关、资源,统计应用请求量
"""

def _get_query_promql(self, step):
def _get_query_promql(self, step: str, gateway_name: Optional[str] = None):
labels = self._get_labels_expression(
[
("api_name", "=", gateway_name),
*self.default_labels,
]
)
Expand Down
104 changes: 52 additions & 52 deletions src/dashboard/apigateway/apigateway/apps/metrics/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
#
import logging
from collections import defaultdict
from typing import Optional
from typing import Dict, List, Optional

from apigateway.core.constants import APIStatusEnum
from apigateway.core.models import Gateway, Resource
from apigateway.utils.time import utctime

from .models import StatisticsAPIRequestByDay, StatisticsAppRequestByDay
from .prometheus.statistics import (
StatisticsAPIRequestDurationMetrics,
StatisticsAPIRequestMetrics,
StatisticsAppRequestMetrics,
)
Expand All @@ -41,60 +41,54 @@ def __init__(self):
self._gateway_id_to_resources = {}

def stats(self, start: int, end: int, step: str):
# 1. 清理统计时间重复的数据
self._clear_data_by_stats_time(start)

# 2. 写入统计数据
self._save_api_request_data(start, end, step)
self._save_app_request_data(start, end, step)

def _clear_data_by_stats_time(self, start):
start_time = utctime(start).datetime

StatisticsAPIRequestByDay.objects.filter(start_time=start_time).delete()
StatisticsAppRequestByDay.objects.filter(start_time=start_time).delete()

def _save_api_request_data(self, start, end, step):
api_request_count = StatisticsAPIRequestMetrics().query(end, step)
if not api_request_count.get("series"):
logger.error("The resource request data obtained from Prometheus is empty, skip statistics.")
self._stats_gateway_request_data(start, end, step)
self._stats_app_request_data(start, end, step)

def _stats_gateway_request_data(self, start: int, end: int, step: str):
    """Rebuild the gateway request statistics whose start time is ``start``.

    Existing ``StatisticsAPIRequestByDay`` rows with the same start time
    are deleted first, then fresh data is saved gateway by gateway.
    """
    # Remove previously written rows for this start time.
    window_start = utctime(start).datetime
    StatisticsAPIRequestByDay.objects.filter(start_time=window_start).delete()

    # Pull per gateway and write the new rows; a single full pull may
    # fail to return data when the data volume is too large.
    active_names = self._get_active_gateway_names()
    for name in active_names:
        self._save_gateway_request_data(start, end, step, name)

def _stats_app_request_data(self, start: int, end: int, step: str):
    """Rebuild the app request statistics whose start time is ``start``.

    Existing ``StatisticsAppRequestByDay`` rows with the same start time
    are deleted first, then fresh data is saved gateway by gateway.
    """
    # Remove previously written rows for this start time.
    window_start = utctime(start).datetime
    StatisticsAppRequestByDay.objects.filter(start_time=window_start).delete()

    # Pull per gateway and write the new rows; a single full pull may
    # fail to return data when the data volume is too large.
    active_names = self._get_active_gateway_names()
    for name in active_names:
        self._save_app_request_data(start, end, step, name)

def _save_gateway_request_data(self, start: int, end: int, step: str, gateway_name: str):
gateway_request_count = StatisticsAPIRequestMetrics().query(end, step, gateway_name)
if not gateway_request_count.get("series"):
logger.info(
"gateway: %s, the resource request data obtained from Prometheus is empty, skip statistics.",
gateway_name,
)
return

api_request_duration = StatisticsAPIRequestDurationMetrics().query(end, step)
if not api_request_duration.get("series"):
logger.warning("The resource request duration data obtained from Prometheus is empty.")
# 获取失败,则数据中不记录耗时,但不影响核心服务

# 统计请求数/失败请求数
api_request_data = defaultdict(dict)
for item in api_request_count["series"]:
gateway_name_to_request_data: Dict = defaultdict(dict)
for item in gateway_request_count["series"]:
dimensions = item["dimensions"]

gateway_name = dimensions["api_name"]
_gateway_name = dimensions["api_name"]
key = f'{dimensions["stage_name"]}:{dimensions["resource_name"]}'
api_request_data[gateway_name].setdefault(key, defaultdict(float))
gateway_name_to_request_data[_gateway_name].setdefault(key, defaultdict(float))

count = item["datapoints"][0][0]
api_request_data[gateway_name][key]["total_count"] += count
gateway_name_to_request_data[_gateway_name][key]["total_count"] += count
if dimensions["proxy_error"] != "0":
api_request_data[gateway_name][key]["failed_count"] += count

# 统计请求总耗时
for item in api_request_duration.get("series", []):
dimensions = item["dimensions"]

gateway_name = dimensions["api_name"]
key = f'{dimensions["stage_name"]}:{dimensions["resource_name"]}'

if gateway_name in api_request_data and key in api_request_data[gateway_name]:
api_request_data[gateway_name][key]["total_msecs"] = item["datapoints"][0][0]
gateway_name_to_request_data[_gateway_name][key]["failed_count"] += count

# 保存数据
statistics_record = []
for gateway_name, gateway_request_data in api_request_data.items():
gateway_id = self._get_gateway_id(gateway_name)
for _gateway_name, gateway_request_data in gateway_name_to_request_data.items():
gateway_id = self._get_gateway_id(_gateway_name)
if not gateway_id:
logger.warning("gateway (name=%s) does not exist, skip save api statistics.", gateway_name)
logger.warning("gateway (name=%s) does not exist, skip save api statistics.", _gateway_name)
continue

for key, request_data in gateway_request_data.items():
Expand All @@ -107,15 +101,14 @@ def _save_api_request_data(self, start, end, step):
logger.warning(
"resource (name=%s) of gateway (name=%s) does not exist, skip save api statistics.",
resource_name,
gateway_name,
_gateway_name,
)
continue

statistics_record.append(
StatisticsAPIRequestByDay(
total_count=int(request_data["total_count"]),
failed_count=int(request_data["failed_count"]),
total_msecs=int(request_data["total_msecs"]),
start_time=utctime(start).datetime,
end_time=utctime(end).datetime,
api_id=gateway_id,
Expand All @@ -126,10 +119,13 @@ def _save_api_request_data(self, start, end, step):

StatisticsAPIRequestByDay.objects.bulk_create(statistics_record, batch_size=100)

def _save_app_request_data(self, start, end, step):
app_request_count = StatisticsAppRequestMetrics().query(end, step)
def _save_app_request_data(self, start: int, end: int, step: str, gateway_name: str):
app_request_count = StatisticsAppRequestMetrics().query(end, step, gateway_name)
if not app_request_count.get("series"):
logger.error("The app request data obtained from Prometheus is empty, skip statistics.")
logger.info(
"gateway: %s, the app request data obtained from Prometheus is empty, skip statistics.",
gateway_name,
)
return

# 保存数据
Expand All @@ -141,21 +137,21 @@ def _save_app_request_data(self, start, end, step):
continue

dimensions = item["dimensions"]
gateway_name = dimensions["api_name"]
_gateway_name = dimensions["api_name"]
resource_name = dimensions["resource_name"]
bk_app_code = dimensions.get("bk_app_code") or dimensions.get("app_code", "")

gateway_id = self._get_gateway_id(gateway_name)
gateway_id = self._get_gateway_id(_gateway_name)
if not gateway_id:
logger.warning("gateway (name=%s) does not exist, skip save app statistics.", gateway_name)
logger.warning("gateway (name=%s) does not exist, skip save app statistics.", _gateway_name)
continue

resource_id = self._get_resource_id(gateway_id, resource_name)
if not resource_id:
logger.warning(
"resource (name=%s) of gateway (name=%s) does not exist, skip save app statistics.",
resource_name,
gateway_name,
_gateway_name,
)
continue

Expand All @@ -173,6 +169,10 @@ def _save_app_request_data(self, start, end, step):

StatisticsAppRequestByDay.objects.bulk_create(statistics_record, batch_size=100)

def _get_active_gateway_names(self) -> List[str]:
    """Return the names of all gateways whose status is active.

    Gateways that have been taken offline are excluded, so they are no
    longer included in the statistics.
    """
    active_gateways = Gateway.objects.filter(status=APIStatusEnum.ACTIVE.value)
    names = active_gateways.values_list("name", flat=True)
    return list(names)

def _get_gateway_id(self, gateway_name: str) -> Optional[int]:
return self._gateway_name_to_id.get(gateway_name)

Expand Down
3 changes: 3 additions & 0 deletions src/dashboard/apigateway/apigateway/components/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@
# We undertake not to change the open source license (MIT license) applicable
# to the current version of the project delivered to anyone in the future.
#
import urllib3

# Importing this package silences urllib3's InsecureRequestWarning.
# NOTE(review): presumably emitted for HTTPS requests made without certificate
# verification — confirm that hiding it for every caller is intended.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
4 changes: 2 additions & 2 deletions src/dashboard/apigateway/apigateway/conf/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,11 @@
{
"apigateway.apps.metrics.tasks.statistics_request_by_day": {
"task": "apigateway.apps.metrics.tasks.statistics_request_by_day",
"schedule": crontab(minute=30, hour=0),
"schedule": crontab(minute=30, hour=8),
},
"apigateway.apps.permission.tasks.renew_app_resource_permission": {
"task": "apigateway.apps.permission.tasks.renew_app_resource_permission",
"schedule": crontab(minute=50, hour=0),
"schedule": crontab(minute=50, hour=10),
},
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,33 +111,27 @@ def fake_statistics_app_request_metrics(fake_resource):


class TestStatisticsHandler:
def test_save_api_request_data(
def test_save_gateway_request_data(
self,
mocker,
fake_resource,
fake_statistics_api_request_metrics,
fake_statistics_api_request_duration_metrics,
):
mocker.patch(
"apigateway.apps.metrics.statistics.StatisticsAPIRequestMetrics.query",
return_value=fake_statistics_api_request_metrics,
)
mocker.patch(
"apigateway.apps.metrics.statistics.StatisticsAPIRequestDurationMetrics.query",
return_value=fake_statistics_api_request_duration_metrics,
)
fake_gateway = fake_resource.api

now = now_datetime()

handler = StatisticsHandler()
handler._save_api_request_data(now, now, "1m")
handler._save_gateway_request_data(now, now, "1m", "my-gateway")

assert StatisticsAPIRequestByDay.objects.filter(api_id=fake_gateway.id).count() == 1
record = StatisticsAPIRequestByDay.objects.get(api_id=fake_gateway.id, resource_id=fake_resource.id)
assert record.total_count == 7
assert record.failed_count == 2
assert record.total_msecs == 154

def test_save_app_request_data(self, mocker, fake_resource, fake_statistics_app_request_metrics):
fake_gateway = fake_resource.api
Expand All @@ -149,7 +143,7 @@ def test_save_app_request_data(self, mocker, fake_resource, fake_statistics_app_
now = now_datetime()

handler = StatisticsHandler()
handler._save_app_request_data(now, now, "1m")
handler._save_app_request_data(now, now, "1m", "my-gateway")

assert StatisticsAppRequestByDay.objects.filter(api_id=fake_gateway.id).count() == 3
assert StatisticsAppRequestByDay.objects.filter(api_id=fake_gateway.id, bk_app_code="app2").count() == 2
Expand Down

0 comments on commit 406e8a2

Please sign in to comment.