diff --git a/src/backend/api/bk_base/default.py b/src/backend/api/bk_base/default.py
index 4764140e..21d012ea 100644
--- a/src/backend/api/bk_base/default.py
+++ b/src/backend/api/bk_base/default.py
@@ -305,6 +305,13 @@ class GetResultTable(BkBaseResource):
     url_keys = ["result_table_id"]
 
 
+class GetRtStorages(BkBaseResource):
+    name = gettext_lazy("获取结果表的存储信息")
+    action = "/v3/meta/result_tables/{result_table_id}/storages/"
+    url_keys = ["result_table_id"]
+    method = "GET"
+
+
 class GetProjectData(BkBaseResource):
     name = gettext_lazy("列举项目相关数据")
     action = "/v3/auth/projects/{project_id}/data/"
diff --git a/src/backend/services/web/analyze/controls/rule_audit.py b/src/backend/services/web/analyze/controls/rule_audit.py
index 4065b29c..723e869f 100644
--- a/src/backend/services/web/analyze/controls/rule_audit.py
+++ b/src/backend/services/web/analyze/controls/rule_audit.py
@@ -53,6 +53,7 @@
 from services.web.analyze.controls.base import BaseControl
 from services.web.analyze.exceptions import NotSupportDataSource
 from services.web.analyze.storage_node import (
+    BaseStorageNode,
     ESStorageNode,
     HDFSStorageNode,
     QueueStorageNode,
@@ -160,9 +161,8 @@ def check_source_type(self, result_table_id: str) -> str:
             result_table["processing_type"] == ResultTableType.CDC
             or result_table["result_table_type"] == ResultTableType.STATIC
         ):
-            if (
-                source_type == FlowDataSourceNodeType.REALTIME
-                and BkBaseStorageType.REDIS in result_table["storage_types"]
+            if source_type == FlowDataSourceNodeType.REALTIME and BkBaseStorageType.REDIS in result_table.get(
+                "storages", {}
             ):
                 return FlowDataSourceNodeType.REDIS_KV_SOURCE
             return FlowDataSourceNodeType.BATCH
@@ -371,11 +371,12 @@ def create_or_update_storage_nodes(self, need_create: bool, flow_id: int, sql_no
         self.x += self.x_interval
         self.y = self.y_interval
         bk_biz_id = int(self.rt_ids[0].split("_", 1)[0])
+        from_result_table_ids = [BaseStorageNode.build_rt_id(bk_biz_id, self.raw_table_name)]
         storage_node_ids = [] if need_create else self.strategy.backend_data.get("storage_node_ids", [])
         storage_nodes = [ESStorageNode, QueueStorageNode, HDFSStorageNode]
         for idx, storage_node in enumerate(storage_nodes):
             node_config = storage_node(namespace=self.strategy.namespace).build_node_config(
-                bk_biz_id=bk_biz_id, raw_table_name=self.raw_table_name
+                bk_biz_id=bk_biz_id, raw_table_name=self.raw_table_name, from_result_table_ids=from_result_table_ids
             )
             if not node_config:
                 continue
diff --git a/src/backend/services/web/analyze/storage_node.py b/src/backend/services/web/analyze/storage_node.py
index 2966752f..1a39f1ae 100644
--- a/src/backend/services/web/analyze/storage_node.py
+++ b/src/backend/services/web/analyze/storage_node.py
@@ -16,7 +16,7 @@ to the current version of the project delivered to anyone in the future.
 """
 
 from abc import ABC, abstractmethod
-from typing import Union
+from typing import List, Union
 
 from bk_resource import resource
 from django.conf import settings
@@ -51,10 +51,11 @@ def expires(self) -> int:
     def cluster(self) -> Union[str, int]:
         raise NotImplementedError()
 
-    def build_rt_id(self, bk_biz_id: int, table_name: str) -> str:
+    @classmethod
+    def build_rt_id(cls, bk_biz_id: int, table_name: str) -> str:
         return f"{bk_biz_id}_{table_name}"
 
-    def build_node_config(self, bk_biz_id: int, raw_table_name: str) -> dict:
+    def build_node_config(self, bk_biz_id: int, raw_table_name: str, from_result_table_ids: List[str]) -> dict:
         if not self.cluster:
             return {}
         result_table_id = self.build_rt_id(bk_biz_id, raw_table_name)
@@ -93,10 +94,10 @@ def cluster(self) -> Union[str, int]:
         bkbase_cluster_id = cluster_info["cluster_config"].get("custom_option", {}).get("bkbase_cluster_id")
         return bkbase_cluster_id
 
-    def build_node_config(self, bk_biz_id: int, raw_table_name: str) -> dict:
+    def build_node_config(self, bk_biz_id: int, raw_table_name: str, from_result_table_ids: List[str]) -> dict:
         table_id = EventHandler.get_table_id().replace(".", "_")
         return {
-            **super().build_node_config(bk_biz_id, raw_table_name),
+            **super().build_node_config(bk_biz_id, raw_table_name, from_result_table_ids),
             "indexed_fields": [],
             "has_replica": False,
             "has_unique_key": False,