Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[fix] Use account id instead of name #27

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 39 additions & 30 deletions collect_single/collect_and_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@
import logging
from datetime import timedelta
from pathlib import Path
from typing import List, Optional, Any, Tuple, Dict
from typing import List, Optional, Any, Tuple, Dict, cast

import yaml
from arango.cursor import Cursor
from fixcloudutils.redis.event_stream import RedisStreamPublisher, Json
from fixcloudutils.redis.lock import Lock
from fixcloudutils.redis.pub_sub import RedisPubSubPublisher
from fixcloudutils.service import Service
from fixcloudutils.util import utc, utc_str
from fixcore.core_config import parse_config
from fixcore.db.async_arangodb import AsyncArangoDB
from fixcore.db.db_access import DbAccess
from fixcore.db.timeseriesdb import TimeSeriesDB
from fixcore.system_start import parse_args as core_parse_args
from redis.asyncio import Redis
import prometheus_client

Expand Down Expand Up @@ -55,8 +61,8 @@ def __init__(
self.cloud = cloud
self.account_id = account_id
self.job_id = job_id
self.core_args = ["fixcore", "--no-scheduling", "--ignore-interrupted-tasks"] + core_args
self.worker_args = ["fixworker"] + worker_args
self.core_args = ["--no-scheduling", "--ignore-interrupted-tasks"] + core_args
self.worker_args = worker_args
self.logging_context = logging_context
self.core_client = CoreClient(core_url)
self.task_id: Optional[str] = None
Expand All @@ -71,9 +77,11 @@ def __init__(
async def start(self) -> Any:
await self.progress_update_publisher.start()
await self.collect_done_publisher.start()
await self.core_client.start()
self.metrics = self.load_metrics()

async def stop(self) -> None:
await self.core_client.stop()
await self.progress_update_publisher.stop()
await self.collect_done_publisher.stop()

Expand Down Expand Up @@ -169,45 +177,46 @@ async def push_metrics(self) -> None:
)
log.info("Metrics pushed to gateway")

async def migrate_resoto_graph(self) -> None:
async def copy_graph() -> None:
# double check with lock
graphs = await self.core_client.graphs()
if "resoto" not in graphs:
return
log.info("Found resoto graph. Copy to fix graph.")
await self.core_client.copy_graph("resoto", "fix", force=True)
log.info("Resoto graph copied to fix graph. Delete resoto graph.")
await self.core_client.delete_graph("resoto")
log.info("Delete old resoto configs.")
try:
async for cfg in self.core_client.client.configs():
if cfg.startswith("resoto"):
log.info(f"Delete config: {cfg}")
await self.core_client.client.delete_config(cfg)
except Exception as ex:
log.info(f"Failed to delete resoto configs: {ex}")

# Migrate the resoto graph
graphs = await self.core_client.graphs()
if "resoto" in graphs:
async def migrate_ts_data(self) -> None:
ts_with_account = "for doc in ts filter doc.group.account!=null"
update = (
ts_with_account + " let account_id = NOT_NULL(@accounts[doc.group.account], doc.group.account) "
'UPDATE doc WITH { group: MERGE(UNSET(doc.group, "account"), { account_id: account_id }) } '
"IN ts OPTIONS { mergeObjects: false }"
)

args = core_parse_args(self.core_args)
_, _, sdb = DbAccess.connect(args, timedelta(seconds=120), verify=False)

async def migrate_ts() -> None:
log.info("Redis lock taken. Migrate data.")
config = parse_config(args, {}, lambda: None)
tsdb = TimeSeriesDB(AsyncArangoDB(sdb), "ts", config)
name_by_id = await self.core_client.account_id_by_name()
async with tsdb._lock(): # also take the tsdb lock, so no one else is allowed to change ts data
log.info("TS Lock taken. Migrate TS data to account id")
sdb.aql.execute(update, bind_vars={"accounts": name_by_id})
log.info("TS data migrated")

result = cast(Cursor, sdb.aql.execute(ts_with_account + " LIMIT 1 RETURN doc", count=True))
if result.count():
# pick a global lock
lock = Lock(self.redis, "collect_single_" + self.tenant_id, timedelta(minutes=15).total_seconds())
await lock.with_lock(copy_graph())
await lock.with_lock(migrate_ts())

async def sync(self, send_on_failed: bool) -> bool:
result_send = False
try:
async with ProcessWrapper(self.core_args, self.logging_context):
async with ProcessWrapper(["fixcore", *self.core_args], self.logging_context):
log.info("Core started.")
async with await asyncio.wait_for(self.core_client.wait_connected(), timeout=60):
log.info("Core client connected")
# migrate resoto graph
await self.migrate_resoto_graph()
# migrate timeseries data to account id
await self.migrate_ts_data()
# wait up to 5 minutes for all running workflows to finish
await asyncio.wait_for(self.core_client.wait_for_collect_tasks_to_finish(), timeout=300)
log.info("All collect workflows finished")
async with ProcessWrapper(self.worker_args, self.logging_context):
async with ProcessWrapper(["fixworker", *self.worker_args], self.logging_context):
log.info("Worker started")
try:
# wait for worker to be connected
Expand Down
18 changes: 13 additions & 5 deletions collect_single/core_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,27 @@
import asyncio
import logging
import uuid
from typing import Any, Optional, Dict, List, Set, cast
from typing import Optional, Dict, List, Set, cast

from fixcloudutils.types import Json, JsonElement
from fixclient import Subscriber
from fixclient.async_client import FixInventoryClient
from fixcloudutils.service import Service
from fixcloudutils.types import Json, JsonElement
from fixcore.query import query_parser, Query
from fixcore.query.model import P
from fixlib.json import value_in_path

log = logging.getLogger("fix.coordinator")


class CoreClient:
class CoreClient(Service):
def __init__(self, url: str) -> None:
    # Thin wrapper around the inventory client pointed at the core API url.
    self.client = FixInventoryClient(url)

async def __aenter__(self) -> None:
async def start(self) -> None:
    # Service lifecycle hook: start the underlying inventory client session.
    await self.client.start()

async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
async def stop(self) -> None:
    # Service lifecycle hook: shut down the underlying inventory client.
    await self.client.shutdown()

async def wait_connected(self) -> CoreClient:
Expand Down Expand Up @@ -125,3 +127,9 @@ async def copy_graph(self, from_graph: str, to_graph: str, *, force: bool = Fals
async def delete_graph(self, graph: str) -> None:
    """Remove the given graph from core via the CLI, draining the command output."""
    responses = self.client.cli_execute(f"graph delete {graph}")
    async for _ in responses:
        pass

async def account_id_by_name(self) -> Dict[Optional[str], Optional[str]]:
    """Return a mapping of account name -> account id for all accounts in the graph."""
    mapping: Dict[Optional[str], Optional[str]] = {}
    async for node in self.client.cli_execute("search is(account) | dump"):
        mapping[value_in_path(node, "reported.name")] = value_in_path(node, "reported.id")
    return mapping
28 changes: 15 additions & 13 deletions collect_single/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ cores_total:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
/ancestors.region.reported.name as region, instance_type as type: sum(instance_cores) as cores_total
): is(instance) and instance_status == running

Expand All @@ -30,7 +30,7 @@ memory_bytes:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
/ancestors.region.reported.name as region,
instance_type as type: sum(instance_memory * 1024 * 1024 * 1024) as memory_bytes
): is(instance) and instance_status == running
Expand All @@ -40,7 +40,7 @@ volume_bytes:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
/ancestors.region.reported.name as region,
volume_type as type,
volume_status as status: sum(volume_size * 1024 * 1024 * 1024) as volume_bytes
Expand All @@ -51,7 +51,7 @@ databases_bytes:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
/ancestors.region.reported.name as region,
db_type as type,
instance_type as instance_type: sum(volume_size * 1024 * 1024 * 1024) as databases_bytes
Expand All @@ -62,7 +62,7 @@ instances_hourly_cost_estimate:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
/ancestors.region.reported.name as region,
instance_type as type: sum(/ancestors.instance_type.reported.ondemand_cost) as instances_hourly_cost_estimate
): is(instance) and instance_status == running
Expand All @@ -72,7 +72,7 @@ volumes_monthly_cost_estimate:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
/ancestors.region.reported.name as region,
volume_type as type,
volume_status as status: sum(/ancestors.volume_type.reported.ondemand_cost) as volumes_monthly_cost_estimate
Expand All @@ -83,7 +83,7 @@ databases_total:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
/ancestors.region.reported.name as region,
db_type as type,
instance_type as instance_type: sum(1) as databases_total
Expand All @@ -94,7 +94,7 @@ volumes_total:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
/ancestors.region.reported.name as region,
volume_type as type,
volume_status as status: sum(1) as volumes_total
Expand All @@ -105,7 +105,7 @@ instances_total:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
/ancestors.region.reported.name as region,
instance_type as type,
instance_status as status: sum(1) as instances_total
Expand All @@ -114,15 +114,17 @@ instances_total:
account_score:
description: Account score by account and cloud
search: |
aggregate( id as account_id, /ancestors.cloud.reported.id as cloud_id: avg(/metadata.score) as score ):
is(account) and /metadata.score!=null
aggregate(
id as account_id,
/ancestors.cloud.reported.id as cloud_id: avg(/metadata.score) as score
): is(account) and /metadata.score!=null

