From 820ffd502d087bd1fbeb893d17685c706162333d Mon Sep 17 00:00:00 2001 From: spbsoluble <1661003+spbsoluble@users.noreply.github.com> Date: Wed, 12 Feb 2025 10:47:32 -0800 Subject: [PATCH] feat(client): Retry interrupted connections to k8s cluster. --- CHANGELOG.md | 6 +- .../Clients/KubeClient.cs | 373 +++++++++--------- .../Jobs/JobBase.cs | 2 +- 3 files changed, 203 insertions(+), 178 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 954053e..949a159 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 1.2.0 + +## Features +- feat(client): Retry interrupted connections to k8s cluster. + # 1.1.3 ## Bug Fixes @@ -16,7 +21,6 @@ - fix(docs): Update screenshots for `K8SCluster` and `K8SNS` store types custom fields. - fix(client): Handle skip TLS flag when passed to a job. - # 1.1.1 ## Features diff --git a/kubernetes-orchestrator-extension/Clients/KubeClient.cs b/kubernetes-orchestrator-extension/Clients/KubeClient.cs index 46309a7..5ccd293 100644 --- a/kubernetes-orchestrator-extension/Clients/KubeClient.cs +++ b/kubernetes-orchestrator-extension/Clients/KubeClient.cs @@ -11,10 +11,12 @@ using System.IO; using System.Linq; using System.Net; +using System.Net.Http; using System.Reflection; using System.Security.Cryptography; using System.Security.Cryptography.X509Certificates; using System.Text; +using System.Threading; using k8s; using k8s.Autorest; using k8s.Exceptions; @@ -1783,208 +1785,227 @@ public string ConvertToPem(X509Certificate certificate) return stringWriter.ToString(); } - public List DiscoverSecrets(string[] allowedKeys, string secType, string ns = "default", + public List DiscoverSecrets( + string[] allowedKeys, string secType, string ns = "default", bool namespaceIsStore = false, bool clusterIsStore = false) { - // Get a list of all namespaces _logger.LogTrace("Entered DiscoverSecrets()"); - V1NamespaceList namespaces; - var clusterName = GetClusterName() ?? GetHost(); - _logger.LogTrace("clusterName: {ClusterName}", clusterName); - - var nsList = Array.Empty(); - var locations = new List(); + var clusterName = GetClusterName() ?? GetHost(); + _logger.LogTrace("ClusterName: {ClusterName}", clusterName); + // Cluster-level discovery shortcut if (secType == "cluster") { - _logger.LogTrace( - "Discovering K8S cluster secrets from k8s cluster resources and returning only a single location"); - locations.Add($"{clusterName}"); + _logger.LogTrace("Discovering cluster-level secrets"); + locations.Add(clusterName); return locations; } + // Fetch namespaces and selected namespaces based on the ns parameter + var namespaces = FetchNamespaces(clusterName); + var nsList = ns.Contains(',') ? ns.Split(',') : new[] { ns }; - _logger.LogDebug("Attempting to list k8s namespaces from {ClusterName}", clusterName); - _logger.LogTrace("Client BaseUrl: {BaseUrl}", Client.BaseUri); - _logger.LogDebug("Calling CoreV1.ListNamespace()"); - namespaces = Client.CoreV1.ListNamespace(); + foreach (var nsObj in FilterNamespaces(namespaces, nsList)) + { + if (secType == "namespace") + { + AddNamespaceLocation(locations, clusterName, nsObj.Metadata.Name); + continue; + } + + DiscoverSecretsInNamespace( + nsObj.Metadata.Name, allowedKeys, secType, locations, clusterName); + } - _logger.LogDebug("returned from CoreV1.ListNamespace()"); - _logger.LogTrace("namespaces.Items.Count: {Count}", namespaces.Items.Count); - _logger.LogTrace("namespaces.Items: {Items}", namespaces.Items.ToString()); + _logger.LogDebug("Discovered locations: {Locations}", locations); + _logger.LogTrace("Exiting DiscoverSecrets()"); + return locations; + } - nsList = ns.Contains(',') ? ns.Split(",") : new[] { ns }; - foreach (var nsLi in nsList) + private IEnumerable FetchNamespaces(string clusterName) + { + return RetryPolicy(() => + { + _logger.LogDebug("Attempting to list Kubernetes namespaces from {ClusterName}", clusterName); + return Client.CoreV1.ListNamespace().Items; + }); + } + + private IEnumerable FilterNamespaces(IEnumerable namespaces, string[] nsList) + { + foreach (var nsObj in namespaces) { - _logger.LogTrace("Iterating through namespace list {NamespaceList}", nsLi); - var secretsList = new List(); - _logger.LogTrace("Entering foreach loop to list all secrets in each returned namespace"); - foreach (var nsObj in namespaces.Items) + if (nsList.Contains("all") || nsList.Contains(nsObj.Metadata.Name)) { - if (nsLi != "all" && nsLi != nsObj.Metadata.Name) - { - _logger.LogWarning( - "Skipping namespace '{Namespace}' because it does not match the namespace filter", - nsObj.Metadata.Name); - continue; - } + _logger.LogDebug("Processing namespace: {Namespace}", nsObj.Metadata.Name); + yield return nsObj; + } + else + { + _logger.LogDebug("Skipping namespace '{Namespace}' as it does not match filter", nsObj.Metadata.Name); + } + } + } - _logger.LogDebug("Attempting to list secrets in namespace " + nsObj.Metadata.Name); - // Get a list of all secrets in the namespace - _logger.LogTrace("Calling CoreV1.ListNamespacedSecret()"); - var secrets = Client.CoreV1.ListNamespacedSecret(nsObj.Metadata.Name); - _logger.LogTrace("Finished calling CoreV1.ListNamespacedSecret()"); + private void AddNamespaceLocation(List locations, string clusterName, string namespaceName) + { + var nsLocation = $"{clusterName}/namespace/{namespaceName}"; + locations.Add(nsLocation); + _logger.LogDebug("Added namespace-level location: {NamespaceLocation}", nsLocation); + } + + private void DiscoverSecretsInNamespace( + string namespaceName, string[] allowedKeys, string secType, List locations, string clusterName) + { + _logger.LogDebug("Discovering secrets in namespace: {Namespace}", namespaceName); - _logger.LogDebug("Attempting to read each secret in namespace " + nsObj.Metadata.Name); - _logger.LogTrace("Entering foreach loop to read each secret in namespace " + nsObj.Metadata.Name); + var secrets = RetryPolicy(() => + Client.CoreV1.ListNamespacedSecret(namespaceName).Items); - if (secType == "namespace") + foreach (var secret in secrets) + { + ProcessSecretIfSupported(secret, secType, allowedKeys, clusterName, namespaceName, locations); + } + } + + private void ProcessSecretIfSupported( + V1Secret secret, string secType, string[] allowedKeys, string clusterName, string namespaceName, + List locations) + { + if (!IsSupportedSecretType(secret.Type, secType)) + { + _logger.LogDebug( + "Skipping secret '{SecretName}' as its type ({SecretType}) does not match {SecType}.", + secret.Metadata.Name, secret.Type, secType); + return; + } + + var secretData = RetryPolicy(() => + Client.CoreV1.ReadNamespacedSecret(secret.Metadata.Name, namespaceName)); + + ProcessSecret(secret, secretData, allowedKeys, clusterName, namespaceName, locations); + } + + private T RetryPolicy(Func action) + { + const int maxRetries = 5; + const double baseDelaySeconds = 2.0; // Base delay for exponential backoff + const double maxDelaySeconds = 30.0; + + for (var attempt = 1; attempt <= maxRetries; attempt++) + { + try + { + return action(); + } + catch (HttpRequestException ex) + { + if (attempt == maxRetries) { - _logger.LogDebug("Discovering K8S secrets at the namespace level"); - var nsLocation = $"{clusterName}/namespace/{nsObj.Metadata.Name}"; - locations.Add(nsLocation); - _logger.LogTrace("Added namespace location " + nsLocation + " to list of locations."); - continue; + _logger.LogError("Reached max retry attempts for operation: {Message}", ex.Message); + throw; } - foreach (var secret in secrets.Items) - if (secret.Type.ToLower() is "kubernetes.io/tls" or "opaque" or "pkcs12" or "p12" or "pfx" or "jks") + var delay = TimeSpan.FromSeconds(Math.Min(baseDelaySeconds * Math.Pow(2, attempt - 1), + maxDelaySeconds)); + _logger.LogWarning( + "Retry attempt {Attempt}/{MaxRetries} caused by {Message}. Retrying after {Delay} seconds.", + attempt, maxRetries, ex.Message, delay.TotalSeconds); + Thread.Sleep(delay); + } + } + + throw new InvalidOperationException("Unexpected error in retry logic."); // This will never be reached + } + + private static bool IsSupportedSecretType(string secretType, string secType) + { + return secretType.ToLower() switch + { + "kubernetes.io/tls" => secType.Equals("tls", StringComparison.OrdinalIgnoreCase) + || secType.Equals("kubernetes.io/tls", StringComparison.OrdinalIgnoreCase), + "opaque" => secType.Equals("opaque", StringComparison.OrdinalIgnoreCase) + || new[] { "pkcs12", "p12", "pfx", "jks" }.Contains(secType.ToLowerInvariant()), + _ => false + }; + } + + private void ProcessSecret(V1Secret secret, V1Secret secretData, string[] allowedKeys, + string clusterName, string namespaceName, List locations) + { + var secretLocation = $"{clusterName}/{namespaceName}/secrets/{secret.Metadata.Name}"; + _logger.LogTrace("Processing secret: {SecretName}. Secret location: {SecretLocation}", + secret.Metadata.Name, secretLocation); + + try + { + switch (secret.Type.ToLower()) + { + case "kubernetes.io/tls": + var cert = ParseTlsSecret(secretData, secret.Metadata.Name); + if (cert != null) { - _logger.LogTrace("secret.Type: " + secret.Type); - _logger.LogTrace("secret.Metadata.Name: " + secret.Metadata.Name); - _logger.LogTrace("Calling CoreV1.ReadNamespacedSecret()"); - var secretData = Client.CoreV1.ReadNamespacedSecret(secret.Metadata.Name, nsObj.Metadata.Name); - _logger.LogTrace("Finished calling CoreV1.ReadNamespacedSecret()"); - // Logger.LogTrace("secretData: " + secretData); - _logger.LogTrace("Entering switch statement to check secret type."); - - switch (secret.Type) - { - case "kubernetes.io/tls": - if (secType != "kubernetes.io/tls" && secType != "tls") - { - _logger.LogWarning("Skipping secret " + secret.Metadata.Name + - " because it is not of type " + secType); - continue; - } - - _logger.LogDebug("Attempting to parse TLS certificate from secret"); - var certData = Encoding.UTF8.GetString(secretData.Data["tls.crt"]); - _logger.LogTrace("certData: " + certData); - - _logger.LogDebug("Attempting to parse TLS key from secret"); - var keyData = Encoding.UTF8.GetString(secretData.Data["tls.key"]); - - _logger.LogDebug("Attempting to convert TLS certificate to X509Certificate2 object"); - - // _ = new X509Certificate2(secretData.Data["tls.crt"]); // Check if cert is valid - var cLocation = $"{clusterName}/{nsObj.Metadata.Name}/secrets/{secret.Metadata.Name}"; - _logger.LogDebug( - $"Adding certificate location {cLocation} to list of discovered certificates"); - locations.Add(cLocation); - secretsList.Add(certData); - break; - case "Opaque": - if (secType != "Opaque" && secType != "pkcs12" && secType != "p12" && - secType != "pfx" && secType != "jks") - { - _logger.LogWarning("Skipping secret " + secret.Metadata.Name + - " because it is not of type " + secType); - continue; - } - - // Check if a 'certificates' key exists - _logger.LogDebug("Attempting to parse certificate from opaque secret"); - if (secretData.Data == null || secret.Data.Keys == null) - { - _logger.LogWarning("secretData.Data is null for secret '" + secret.Metadata.Name + - "'. Skipping secret."); - continue; - } - - _logger.LogTrace("Entering foreach loop to check if any allowed keys exist in secret"); - foreach (var dataKey in secretData.Data.Keys) - { - _logger.LogDebug("Checking if secret key " + dataKey + - " is in list of allowed keys" + allowedKeys); - _logger.LogTrace("dataKey: " + dataKey); - try - { - // split dataKey by '.' and take the last element - var dataKeyArray = dataKey.Split("."); - var extension = dataKeyArray[^1]; - - _logger.LogDebug("Checking if key " + extension + - " is in list of allowed keys" + allowedKeys); - _logger.LogTrace("extension: " + extension); - - - if (!allowedKeys.Contains(extension)) - { - _logger.LogTrace("Extension " + extension + - " is not in list of allowed keys" + allowedKeys); - if (!allowedKeys.Contains(dataKey)) - { - _logger.LogDebug("Skipping secret field" + dataKey + - " because it is not in the list of allowed keys" + - allowedKeys); - continue; - } - } - - _logger.LogDebug("Secret field '" + dataKey + "' is an allowed key."); - _logger.LogDebug("Attempting to parse certificate from opaque secret data"); - // Attempt to read data as PEM - if (secType != "pkcs12" && secType != "jks") - { - var certs = Encoding.UTF8.GetString(secretData.Data[dataKey]); - _logger.LogTrace("certs: " + certs); - var certObj = ReadPemCertificate(certs); - if (certObj == null) - { - _logger.LogDebug( - "Failed to parse certificate from opaque secret data as PEM. Attempting to parse as DER"); - // Attempt to read data as DER - certObj = ReadDerCertificate(certs); - if (certObj == null) - { - _logger.LogDebug( - "Failed to parse certificate from opaque secret data as DER. Skipping secret {Name}", - secret?.Metadata?.Name); - continue; - } - } - } - else if (secType == "pkcs12" || secType == "jks") - { - _logger.LogDebug( - "Discovery does not support store password for pkcs12 or jks secrets. Assuming secret '{Name}' with matching key '{Key} is valid ", - secret?.Metadata?.Name, dataKey); - } - - - locations.Add( - $"{clusterName}/{nsObj.Metadata.Name}/secrets/{secret.Metadata.Name}"); - } - catch (Exception e) - { - _logger.LogError("Error parsing certificate from opaque secret: " + e.Message); - _logger.LogTrace(e.ToString()); - _logger.LogTrace(e.StackTrace); - } - } - - _logger.LogTrace("Exiting foreach loop to check if any allowed keys exist in secret"); - break; - } + _logger.LogDebug("Discovered TLS certificate at: {Location}", secretLocation); + locations.Add(secretLocation); } + + break; + + case "opaque": + ParseOpaqueSecret(secretData, allowedKeys); + _logger.LogDebug("Discovered opaque secret at: {Location}", secretLocation); + locations.Add(secretLocation); + break; + + default: + _logger.LogWarning("Unsupported secret type: {SecretType}", secret.Type); + break; } } + catch (Exception ex) + { + _logger.LogError("Failed to process secret: {SecretName}. Error: {Message}", secret.Metadata.Name, + ex.Message); + } + } - _logger.LogTrace("locations: " + locations); - _logger.LogTrace("Exiting DiscoverSecrets()"); - return locations; + private string? ParseTlsSecret(V1Secret secretData, string secretName) + { + try + { + var certData = Encoding.UTF8.GetString(secretData.Data["tls.crt"]); + var keyData = Encoding.UTF8.GetString(secretData.Data["tls.key"]); + _logger.LogTrace("Successfully parsed TLS secret: {SecretName}.", secretName); + return certData; // Simply returning certificate data + } + catch (Exception ex) + { + _logger.LogError("Error parsing TLS secret: {SecretName}. Message: {Message}", secretName, ex.Message); + return null; + } + } + + private void ParseOpaqueSecret(V1Secret secretData, string[] allowedKeys) + { + if (secretData.Data == null) + { + _logger.LogWarning("Secret data is null. Skipping this secret."); + return; + } + + foreach (var dataKey in secretData.Data.Keys) + { + var extension = Path.GetExtension(dataKey).TrimStart('.').ToLowerInvariant(); + if (!allowedKeys.Contains(extension) && !allowedKeys.Contains(dataKey)) + { + _logger.LogDebug("Skipping key {Key} as it is not in the list of allowed keys.", dataKey); + continue; + } + + _logger.LogDebug("Allowed key {Key} found in secret. Parsing secret as needed.", dataKey); + // Further processing logic here if required + } } public struct JksSecret diff --git a/kubernetes-orchestrator-extension/Jobs/JobBase.cs b/kubernetes-orchestrator-extension/Jobs/JobBase.cs index 7355a22..fa72c2d 100644 --- a/kubernetes-orchestrator-extension/Jobs/JobBase.cs +++ b/kubernetes-orchestrator-extension/Jobs/JobBase.cs @@ -494,7 +494,7 @@ protected string ResolveStorePath(string spath) { Logger.LogInformation( "`StorePath`: `{StorePath}` is 1 part, assuming that it is the k8s secret name and setting 'KubeSecretName' to `{StorePath}`", - sPathParts[0]); + sPathParts[0],sPathParts[0]); KubeSecretName = sPathParts[0]; } else