[feat] Add post-collect #31

Merged · 3 commits · Jul 19, 2024
6 changes: 3 additions & 3 deletions Dockerfile
@@ -6,7 +6,7 @@ ADD . /single_coordinator
RUN . /usr/local/fix-venv-python3/bin/activate && pip install /single_coordinator && rm -rf /single_coordinator

# Add shim and create symlink
-COPY collect_single_shim /usr/local/bin/collect_single_shim
-RUN chmod 755 /usr/local/bin/collect_single_shim && ln -s /usr/local/bin/collect_single_shim /usr/bin/collect_single
+COPY dispatch_executable_shim.sh /usr/local/bin/dispatch_executable_shim
+RUN chmod 755 /usr/local/bin/dispatch_executable_shim && ln -s /usr/local/bin/dispatch_executable_shim /usr/bin/dispatch_executable

-ENTRYPOINT ["/bin/dumb-init", "--", "/usr/local/sbin/bootstrap", "/usr/bin/collect_single"]
+ENTRYPOINT ["/bin/dumb-init", "--", "/usr/local/sbin/bootstrap", "/usr/bin/dispatch_executable"]
131 changes: 0 additions & 131 deletions collect_single/__main__.py

This file was deleted.

@@ -16,33 +16,37 @@

import asyncio
import logging
import os
import sys
from argparse import Namespace, ArgumentParser
from datetime import timedelta
from itertools import takewhile
from pathlib import Path
from typing import List, Optional, Any, Tuple, Dict, cast
from typing import List, Tuple, Dict
from typing import Optional, Any, cast

import yaml
from arango.cursor import Cursor
from fixcloudutils.redis.event_stream import RedisStreamPublisher, Json
from fixcloudutils.logging import setup_logger
from fixcloudutils.redis.event_stream import Json
from fixcloudutils.redis.lock import Lock
from fixcloudutils.redis.pub_sub import RedisPubSubPublisher
from fixcloudutils.service import Service
from fixcloudutils.util import utc, utc_str
from fixcloudutils.util import utc
from fixcloudutils.util import utc_str
from fixcore.core_config import parse_config
from fixcore.db.async_arangodb import AsyncArangoDB
from fixcore.db.db_access import DbAccess
from fixcore.db.timeseriesdb import TimeSeriesDB
from fixcore.system_start import parse_args as core_parse_args
from redis.asyncio import Redis
import prometheus_client

from collect_single.core_client import CoreClient
from collect_single.job import Job
from collect_single.model import MetricQuery
from collect_single.process import ProcessWrapper

log = logging.getLogger("fix.coordinator")


class CollectAndSync(Service):
class CollectSingle(Job):
def __init__(
self,
*,
@@ -55,36 +59,20 @@ def __init__(
worker_args: List[str],
logging_context: Dict[str, str],
push_gateway_url: Optional[str] = None,
core_url: str = "http://localhost:8980",
) -> None:
self.redis = redis
self.tenant_id = tenant_id
super().__init__(redis=redis, job_id=job_id, tenant_id=tenant_id, push_gateway_url=push_gateway_url)
self.cloud = cloud
self.account_id = account_id
self.job_id = job_id
self.core_args = ["--no-scheduling", "--ignore-interrupted-tasks"] + core_args
self.worker_args = worker_args
self.logging_context = logging_context
self.core_client = CoreClient(core_url)
self.task_id: Optional[str] = None
self.push_gateway_url = push_gateway_url
publisher = "collect-and-sync"
self.progress_update_publisher = RedisPubSubPublisher(redis, f"tenant-events::{tenant_id}", publisher)
self.collect_done_publisher = RedisStreamPublisher(redis, "collect-events", publisher)
self.started_at = utc()
self.worker_connected = asyncio.Event()
self.metrics: List[MetricQuery] = []

async def start(self) -> Any:
await self.progress_update_publisher.start()
await self.collect_done_publisher.start()
await super().start()
self.metrics = self.load_metrics()
# note: the client is not started (core is not running and no certificate required)

async def stop(self) -> None:
await self.core_client.stop()
await self.progress_update_publisher.stop()
await self.collect_done_publisher.stop()

@staticmethod
def load_metrics() -> List[MetricQuery]:
@@ -140,7 +128,7 @@ async def post_process(self) -> Tuple[Json, List[str]]:
# synchronize the security section
benchmarks = await self.core_client.list_benchmarks(providers=[self.cloud] if self.cloud else None)
if benchmarks:
-await self.core_client.create_benchmark_reports(account_id, benchmarks, self.task_id)
+await self.core_client.create_benchmark_reports([account_id], benchmarks, self.task_id)
# create metrics
for metric in self.metrics:
res = await self.core_client.timeseries_snapshot(metric, account_id)
@@ -170,14 +158,6 @@ async def send_result_events(self, read_from_process: bool, error_messages: Opti
},
)

async def push_metrics(self) -> None:
if gateway := self.push_gateway_url:
# Possible future option: retrieve metrics from core and worker and push them to prometheus
prometheus_client.push_to_gateway(
gateway=gateway, job="collect_single", registry=prometheus_client.REGISTRY
)
log.info("Metrics pushed to gateway")

async def migrate_ts_data(self) -> None:
ts_with_account = "for doc in ts filter doc.group.account!=null"
update = (
@@ -243,3 +223,101 @@ async def sync(self, send_on_failed: bool) -> bool:
await asyncio.wait_for(self.send_result_events(False, [str(ex)]), 600) # wait up to 10 minutes
result_send = True
return result_send


async def startup(
args: Namespace, core_args: List[str], worker_args: List[str], logging_context: Dict[str, str]
) -> None:
redis_args = {}
if args.redis_password:
redis_args["password"] = args.redis_password
if args.redis_url.startswith("rediss://") and args.ca_cert:
redis_args["ssl_ca_certs"] = args.ca_cert
async with Redis.from_url(args.redis_url, decode_responses=True, **redis_args) as redis:

async def collect_and_sync(send_on_failed: bool) -> bool:
async with CollectSingle(
redis=redis,
tenant_id=args.tenant_id,
cloud=args.cloud,
account_id=args.account_id,
job_id=args.job_id,
core_args=core_args,
worker_args=worker_args,
push_gateway_url=args.push_gateway_url,
logging_context=logging_context,
) as cas:
return await cas.sync(send_on_failed)

if retry := args.retry_failed_for:
log.info(f"Collect job with retry enabled for {retry}.")
has_result = False
deadline = utc() + retry
while not has_result and utc() < deadline:
# collect and do not send a message in the failing case
has_result = await collect_and_sync(False)
if not has_result:
log.info("Failed collect with retry enabled. Retrying in 30s.")
await asyncio.sleep(30)
# if we come here without a result, collect and also send a message in the failing case
if not has_result:
log.info("Last attempt to collect with retry enabled.")
await collect_and_sync(True)
else:
await collect_and_sync(True)


def kv_pairs(s: str) -> Tuple[str, str]:
return tuple(s.split("=", maxsplit=1)) # type: ignore


def main() -> None:
# 3 argument sets delimited by "---": <coordinator args> --- <core args> --- <worker args>
# coordinator --main-arg1 --main-arg2 --- --core-arg1 --core-arg2 --- --worker-arg1 --worker-arg2
args = iter(sys.argv[1:])
coordinator_args = list(takewhile(lambda x: x != "---", args))
core_args = list(takewhile(lambda x: x != "---", args))
worker_args = list(args)
# handle coordinator args
parser = ArgumentParser()
parser.add_argument(
"--write",
type=kv_pairs,
help="Write config files in home dir from env vars. Format: --write path/in/home/dir=env-var-name",
default=[],
action="append",
)
parser.add_argument("--job-id", required=True, help="Job Id of the coordinator")
parser.add_argument("--tenant-id", required=True, help="Id of the tenant")
parser.add_argument("--account-id", help="Id of the account")
parser.add_argument("--cloud", help="Cloud provider.")
parser.add_argument("--redis-url", default="redis://localhost:6379/0", help="Redis host.")
parser.add_argument("--redis-password", default=os.environ.get("REDIS_PASSWORD"), help="Redis password")
parser.add_argument("--push-gateway-url", help="Prometheus push gateway url")
parser.add_argument("--ca-cert", help="Path to CA cert file")
parser.add_argument(
"--retry-failed-for", type=lambda x: timedelta(seconds=float(x)), help="Seconds to retry failed jobs."
)
parsed = parser.parse_args(coordinator_args)

# setup logging
logging_context = dict(job_id=parsed.job_id, workspace_id=parsed.tenant_id, cloud_account_id=parsed.account_id)
setup_logger("collect-single", get_logging_context=lambda: {"process": "coordinator", **logging_context})

# write config files from env vars
env_vars = {k.lower(): v for k, v in os.environ.items()}
for home_path, env_var_name in parsed.write:
path = (Path.home() / Path(home_path)).absolute()
content = env_vars.get(env_var_name.lower())
assert content is not None, f"Env var {env_var_name} not found"
log.info(f"Writing file: {path} from env var: {env_var_name}")
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w+") as f:
f.write(content)

log.info(f"Coordinator args:({coordinator_args}) Core args:({core_args}) Worker args:({worker_args})")
asyncio.run(startup(parsed, core_args, worker_args, logging_context))


if __name__ == "__main__":
main()
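
main() above splits sys.argv into three argument groups separated by "---" by consuming one shared iterator with takewhile: the first pass stops at (and swallows) the first delimiter, the second pass resumes right after it, and whatever remains belongs to the worker. A minimal standalone sketch of that splitting step (the argument values are placeholders, not real flags):

from itertools import takewhile

# One shared iterator, consumed in three passes: the same splitting logic as in main().
argv = ["--job-id", "j1", "--tenant-id", "t1", "---", "--some-core-arg", "---", "--some-worker-arg", "w1"]
args = iter(argv)
coordinator_args = list(takewhile(lambda x: x != "---", args))  # ['--job-id', 'j1', '--tenant-id', 't1']
core_args = list(takewhile(lambda x: x != "---", args))         # ['--some-core-arg']
worker_args = list(args)                                        # ['--some-worker-arg', 'w1']
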
18 changes: 14 additions & 4 deletions collect_single/core_client.py
@@ -71,11 +71,14 @@ async def list_benchmarks(self, *, providers: Optional[List[str]] = None) -> Lis
else:
raise AttributeError(await response.text())

-async def create_benchmark_reports(self, account_id: str, benchmarks: List[str], task_id: Optional[str]) -> None:
+async def create_benchmark_reports(
+self, account_ids: List[str], benchmarks: List[str], run_id: Optional[str]
+) -> None:
bn = " ".join(benchmarks)
-run_id = task_id or str(uuid.uuid4())
-command = f"report benchmark run {bn} --accounts {account_id} --sync-security-section --run-id {run_id} | count"
-log.info(f"Create reports for following benchmarks: {bn} for accounts: {account_id}. Command: {command}")
+an = " ".join(account_ids)
+rid = run_id or str(uuid.uuid4())
+command = f"report benchmark run {bn} --accounts {an} --sync-security-section --run-id {rid} | count"
+log.info(f"Create reports for following benchmarks: {bn} for accounts: {an}. Command: {command}")
async for _ in self.client.cli_execute(command, headers={"Accept": "application/json"}):
pass # ignore the result

@@ -138,3 +141,10 @@ async def account_id_by_name(self) -> Dict[Optional[str], Optional[str]]:
value_in_path(r, "reported.name"): value_in_path(r, "reported.id")
async for r in self.client.cli_execute("search is(account) | dump")
}

+async def merge_deferred_edges(self, task_ids: List[str], *, graph: str = "fix") -> None:
+response = await self.client._post(f"/graph/{graph}/merge/deferred_edges", json=task_ids)
+if response.status_code == 200:
+return await response.json()  # type: ignore
+else:
+raise AttributeError(await response.text())
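
create_benchmark_reports now takes a list of account ids and an optional run id instead of a single account and task id, and joins everything into one fixcore CLI invocation. For illustration, with assumed placeholder inputs (the command template itself is taken verbatim from the diff), the composed command looks like this:

import uuid

# Example inputs, assumed for illustration only; not taken from the PR.
benchmarks = ["benchmark_a", "benchmark_b"]
account_ids = ["111111111111", "222222222222"]
run_id = None  # falls back to a random UUID, as in the diff

bn = " ".join(benchmarks)
an = " ".join(account_ids)
rid = run_id or str(uuid.uuid4())
command = f"report benchmark run {bn} --accounts {an} --sync-security-section --run-id {rid} | count"
# -> report benchmark run benchmark_a benchmark_b --accounts 111111111111 222222222222 --sync-security-section --run-id <uuid> | count

The new merge_deferred_edges helper follows the same pattern as the other client calls in this file: it posts the list of task ids to /graph/{graph}/merge/deferred_edges and raises AttributeError with the response body on any non-200 status.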