buckets_size_bytes:
description: Size of Buckets in bytes by cloud, account, region and type
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
bucket_location as region,
name as name: sum(/usage.bucket_size_bytes.avg) as bucket_size_bytes
): is(bucket)
Expand All @@ -132,7 +134,7 @@ buckets_objects_total:
search: |
aggregate(
/ancestors.cloud.reported.name as cloud,
/ancestors.account.reported.name as account,
/ancestors.account.reported.id as account_id,
bucket_location as region,
name as name: sum(/usage.number_of_objects_count.avg) as buckets_objects_total
): is(bucket)
6 changes: 6 additions & 0 deletions tests/collect_and_sync_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ async def test_collect_and_sync(collect_and_sync: CollectAndSync) -> None:
await collect_and_sync.send_result_events(True)


@pytest.mark.asyncio
@pytest.mark.skip(reason="Only for manual testing")
async def test_migrate_ts(collect_and_sync: CollectAndSync) -> None:
    # Manual smoke test: runs the timeseries account-name -> account-id migration
    # end to end; requires a running core/arangodb, hence skipped by default.
    await collect_and_sync.migrate_ts_data()


def test_load_metrics() -> None:
    """The bundled metrics.yaml should define at least 14 metric queries."""
    loaded = CollectAndSync.load_metrics()
    assert len(loaded) >= 14
