From 457b0273d6ff1100373200e34c61a8d28c6f27b0 Mon Sep 17 00:00:00 2001 From: Joe LeVeque Date: Thu, 3 Sep 2020 19:11:02 +0000 Subject: [PATCH 1/6] [caclmgrd] Prevent unnecessary iptables updates --- files/image_config/caclmgrd/caclmgrd | 86 ++++++++++++++++++++++++++-- 1 file changed, 82 insertions(+), 4 deletions(-) diff --git a/files/image_config/caclmgrd/caclmgrd b/files/image_config/caclmgrd/caclmgrd index cee0d8f96ceb..c0a91086ef51 100755 --- a/files/image_config/caclmgrd/caclmgrd +++ b/files/image_config/caclmgrd/caclmgrd @@ -15,6 +15,8 @@ try: import os import subprocess import sys + import threading + import time from sonic_py_common import daemon_base, device_info from swsscommon import swsscommon @@ -75,9 +77,21 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): } } + UPDATE_DELAY_SECS = 0.5 + def __init__(self, log_identifier): super(ControlPlaneAclManager, self).__init__(log_identifier) + # Update-thread-specific data per namespace + self.update_thread = {} + self.lock = {} + self.num_changes = {} + + # Initialize update-thread-specific data for default namespace + self.update_thread[''] = None + self.lock[''] = threading.Lock() + self.num_changes[''] = 0 + SonicDBConfig.load_sonic_global_db_config() self.config_db_map = {} self.iptables_cmd_ns_prefix = {} @@ -88,8 +102,13 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.namespace_mgmt_ipv6 = self.get_namespace_mgmt_ipv6(self.iptables_cmd_ns_prefix[''], '') self.namespace_docker_mgmt_ip = {} self.namespace_docker_mgmt_ipv6 = {} + namespaces = device_info.get_all_namespaces() for front_asic_namespace in namespaces['front_ns']: + self.update_thread[front_asic_namespace] = None + self.lock[front_asic_namespace] = threading.Lock() + self.num_changes[front_asic_namespace] = 0 + self.config_db_map[front_asic_namespace] = ConfigDBConnector(use_unix_socket_path=True, namespace=front_asic_namespace) self.config_db_map[front_asic_namespace].connect() self.iptables_cmd_ns_prefix[front_asic_namespace] = "ip netns exec " + front_asic_namespace + " " @@ -99,6 +118,10 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): front_asic_namespace) for back_asic_namespace in namespaces['back_ns']: + self.update_thread[back_asic_namespace] = None + self.lock[back_asic_namespace] = threading.Lock() + self.num_changes[back_asic_namespace] = 0 + self.iptables_cmd_ns_prefix[back_asic_namespace] = "ip netns exec " + back_asic_namespace + " " self.namespace_docker_mgmt_ip[back_asic_namespace] = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[back_asic_namespace], back_asic_namespace) @@ -495,9 +518,44 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.run_commands(iptables_cmds) + def check_and_update_control_plane_acls(self, namespace, num_changes): + """ + This function is intended to be spawned in a separate thread with a delay. + Its purpose is to prevent unnecessary iptables updates if we receive + multiple rapid ACL table update notifications. It will refrain from + updating iptables if any newer updates were detected within the delay window + and spawn a new thread to check again. This process will repeat until there + have been no changes during the delay window, at which point it will update + iptables using the current ACL rules. + """ + while True: + # Sleep for our delay interval + time.sleep(self.UPDATE_DELAY_SECS) + + with self.lock[namespace]: + if self.num_changes[namespace] > num_changes: + # More ACL table changes occurred since this thread was spawned + # spawn a new thread with the current number of changes + new_changes = self.num_changes[namespace] - num_changes + log_info("ACL config not stable for namespace '{}': {} changes detected in the past {} seconds. Skipping update ..." + .format(namespace, new_changes, self.UPDATE_DELAY_SECS)) + num_changes = self.num_changes[namespace] + else: + if num_changes == self.num_changes[namespace] and num_changes > 0: + log_info("ACL config for namespace '{}' has not changed for {} seconds. Applying updates ..." + .format(namespace, self.UPDATE_DELAY_SECS)) + self.update_control_plane_acls(namespace) + else: + log_error("Error updating ACLs for namespace '{}'".format(namespace)) + + # Re-initialize + self.num_changes[namespace] = 0 + self.update_thread[namespace] = None + return + def run(self): - # Select Time-out for 10 Seconds - SELECT_TIMEOUT_MS = 1000 * 10 + # Set select timeout to 1 second + SELECT_TIMEOUT_MS = 1000 self.log_info("Starting up ...") @@ -540,14 +598,20 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # Loop on select to see if any event happen on config db of any namespace while True: ctrl_plane_acl_notification = set() + # update threads will get stuck + time.sleep(0.1) + (state, selectableObj) = sel.select(SELECT_TIMEOUT_MS) # Continue if select is timeout or selectable object is not return if state != swsscommon.Select.OBJECT: continue - # Get the redisselect object from selectable object + + # Get the redisselect object from selectable object redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) + # Get the corresponding namespace from redisselect db connector object namespace = redisSelectObj.getDbConnector().getNamespace() + # Pop data of both Subscriber Table object of namespace that got config db acl table event for table in config_db_subscriber_table_map[namespace]: while True: @@ -568,7 +632,21 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # Update the Control Plane ACL of the namespace that got config db acl table event for namespace in ctrl_plane_acl_notification: - self.update_control_plane_acls(namespace) + with self.lock[namespace]: + if self.num_changes[namespace] == 0: + log_info("ACL change detected for namespace '{}'".format(namespace)) + + # Increment the number of change events we've received for this namespace + self.num_changes[namespace] += 1 + + # If an update thread is not already spawned for the namespace which we received + # the ACL table update event, spawn one now + if not self.update_thread[namespace]: + log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) + self.update_thread[namespace] = threading.Timer(self.UPDATE_DELAY_SECS, + self.check_and_update_control_plane_acls, + [namespace, self.num_changes[namespace]]) + self.update_thread[namespace].start() # ============================= Functions ============================= From 80608381053bd3dffbd6c392248a8218861a1d72 Mon Sep 17 00:00:00 2001 From: Joe LeVeque Date: Fri, 4 Sep 2020 03:02:13 +0000 Subject: [PATCH 2/6] Use single Thread with sleep in loop instead of multiple Timer threads --- files/image_config/caclmgrd/caclmgrd | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/files/image_config/caclmgrd/caclmgrd b/files/image_config/caclmgrd/caclmgrd index c0a91086ef51..770d3140822e 100755 --- a/files/image_config/caclmgrd/caclmgrd +++ b/files/image_config/caclmgrd/caclmgrd @@ -520,7 +520,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): def check_and_update_control_plane_acls(self, namespace, num_changes): """ - This function is intended to be spawned in a separate thread with a delay. + This function is intended to be spawned in a separate thread. Its purpose is to prevent unnecessary iptables updates if we receive multiple rapid ACL table update notifications. It will refrain from updating iptables if any newer updates were detected within the delay window @@ -643,9 +643,8 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # the ACL table update event, spawn one now if not self.update_thread[namespace]: log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) - self.update_thread[namespace] = threading.Timer(self.UPDATE_DELAY_SECS, - self.check_and_update_control_plane_acls, - [namespace, self.num_changes[namespace]]) + self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, + args=(namespace, self.num_changes[namespace])) self.update_thread[namespace].start() # ============================= Functions ============================= From 8aa91bb05401fb49b47bdc76f75a45f74b720bb0 Mon Sep 17 00:00:00 2001 From: Joe LeVeque Date: Fri, 4 Sep 2020 03:33:37 +0000 Subject: [PATCH 3/6] Enhance comment --- files/image_config/caclmgrd/caclmgrd | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/files/image_config/caclmgrd/caclmgrd b/files/image_config/caclmgrd/caclmgrd index 770d3140822e..49ed73c6738f 100755 --- a/files/image_config/caclmgrd/caclmgrd +++ b/files/image_config/caclmgrd/caclmgrd @@ -598,7 +598,10 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # Loop on select to see if any event happen on config db of any namespace while True: ctrl_plane_acl_notification = set() - # update threads will get stuck + + # A brief sleep appears necessary in this loop or any spawned + # update threads will get stuck. Appears to be due to the sel.select() call. + # TODO: Eliminate the need for this sleep. time.sleep(0.1) (state, selectableObj) = sel.select(SELECT_TIMEOUT_MS) From 15ef28b4e4bbadd26ab53d766d5206402cd291a3 Mon Sep 17 00:00:00 2001 From: Joe LeVeque Date: Thu, 15 Oct 2020 01:59:05 +0000 Subject: [PATCH 4/6] Add DEFAULT_NAMESPACE constant for readability --- files/image_config/caclmgrd/caclmgrd | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/files/image_config/caclmgrd/caclmgrd b/files/image_config/caclmgrd/caclmgrd index 49ed73c6738f..d61d1f2a68c9 100755 --- a/files/image_config/caclmgrd/caclmgrd +++ b/files/image_config/caclmgrd/caclmgrd @@ -28,6 +28,8 @@ VERSION = "1.0" SYSLOG_IDENTIFIER = "caclmgrd" +DEFAULT_NAMESPACE = '' + # ========================== Helper Functions ========================= @@ -88,18 +90,18 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.num_changes = {} # Initialize update-thread-specific data for default namespace - self.update_thread[''] = None - self.lock[''] = threading.Lock() - self.num_changes[''] = 0 + self.update_thread[DEFAULT_NAMESPACE] = None + self.lock[DEFAULT_NAMESPACE] = threading.Lock() + self.num_changes[DEFAULT_NAMESPACE] = 0 SonicDBConfig.load_sonic_global_db_config() self.config_db_map = {} self.iptables_cmd_ns_prefix = {} - self.config_db_map[''] = ConfigDBConnector(use_unix_socket_path=True, namespace='') - self.config_db_map[''].connect() - self.iptables_cmd_ns_prefix[''] = "" - self.namespace_mgmt_ip = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[''], '') - self.namespace_mgmt_ipv6 = self.get_namespace_mgmt_ipv6(self.iptables_cmd_ns_prefix[''], '') + self.config_db_map[DEFAULT_NAMESPACE] = ConfigDBConnector(use_unix_socket_path=True, namespace=DEFAULT_NAMESPACE) + self.config_db_map[DEFAULT_NAMESPACE].connect() + self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE] = "" + self.namespace_mgmt_ip = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE], DEFAULT_NAMESPACE) + self.namespace_mgmt_ipv6 = self.get_namespace_mgmt_ipv6(self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE], DEFAULT_NAMESPACE) self.namespace_docker_mgmt_ip = {} self.namespace_docker_mgmt_ipv6 = {} @@ -573,7 +575,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # Map of Namespace <--> susbcriber table's object config_db_subscriber_table_map = {} - # Loop through all asic namespaces (if present) and host (namespace='') + # Loop through all asic namespaces (if present) and host namespace (DEFAULT_NAMESPACE) for namespace in self.config_db_map.keys(): # Unconditionally update control plane ACLs once at start on given namespace self.update_control_plane_acls(namespace) From c484ee97a63a995b866b63193de041bd0ec99bbf Mon Sep 17 00:00:00 2001 From: Joe LeVeque Date: Thu, 15 Oct 2020 17:08:55 +0000 Subject: [PATCH 5/6] New logging syntax --- files/image_config/caclmgrd/caclmgrd | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/files/image_config/caclmgrd/caclmgrd b/files/image_config/caclmgrd/caclmgrd index d61d1f2a68c9..d126ee9010e5 100755 --- a/files/image_config/caclmgrd/caclmgrd +++ b/files/image_config/caclmgrd/caclmgrd @@ -539,16 +539,16 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # More ACL table changes occurred since this thread was spawned # spawn a new thread with the current number of changes new_changes = self.num_changes[namespace] - num_changes - log_info("ACL config not stable for namespace '{}': {} changes detected in the past {} seconds. Skipping update ..." + self.log_info("ACL config not stable for namespace '{}': {} changes detected in the past {} seconds. Skipping update ..." .format(namespace, new_changes, self.UPDATE_DELAY_SECS)) num_changes = self.num_changes[namespace] else: if num_changes == self.num_changes[namespace] and num_changes > 0: - log_info("ACL config for namespace '{}' has not changed for {} seconds. Applying updates ..." + self.log_info("ACL config for namespace '{}' has not changed for {} seconds. Applying updates ..." .format(namespace, self.UPDATE_DELAY_SECS)) self.update_control_plane_acls(namespace) else: - log_error("Error updating ACLs for namespace '{}'".format(namespace)) + self.log_error("Error updating ACLs for namespace '{}'".format(namespace)) # Re-initialize self.num_changes[namespace] = 0 @@ -639,7 +639,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): for namespace in ctrl_plane_acl_notification: with self.lock[namespace]: if self.num_changes[namespace] == 0: - log_info("ACL change detected for namespace '{}'".format(namespace)) + self.log_info("ACL change detected for namespace '{}'".format(namespace)) # Increment the number of change events we've received for this namespace self.num_changes[namespace] += 1 @@ -647,7 +647,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # If an update thread is not already spawned for the namespace which we received # the ACL table update event, spawn one now if not self.update_thread[namespace]: - log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) + self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, args=(namespace, self.num_changes[namespace])) self.update_thread[namespace].start() From afeffe20d2b71b13c323eb21859f4ce75eeb8e38 Mon Sep 17 00:00:00 2001 From: Joe LeVeque Date: Sat, 17 Oct 2020 01:38:57 +0000 Subject: [PATCH 6/6] Update docstring to reflect changed functionality --- files/image_config/caclmgrd/caclmgrd | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/files/image_config/caclmgrd/caclmgrd b/files/image_config/caclmgrd/caclmgrd index d126ee9010e5..c5144a6633f0 100755 --- a/files/image_config/caclmgrd/caclmgrd +++ b/files/image_config/caclmgrd/caclmgrd @@ -524,10 +524,10 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): """ This function is intended to be spawned in a separate thread. Its purpose is to prevent unnecessary iptables updates if we receive - multiple rapid ACL table update notifications. It will refrain from - updating iptables if any newer updates were detected within the delay window - and spawn a new thread to check again. This process will repeat until there - have been no changes during the delay window, at which point it will update + multiple rapid ACL table update notifications. It sleeps for UPDATE_DELAY_SECS + then checks if any more ACL table updates were received in that window. If new + updates were received, it will sleep again and repeat the process until no + updates were received during the delay window, at which point it will update iptables using the current ACL rules. """ while True: