From 3a70f10a87396af78b5b5b8ff78d7829696b3fc1 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 23 Jul 2024 23:31:23 +0000 Subject: [PATCH] rebased to latest Signed-off-by: Krishna Kondaka --- .../plugins/sink/opensearch/OpenSearchClientRefresher.java | 7 +++++-- .../plugins/sink/opensearch/OpenSearchSink.java | 7 ++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java index 6859dee8be..b0cd84ccef 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java @@ -30,7 +30,7 @@ public OpenSearchClientRefresher(final PluginMetrics pluginMetrics, final Function clientFunction) { this.clientFunction = clientFunction; this.currentConfig = connectionConfiguration; - this.currentClient = clientFunction.apply(connectionConfiguration); + this.currentClient = null; credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED); clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS); } @@ -44,6 +44,9 @@ public Class getComponentClass() { public OpenSearchClient get() { readWriteLock.readLock().lock(); try { + if (currentClient == null) { + currentClient = clientFunction.apply(currentConfig); + } return currentClient; } finally { readWriteLock.readLock().unlock(); @@ -57,8 +60,8 @@ public void update(PluginSetting pluginSetting) { credentialsChangeCounter.increment(); readWriteLock.writeLock().lock(); try { - currentClient = clientFunction.apply(newConfig); currentConfig = newConfig; + currentClient = clientFunction.apply(currentConfig); } catch (Exception e) { clientRefreshErrorsCounter.increment(); LOG.error("Refreshing {} failed.", getComponentClass(), e); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 199b4e1e0e..1b6f44fde6 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -186,6 +186,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.bulkRequestMap = new ConcurrentHashMap<>(); this.lastFlushTimeMap = new ConcurrentHashMap<>(); this.pluginConfigObservable = pluginConfigObservable; + this.objectMapper = new ObjectMapper(); final Optional dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq(); if (dlqConfig.isPresent()) { @@ -201,7 +202,7 @@ public void doInitialize() { doInitializeInternal(); } catch (IOException e) { LOG.warn("Failed to initialize OpenSearch sink, retrying: {} ", e.getMessage()); - closeFiles(); + this.shutdown(); } catch (InvalidPluginConfigurationException e) { LOG.error("Failed to initialize OpenSearch sink due to a configuration error.", e); this.shutdown(); @@ -212,7 +213,7 @@ public void doInitialize() { throw e; } catch (Exception e) { LOG.warn("Failed to initialize OpenSearch sink with a retryable exception. ", e); - closeFiles(); + this.shutdown(); } } @@ -279,7 +280,6 @@ private void doInitializeInternal() throws IOException { bulkRequestSupplier, pluginSetting); - objectMapper = new ObjectMapper(); this.initialized = true; LOG.info("Initialized OpenSearch sink"); } @@ -615,6 +615,7 @@ private void closeFiles() { public void shutdown() { super.shutdown(); closeFiles(); + openSearchClient.shutdown(); } private void maybeUpdateServerlessNetworkPolicy() {