Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[caclmgrd] Prevent unnecessary iptables updates #5312

Merged
merged 6 commits into from
Oct 19, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 92 additions & 10 deletions files/image_config/caclmgrd/caclmgrd
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ try:
import os
import subprocess
import sys
import threading
import time

from sonic_py_common import daemon_base, device_info
from swsscommon import swsscommon
Expand All @@ -26,6 +28,8 @@ VERSION = "1.0"

SYSLOG_IDENTIFIER = "caclmgrd"

DEFAULT_NAMESPACE = ''


# ========================== Helper Functions =========================

Expand Down Expand Up @@ -75,21 +79,38 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
}
}

UPDATE_DELAY_SECS = 0.5

def __init__(self, log_identifier):
super(ControlPlaneAclManager, self).__init__(log_identifier)

# Update-thread-specific data per namespace
self.update_thread = {}
self.lock = {}
self.num_changes = {}

# Initialize update-thread-specific data for default namespace
self.update_thread[DEFAULT_NAMESPACE] = None
self.lock[DEFAULT_NAMESPACE] = threading.Lock()
self.num_changes[DEFAULT_NAMESPACE] = 0

SonicDBConfig.load_sonic_global_db_config()
self.config_db_map = {}
self.iptables_cmd_ns_prefix = {}
self.config_db_map[''] = ConfigDBConnector(use_unix_socket_path=True, namespace='')
self.config_db_map[''].connect()
self.iptables_cmd_ns_prefix[''] = ""
self.namespace_mgmt_ip = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[''], '')
self.namespace_mgmt_ipv6 = self.get_namespace_mgmt_ipv6(self.iptables_cmd_ns_prefix[''], '')
self.config_db_map[DEFAULT_NAMESPACE] = ConfigDBConnector(use_unix_socket_path=True, namespace=DEFAULT_NAMESPACE)
self.config_db_map[DEFAULT_NAMESPACE].connect()
self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE] = ""
self.namespace_mgmt_ip = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE], DEFAULT_NAMESPACE)
self.namespace_mgmt_ipv6 = self.get_namespace_mgmt_ipv6(self.iptables_cmd_ns_prefix[DEFAULT_NAMESPACE], DEFAULT_NAMESPACE)
self.namespace_docker_mgmt_ip = {}
self.namespace_docker_mgmt_ipv6 = {}

namespaces = device_info.get_all_namespaces()
for front_asic_namespace in namespaces['front_ns']:
self.update_thread[front_asic_namespace] = None
self.lock[front_asic_namespace] = threading.Lock()
self.num_changes[front_asic_namespace] = 0

self.config_db_map[front_asic_namespace] = ConfigDBConnector(use_unix_socket_path=True, namespace=front_asic_namespace)
self.config_db_map[front_asic_namespace].connect()
self.iptables_cmd_ns_prefix[front_asic_namespace] = "ip netns exec " + front_asic_namespace + " "
Expand All @@ -99,6 +120,10 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
front_asic_namespace)

for back_asic_namespace in namespaces['back_ns']:
self.update_thread[back_asic_namespace] = None
self.lock[back_asic_namespace] = threading.Lock()
self.num_changes[back_asic_namespace] = 0

self.iptables_cmd_ns_prefix[back_asic_namespace] = "ip netns exec " + back_asic_namespace + " "
self.namespace_docker_mgmt_ip[back_asic_namespace] = self.get_namespace_mgmt_ip(self.iptables_cmd_ns_prefix[back_asic_namespace],
back_asic_namespace)
Expand Down Expand Up @@ -495,9 +520,44 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):

self.run_commands(iptables_cmds)

def check_and_update_control_plane_acls(self, namespace, num_changes):
"""
This function is intended to be spawned in a separate thread.
Its purpose is to prevent unnecessary iptables updates if we receive
multiple rapid ACL table update notifications. It sleeps for UPDATE_DELAY_SECS
then checks if any more ACL table updates were received in that window. If new
updates were received, it will sleep again and repeat the process until no
updates were received during the delay window, at which point it will update
iptables using the current ACL rules.
"""
while True:
# Sleep for our delay interval
time.sleep(self.UPDATE_DELAY_SECS)

