Skip to content

Commit

Permalink
Update approach to logging in connector component (#2809)
Browse files Browse the repository at this point in the history
Co-authored-by: Jedr Blaszyk <[email protected]>
  • Loading branch information
artem-shelkovnikov and jedrazb authored Sep 24, 2024
1 parent 4f6ca9b commit 6e74c3a
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 62 deletions.
4 changes: 3 additions & 1 deletion connectors/agent/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
from elastic_agent_client.util.async_tools import (
sleeps_for_retryable,
)
from elastic_agent_client.util.logger import logger

from connectors.agent.component import ConnectorsAgentComponent
from connectors.agent.logger import get_logger

logger = get_logger("cli")


def main(args=None):
Expand Down
5 changes: 5 additions & 0 deletions connectors/agent/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
from elastic_agent_client.service.checkin import CheckinV2Service

from connectors.agent.config import ConnectorsAgentConfigurationWrapper
from connectors.agent.logger import get_logger
from connectors.agent.protocol import ConnectorActionHandler, ConnectorCheckinHandler
from connectors.agent.service_manager import ConnectorServiceManager
from connectors.services.base import MultiService

logger = get_logger("component")

CONNECTOR_SERVICE = "connector-service"


Expand Down Expand Up @@ -51,6 +54,7 @@ async def run(self):
Additionally services for handling Check-in and Actions will be started to implement the protocol correctly.
"""
logger.info("Starting connectors agent component")
client = new_v2_from_reader(self.buffer, self.ver, self.opts)
action_handler = ConnectorActionHandler()
self.connector_service_manager = ConnectorServiceManager(self.config_wrapper)
Expand All @@ -71,4 +75,5 @@ def stop(self, sig):
Attempts to gracefully shutdown the services that are running under the component.
"""
logger.info("Shutting down connectors agent component")
self.multi_service.shutdown(sig)
86 changes: 75 additions & 11 deletions connectors/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
#
import base64

from connectors.agent.logger import get_logger
from connectors.config import add_defaults
from connectors.utils import nested_get_from_dict

logger = get_logger("config")


class ConnectorsAgentConfigurationWrapper:
Expand All @@ -25,10 +29,11 @@ def __init__(self):
Service config and specific config coming from Agent.
"""
self._default_config = {
"_force_allow_native": True,
"service": {
"log_level": "INFO",
"_use_native_connector_api_keys": False,
},
"_force_allow_native": True,
"native_service_types": [
"azure_blob_storage",
"box",
Expand Down Expand Up @@ -60,7 +65,7 @@ def __init__(self):

self.specific_config = {}

def try_update(self, source):
def try_update(self, unit):
"""Try update the configuration and see if it changed.
This method takes the check-in event coming from Agent and checks if config needs an update.
Expand All @@ -69,33 +74,92 @@ def try_update(self, source):
the method returns False.
"""

source = unit.config.source

# TODO: find a good link to what this object is.
has_hosts = source.fields.get("hosts")
has_api_key = source.fields.get("api_key")
has_basic_auth = source.fields.get("username") and source.fields.get("password")

assumed_configuration = {}

# Log-related
assumed_configuration["service"] = {}
assumed_configuration["service"]["log_level"] = unit.log_level

# Auth-related
if has_hosts and (has_api_key or has_basic_auth):
es_creds = {
"host": source["hosts"][0],
}
es_creds = {"host": source["hosts"][0]}

if source.fields.get("api_key"):
logger.debug("Found api_key")
api_key = source["api_key"]
# if beats_logstash_format we need to base64 the key
if ":" in api_key:
api_key = base64.b64encode(api_key.encode()).decode()

es_creds["api_key"] = api_key
elif source.fields.get("username") and source.fields.get("password"):
logger.debug("Found username and passowrd")
es_creds["username"] = source["username"]
es_creds["password"] = source["password"]
else:
msg = "Invalid Elasticsearch credentials"
raise ValueError(msg)

new_config = {
"elasticsearch": es_creds,
}
self.specific_config = new_config
assumed_configuration["elasticsearch"] = es_creds

if self.config_changed(assumed_configuration):
logger.debug("Changes detected for connectors-relevant configurations")
# This is a partial update.
# Agent can send different data in updates.
# For example, updating only log_level will not send credentials.
# Thus we don't overwrite configuration, we only update fields that
# were received
self.specific_config.update(assumed_configuration)
return True

logger.debug("No changes detected for connectors-relevant configurations")
return False

def config_changed(self, new_config):
"""See if configuration passed in new_config will update currently stored configuration
This method takes the new configuration received from the agent and see if there are any changes
to existing configuration.
If new_config contains new values for relevant fields, then True is returned, otherwise it returns False.
"""
# TODO: For now manually check, need to think of a better way?
# Not super proud of this function, but hey it's tested
logger.debug("Checking if config changed")
current_config = self._default_config.copy()
current_config.update(self.specific_config)

def _log_level_changed():
new_config_log_level = nested_get_from_dict(
new_config, ("service", "log_level")
)
current_config_log_level = nested_get_from_dict(
current_config, ("service", "log_level")
)

if new_config_log_level is None:
return False

return current_config_log_level != new_config_log_level

def _elasticsearch_config_changed():
return current_config.get("elasticsearch") != new_config.get(
"elasticsearch"
)

if _log_level_changed():
logger.debug("log_level changed")
return True

if _elasticsearch_config_changed():
logger.debug("elasticsearch changed")
return True

return False
Expand All @@ -112,8 +176,8 @@ def get(self):
"""
# First take "default config"
config = self._default_config.copy()
# Then override with what we get from Agent
config.update(self.specific_config)
# Then merge with what we get from Agent
config = dict(add_defaults(self.specific_config, default_config=config))
# Then merge with default connectors config
configuration = dict(add_defaults(config))

Expand Down
29 changes: 29 additions & 0 deletions connectors/agent/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import logging

import ecs_logging

root_logger = logging.getLogger("agent_component")
handler = logging.StreamHandler()
handler.setFormatter(ecs_logging.StdlibFormatter())
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)


def get_logger(module):
logger = root_logger.getChild(module)

if logger.hasHandlers():
return logger

logger.addHandler(handler)

return logger


def update_logger_level(log_level):
root_logger.setLevel(log_level)
10 changes: 6 additions & 4 deletions connectors/agent/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

from elastic_agent_client.generated import elastic_agent_client_pb2 as proto
from elastic_agent_client.handler.action import BaseActionHandler
from elastic_agent_client.handler.checkin import BaseCheckinHandler
from elastic_agent_client.util.logger import logger

from connectors.agent.logger import get_logger

logger = get_logger("protocol")


class ConnectorActionHandler(BaseActionHandler):
Expand Down Expand Up @@ -64,12 +68,10 @@ async def apply_from_client(self):
for unit in self.client.units
if unit.unit_type == proto.UnitType.OUTPUT
]

if len(outputs) > 0 and outputs[0].config:
logger.debug("Outputs were found")
source = outputs[0].config.source

changed = self.agent_connectors_config_wrapper.try_update(source)
changed = self.agent_connectors_config_wrapper.try_update(outputs[0])
if changed:
logger.info("Updating connector service manager config")
self.service_manager.restart()
Expand Down
34 changes: 21 additions & 13 deletions connectors/agent/service_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
from connectors.logger import DocumentLogger
import logging

import connectors.agent.logger
import connectors.logger
from connectors.agent.logger import get_logger
from connectors.services.base import (
ServiceAlreadyRunningError,
get_services,
)
from connectors.utils import CancellableSleeps

logger = get_logger("service_manager")


class ConnectorServiceManager:
"""Class responsible for properly configuring and running Connectors Service in Elastic Agent
Expand All @@ -22,8 +28,6 @@ class ConnectorServiceManager:
"""

name = "connector-service-manager"

def __init__(self, configuration):
"""Inits ConnectorServiceManager with shared ConnectorsAgentConfigurationWrapper.
Expand All @@ -32,10 +36,6 @@ def __init__(self, configuration):
There is nothing enforcing it, but expect problems if that happens.
"""
service_name = self.__class__.name
self._logger = DocumentLogger(
f"[{service_name}]", {"service_name": service_name}
)
self._agent_config = configuration
self._multi_service = None
self._running = False
Expand All @@ -59,26 +59,34 @@ async def run(self):
try:
while self._running:
try:
self._logger.info("Starting connector services")
logger.info("Starting connector services")
config = self._agent_config.get()
self._multi_service = get_services(
["schedule", "sync_content", "sync_access_control", "cleanup"],
self._agent_config.get(),
config,
)
log_level = config.get("service", {}).get(
"log_level", logging.INFO
) # Log Level for connectors is managed like this
connectors.logger.set_logger(log_level, filebeat=True)
# Log Level for agent connectors component itself
connectors.agent.logger.update_logger_level(log_level)

await self._multi_service.run()
except Exception as e:
self._logger.exception(
logger.exception(
f"Error while running services in ConnectorServiceManager: {e}"
)
raise
finally:
self._logger.info("Finished running, exiting")
logger.info("Finished running, exiting")

def stop(self):
"""Stop the service manager and all running subservices.
Running stop attempts to gracefully shutdown all subservices currently running.
"""
self._logger.info("Stopping connector services.")
logger.info("Stopping connector services.")
self._running = False
self._done = True
if self._multi_service:
Expand All @@ -91,6 +99,6 @@ def restart(self):
After services are gracefully stopped, they will be started again with fresh configuration
that comes from ConnectorsAgentConfigurationWrapper.
"""
self._logger.info("Restarting connector services")
logger.info("Restarting connector services")
if self._multi_service:
self._multi_service.shutdown(None)
6 changes: 4 additions & 2 deletions connectors/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ def load_config(config_file):
return configuration


def add_defaults(config):
configuration = dict(_merge_dicts(_default_config(), config))
def add_defaults(config, default_config=None):
if default_config is None:
default_config = _default_config()
configuration = dict(_merge_dicts(default_config, config))
return configuration


Expand Down
Loading

0 comments on commit 6e74c3a

Please sign in to comment.