Expand Down
7 changes: 4 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ async def core_client() -> AsyncIterator[FixInventoryClient]:


@fixture
async def collect_and_sync(redis: Redis) -> CollectAndSync: # type: ignore
return CollectAndSync(
async def collect_and_sync(redis: Redis) -> AsyncIterator[CollectAndSync]: # type: ignore
async with CollectAndSync(
redis=redis,
tenant_id="tenant_id",
cloud="aws",
Expand All @@ -56,4 +56,5 @@ async def collect_and_sync(redis: Redis) -> CollectAndSync: # type: ignore
core_args=[],
worker_args=[],
logging_context={},
)
) as collect_and_sync:
yield collect_and_sync
21 changes: 14 additions & 7 deletions tests/core_client_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import os
import uuid
from typing import AsyncIterator

import pytest

Expand All @@ -9,8 +10,9 @@


@pytest.fixture
async def core_client() -> AsyncIterator[CoreClient]:
    """Yield a CoreClient whose underlying http session is started and torn down.

    The diff view left both the old (plain return) and new (async context
    manager) definitions in this span; only the merged version is kept so the
    client's start/stop lifecycle runs around each test.
    """
    async with CoreClient("http://localhost:8980") as client:
        yield client


@pytest.mark.skipif(os.environ.get("CORE_RUNNING") is None, reason="No core running")
Expand Down Expand Up @@ -62,11 +64,6 @@ async def test_create_benchmark_report(core_client: CoreClient) -> None:
assert res[0]["count"] > 10


@pytest.mark.skipif(os.environ.get("CORE_RUNNING") is None, reason="No core running")
async def test_wait_for_worker_subscribed(core_client: CoreClient) -> None:
await asyncio.wait_for(core_client.wait_for_worker_subscribed(), timeout=1)


@pytest.mark.skipif(os.environ.get("CORE_RUNNING") is None, reason="No core running")
async def test_wait_for_collect_task_to_finish(core_client: CoreClient) -> None:
await core_client.start_workflow("collect")
Expand Down Expand Up @@ -94,3 +91,13 @@ async def test_list_graphs(core_client: CoreClient) -> None:
assert isinstance(result, set)
for i in result:
assert isinstance(i, str)


@pytest.mark.skipif(os.environ.get("CORE_RUNNING") is None, reason="No core running")
async def test_account_by_id(core_client: CoreClient) -> None:
    """account_id_by_name yields a non-empty mapping of str names to str ids."""
    mapping = await core_client.account_id_by_name()
    assert isinstance(mapping, dict)
    assert len(mapping) > 0
    for name, account_id in mapping.items():
        assert isinstance(name, str)
        assert isinstance(account_id, str)
Loading