with self.lock[namespace]:
if self.num_changes[namespace] > num_changes:
# More ACL table changes occurred since this thread was spawned
# spawn a new thread with the current number of changes
new_changes = self.num_changes[namespace] - num_changes
self.log_info("ACL config not stable for namespace '{}': {} changes detected in the past {} seconds. Skipping update ..."
.format(namespace, new_changes, self.UPDATE_DELAY_SECS))
num_changes = self.num_changes[namespace]
else:
if num_changes == self.num_changes[namespace] and num_changes > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jleveque do we need num_changes > 0 check ?

Copy link
Contributor Author

@jleveque jleveque Oct 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's basically a safeguard, in case num_changes == 0 and self.num_changes[namespace] == 0. We shouldn't get here in this situation, but if we do we will hit the else case and log an error message. It could be helpful for debugging.

self.log_info("ACL config for namespace '{}' has not changed for {} seconds. Applying updates ..."
.format(namespace, self.UPDATE_DELAY_SECS))
self.update_control_plane_acls(namespace)
else:
self.log_error("Error updating ACLs for namespace '{}'".format(namespace))

# Re-initialize
self.num_changes[namespace] = 0
self.update_thread[namespace] = None
return

def run(self):
# Select Time-out for 10 Seconds
SELECT_TIMEOUT_MS = 1000 * 10
# Set select timeout to 1 second
SELECT_TIMEOUT_MS = 1000

self.log_info("Starting up ...")

Expand All @@ -515,7 +575,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
# Map of Namespace <--> susbcriber table's object
config_db_subscriber_table_map = {}

# Loop through all asic namespaces (if present) and host (namespace='')
# Loop through all asic namespaces (if present) and host namespace (DEFAULT_NAMESPACE)
for namespace in self.config_db_map.keys():
# Unconditionally update control plane ACLs once at start on given namespace
self.update_control_plane_acls(namespace)
Expand All @@ -540,14 +600,23 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
# Loop on select to see if any event happen on config db of any namespace
while True:
ctrl_plane_acl_notification = set()

# A brief sleep appears necessary in this loop or any spawned
# update threads will get stuck. Appears to be due to the sel.select() call.
# TODO: Eliminate the need for this sleep.
time.sleep(0.1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jleveque Do we need this still ? Can we update SELECT_TIMEOUT_MS to take care of of this sleep ?

Copy link
Contributor Author

@jleveque jleveque Oct 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot explain the root cause of the need for this sleep. I wish I could. Adjusting SELECT_TIMEOUT_MS doesn't make a difference. It seems to be related to some internal Python context switching in conjunction with the sel.select() call. If we don't sleep a tiny bit here, any spawned update threads will not make progress. It's very strange. By commenting out the sel.select() call and mocking it up, the problem subsided, which is why I feel the sel.select() call is somehow triggering the problem. I was unable to find any similar complaints on the web. This will require further, deep investigation in order to eliminate this sleep.


(state, selectableObj) = sel.select(SELECT_TIMEOUT_MS)
# Continue if select is timeout or selectable object is not return
if state != swsscommon.Select.OBJECT:
continue
# Get the redisselect object from selectable object

# Get the redisselect object from selectable object
redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj)

# Get the corresponding namespace from redisselect db connector object
namespace = redisSelectObj.getDbConnector().getNamespace()

# Pop data of both Subscriber Table object of namespace that got config db acl table event
for table in config_db_subscriber_table_map[namespace]:
while True:
Expand All @@ -568,7 +637,20 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):

# Update the Control Plane ACL of the namespace that got config db acl table event
for namespace in ctrl_plane_acl_notification:
self.update_control_plane_acls(namespace)
with self.lock[namespace]:
if self.num_changes[namespace] == 0:
self.log_info("ACL change detected for namespace '{}'".format(namespace))

# Increment the number of change events we've received for this namespace
self.num_changes[namespace] += 1

# If an update thread is not already spawned for the namespace which we received
# the ACL table update event, spawn one now
if not self.update_thread[namespace]:
self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace))
self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls,
args=(namespace, self.num_changes[namespace]))
self.update_thread[namespace].start()

# ============================= Functions =============================

Expand Down