[feat] Add post-collect
aquamatthias committed Jul 15, 2024
1 parent aa87a70 commit a065d58
Showing 12 changed files with 345 additions and 187 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -6,7 +6,7 @@ ADD . /single_coordinator
RUN . /usr/local/fix-venv-python3/bin/activate && pip install /single_coordinator && rm -rf /single_coordinator

# Add shim and create symlink
COPY collect_single_shim /usr/local/bin/collect_single_shim
COPY dispatch_executable_shim.sh /usr/local/bin/dispatch_executable_shim
RUN chmod 755 /usr/local/bin/collect_single_shim && ln -s /usr/local/bin/collect_single_shim /usr/bin/collect_single

ENTRYPOINT ["/bin/dumb-init", "--", "/usr/local/sbin/bootstrap", "/usr/bin/collect_single"]
ENTRYPOINT ["/bin/dumb-init", "--", "/usr/local/sbin/bootstrap", "/usr/bin/dispatch_executable_shim"]
131 changes: 0 additions & 131 deletions collect_single/__main__.py

This file was deleted.

148 changes: 113 additions & 35 deletions collect_single/collect_and_sync.py → collect_single/collect_single.py
@@ -16,33 +16,37 @@

import asyncio
import logging
import os
import sys
from argparse import Namespace, ArgumentParser
from datetime import timedelta
from itertools import takewhile
from pathlib import Path
from typing import List, Optional, Any, Tuple, Dict, cast
from typing import List, Tuple, Dict
from typing import Optional, Any, cast

import yaml
from arango.cursor import Cursor
from fixcloudutils.redis.event_stream import RedisStreamPublisher, Json
from fixcloudutils.logging import setup_logger
from fixcloudutils.redis.event_stream import Json
from fixcloudutils.redis.lock import Lock
from fixcloudutils.redis.pub_sub import RedisPubSubPublisher
from fixcloudutils.service import Service
from fixcloudutils.util import utc, utc_str
from fixcloudutils.util import utc
from fixcloudutils.util import utc_str
from fixcore.core_config import parse_config
from fixcore.db.async_arangodb import AsyncArangoDB
from fixcore.db.db_access import DbAccess
from fixcore.db.timeseriesdb import TimeSeriesDB
from fixcore.system_start import parse_args as core_parse_args
from redis.asyncio import Redis
import prometheus_client

from collect_single.core_client import CoreClient
from collect_single.job import Job
from collect_single.model import MetricQuery
from collect_single.process import ProcessWrapper

log = logging.getLogger("fix.coordinator")


class CollectAndSync(Service):
class CollectSingle(Job):
def __init__(
self,
*,
@@ -55,36 +55,20 @@ def __init__(
worker_args: List[str],
logging_context: Dict[str, str],
push_gateway_url: Optional[str] = None,
core_url: str = "http://localhost:8980",
) -> None:
self.redis = redis
self.tenant_id = tenant_id
super().__init__(redis=redis, job_id=job_id, tenant_id=tenant_id, push_gateway_url=push_gateway_url)
self.cloud = cloud
self.account_id = account_id
self.job_id = job_id
self.core_args = ["--no-scheduling", "--ignore-interrupted-tasks"] + core_args
self.worker_args = worker_args
self.logging_context = logging_context
self.core_client = CoreClient(core_url)
self.task_id: Optional[str] = None
self.push_gateway_url = push_gateway_url
publisher = "collect-and-sync"
self.progress_update_publisher = RedisPubSubPublisher(redis, f"tenant-events::{tenant_id}", publisher)
self.collect_done_publisher = RedisStreamPublisher(redis, "collect-events", publisher)
self.started_at = utc()
self.worker_connected = asyncio.Event()
self.metrics: List[MetricQuery] = []

async def start(self) -> Any:
await self.progress_update_publisher.start()
await self.collect_done_publisher.start()
await super().start()
self.metrics = self.load_metrics()
# note: the client is not started (core is not running and no certificate required)

async def stop(self) -> None:
await self.core_client.stop()
await self.progress_update_publisher.stop()
await self.collect_done_publisher.stop()

@staticmethod
def load_metrics() -> List[MetricQuery]:
@@ -140,7 +128,7 @@ async def post_process(self) -> Tuple[Json, List[str]]:
# synchronize the security section
benchmarks = await self.core_client.list_benchmarks(providers=[self.cloud] if self.cloud else None)
if benchmarks:
await self.core_client.create_benchmark_reports(account_id, benchmarks, self.task_id)
await self.core_client.create_benchmark_reports([account_id], benchmarks, self.task_id)
# create metrics
for metric in self.metrics:
res = await self.core_client.timeseries_snapshot(metric, account_id)
@@ -170,14 +158,6 @@ async def send_result_events(self, read_from_process: bool, error_messages: Opti
},
)

async def push_metrics(self) -> None:
if gateway := self.push_gateway_url:
# Possible future option: retrieve metrics from core and worker and push them to prometheus
prometheus_client.push_to_gateway(
gateway=gateway, job="collect_single", registry=prometheus_client.REGISTRY
)
log.info("Metrics pushed to gateway")

async def migrate_ts_data(self) -> None:
ts_with_account = "for doc in ts filter doc.group.account!=null"
update = (
@@ -243,3 +223,101 @@ async def sync(self, send_on_failed: bool) -> bool:
await asyncio.wait_for(self.send_result_events(False, [str(ex)]), 600) # wait up to 10 minutes
result_send = True
return result_send


async def startup(
args: Namespace, core_args: List[str], worker_args: List[str], logging_context: Dict[str, str]
) -> None:
redis_args = {}
if args.redis_password:
redis_args["password"] = args.redis_password
if args.redis_url.startswith("rediss://") and args.ca_cert:
redis_args["ssl_ca_certs"] = args.ca_cert
async with Redis.from_url(args.redis_url, decode_responses=True, **redis_args) as redis:

async def collect_and_sync(send_on_failed: bool) -> bool:
async with CollectSingle(
redis=redis,
tenant_id=args.tenant_id,
cloud=args.cloud,
account_id=args.account_id,
job_id=args.job_id,
core_args=core_args,
worker_args=worker_args,
push_gateway_url=args.push_gateway_url,
logging_context=logging_context,
) as cas:
return await cas.sync(send_on_failed)

if retry := args.retry_failed_for:
log.info(f"Collect job with retry enabled for {retry}.")
has_result = False
deadline = utc() + retry
while not has_result and utc() < deadline:
# collect and do not send a message in the failing case
has_result = await collect_and_sync(False)
if not has_result:
log.info("Failed collect with retry enabled. Retrying in 30s.")
await asyncio.sleep(30)
# if we come here without a result, collect and also send a message in the failing case
if not has_result:
log.info("Last attempt to collect with retry enabled.")
await collect_and_sync(True)
else:
await collect_and_sync(True)


def kv_pairs(s: str) -> Tuple[str, str]:
return tuple(s.split("=", maxsplit=1)) # type: ignore
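# Example (hypothetical values): kv_pairs("aws/credentials=AWS_CREDENTIALS") returns
# ("aws/credentials", "AWS_CREDENTIALS"); maxsplit=1 means only the first "=" splits,
# matching the --write format "path/in/home/dir=env-var-name" defined below.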


def main() -> None:
# 3 argument sets delimited by "---": <coordinator args> --- <core args> --- <worker args>
# coordinator --main-arg1 --main-arg2 --- --core-arg1 --core-arg2 --- --worker-arg1 --worker-arg2
args = iter(sys.argv[1:])
coordinator_args = list(takewhile(lambda x: x != "---", args))
core_args = list(takewhile(lambda x: x != "---", args))
worker_args = list(args)
# handle coordinator args
parser = ArgumentParser()
parser.add_argument(
"--write",
type=kv_pairs,
help="Write config files in home dir from env vars. Format: --write path/in/home/dir=env-var-name",
default=[],
action="append",
)
parser.add_argument("--job-id", required=True, help="Job Id of the coordinator")
parser.add_argument("--tenant-id", required=True, help="Id of the tenant")
parser.add_argument("--account-id", help="Id of the account")
parser.add_argument("--cloud", help="Cloud provider.")
parser.add_argument("--redis-url", default="redis://localhost:6379/0", help="Redis host.")
parser.add_argument("--redis-password", default=os.environ.get("REDIS_PASSWORD"), help="Redis password")
parser.add_argument("--push-gateway-url", help="Prometheus push gateway url")
parser.add_argument("--ca-cert", help="Path to CA cert file")
parser.add_argument(
"--retry-failed-for", type=lambda x: timedelta(seconds=float(x)), help="Seconds to retry failed jobs."
)
parsed = parser.parse_args(coordinator_args)

# setup logging
logging_context = dict(job_id=parsed.job_id, workspace_id=parsed.tenant_id, cloud_account_id=parsed.account_id)
setup_logger("collect-single", get_logging_context=lambda: {"process": "coordinator", **logging_context})

# write config files from env vars
env_vars = {k.lower(): v for k, v in os.environ.items()}
for home_path, env_var_name in parsed.write:
path = (Path.home() / Path(home_path)).absolute()
content = env_vars.get(env_var_name.lower())
assert content is not None, f"Env var {env_var_name} not found"
log.info(f"Writing file: {path} from env var: {env_var_name}")
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w+") as f:
f.write(content)

log.info(f"Coordinator args:({coordinator_args}) Core args:({core_args}) Worker args:({worker_args})")
asyncio.run(startup(parsed, core_args, worker_args, logging_context))


if __name__ == "__main__":
main()
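
main() splits sys.argv into three argument groups on the literal "---" separators. Both takewhile calls advance the same iterator, so each group consumes its own slice, including the separator that ends it. A minimal sketch of that splitting, with made-up argument values:

from itertools import takewhile

# Hypothetical argv, mirroring the format documented in main():
# <coordinator args> --- <core args> --- <worker args>
argv = ["--job-id", "42", "--tenant-id", "t1", "---", "--core-arg1", "---", "--worker-arg1", "--worker-arg2"]
args = iter(argv)
coordinator_args = list(takewhile(lambda x: x != "---", args))  # ['--job-id', '42', '--tenant-id', 't1']
core_args = list(takewhile(lambda x: x != "---", args))         # ['--core-arg1']
worker_args = list(args)                                        # ['--worker-arg1', '--worker-arg2']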
18 changes: 14 additions & 4 deletions collect_single/core_client.py
@@ -71,11 +71,14 @@ async def list_benchmarks(self, *, providers: Optional[List[str]] = None) -> Lis
else:
raise AttributeError(await response.text())

async def create_benchmark_reports(self, account_id: str, benchmarks: List[str], task_id: Optional[str]) -> None:
async def create_benchmark_reports(
self, account_ids: List[str], benchmarks: List[str], run_id: Optional[str]
) -> None:
bn = " ".join(benchmarks)
run_id = task_id or str(uuid.uuid4())
command = f"report benchmark run {bn} --accounts {account_id} --sync-security-section --run-id {run_id} | count"
log.info(f"Create reports for following benchmarks: {bn} for accounts: {account_id}. Command: {command}")
an = " ".join(account_ids)
rid = run_id or str(uuid.uuid4())
command = f"report benchmark run {bn} --accounts {an} --sync-security-section --run-id {rid} | count"
log.info(f"Create reports for following benchmarks: {bn} for accounts: {an}. Command: {command}")
async for _ in self.client.cli_execute(command, headers={"Accept": "application/json"}):
pass # ignore the result

@@ -138,3 +141,10 @@ async def account_id_by_name(self) -> Dict[Optional[str], Optional[str]]:
value_in_path(r, "reported.name"): value_in_path(r, "reported.id")
async for r in self.client.cli_execute("search is(account) | dump")
}

async def merge_deferred_edges(self, task_ids: List[str], *, graph: str = "fix") -> None:
response = await self.client._post(f"/graph/{graph}/merge/deferred_edges", json=task_ids)
if response.status_code == 200:
return await response.json() # type: ignore
else:
raise AttributeError(await response.text())
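
For orientation, a hypothetical call site for the touched client methods. This is a sketch only: it assumes a reachable fixcore at the default URL, the start/stop lifecycle used elsewhere in the coordinator, and made-up account and task ids.

from collect_single.core_client import CoreClient

async def example() -> None:
    # Hypothetical values throughout; none of these ids appear in this diff.
    client = CoreClient("http://localhost:8980")
    await client.start()
    try:
        benchmarks = await client.list_benchmarks(providers=["aws"])
        if benchmarks:
            # accounts are now passed as a list; a random uuid is used when run_id is None
            await client.create_benchmark_reports(["123456789012"], benchmarks, None)
        # merge edges that were deferred during collection for the given task ids
        await client.merge_deferred_edges(["task-id-1"], graph="fix")
    finally:
        await client.stop()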