From ffbe5fdce405d842a07600da9ea99306e4ce6e3e Mon Sep 17 00:00:00 2001 From: Krishna Kondaka <41027584+kkondaka@users.noreply.github.com> Date: Tue, 30 Jul 2024 13:33:51 -0700 Subject: [PATCH] Cleanup resources properly when Opensearch sink fails to initialize (#4758) * dplive1.yaml Signed-off-by: Krishna Kondaka * rebased to latest Signed-off-by: Krishna Kondaka * removed unnecessary file Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka Signed-off-by: Krishna Kondaka --- .../sink/opensearch/OpenSearchClientRefresher.java | 5 ++++- .../plugins/sink/opensearch/OpenSearchSink.java | 7 ++++--- .../sink/opensearch/OpenSearchClientRefresherTest.java | 9 ++++++++- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java index 6859dee8be..b697fb26bf 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java @@ -30,7 +30,7 @@ public OpenSearchClientRefresher(final PluginMetrics pluginMetrics, final Function clientFunction) { this.clientFunction = clientFunction; this.currentConfig = connectionConfiguration; - this.currentClient = clientFunction.apply(connectionConfiguration); + this.currentClient = null; credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED); clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS); } @@ -44,6 +44,9 @@ public Class getComponentClass() { public OpenSearchClient get() { readWriteLock.readLock().lock(); try { + if (currentClient == null) { + currentClient = clientFunction.apply(currentConfig); + } return currentClient; } finally { readWriteLock.readLock().unlock(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 199b4e1e0e..1b6f44fde6 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -186,6 +186,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.bulkRequestMap = new ConcurrentHashMap<>(); this.lastFlushTimeMap = new ConcurrentHashMap<>(); this.pluginConfigObservable = pluginConfigObservable; + this.objectMapper = new ObjectMapper(); final Optional dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq(); if (dlqConfig.isPresent()) { @@ -201,7 +202,7 @@ public void doInitialize() { doInitializeInternal(); } catch (IOException e) { LOG.warn("Failed to initialize OpenSearch sink, retrying: {} ", e.getMessage()); - closeFiles(); + this.shutdown(); } catch (InvalidPluginConfigurationException e) { LOG.error("Failed to initialize OpenSearch sink due to a configuration error.", e); this.shutdown(); @@ -212,7 +213,7 @@ public void doInitialize() { throw e; } catch (Exception e) { LOG.warn("Failed to initialize OpenSearch sink with a retryable exception. ", e); - closeFiles(); + this.shutdown(); } } @@ -279,7 +280,6 @@ private void doInitializeInternal() throws IOException { bulkRequestSupplier, pluginSetting); - objectMapper = new ObjectMapper(); this.initialized = true; LOG.info("Initialized OpenSearch sink"); } @@ -615,6 +615,7 @@ private void closeFiles() { public void shutdown() { super.shutdown(); closeFiles(); + openSearchClient.shutdown(); } private void maybeUpdateServerlessNetworkPolicy() { diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java index b9326c606f..584051dff6 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java @@ -71,6 +71,7 @@ void testGet() { @Test void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() { final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); when(connectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD); @@ -91,6 +92,7 @@ void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() { @Test void testGetAfterUpdateWithBasicAuthUnchanged() { final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig); when(authConfig.getUsername()).thenReturn(TEST_USERNAME); @@ -115,6 +117,7 @@ void testGetAfterUpdateWithBasicAuthUnchanged() { void testGetAfterUpdateWithDeprecatedUsernameChanged() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); @@ -138,6 +141,7 @@ void testGetAfterUpdateWithDeprecatedUsernameChanged() { void testGetAfterUpdateWithUsernameChanged() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig); @@ -165,6 +169,7 @@ void testGetAfterUpdateWithUsernameChanged() { void testGetAfterUpdateWithDeprecatedPasswordChanged() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); @@ -190,6 +195,7 @@ void testGetAfterUpdateWithDeprecatedPasswordChanged() { void testGetAfterUpdateWithPasswordChanged() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig); @@ -219,6 +225,7 @@ void testGetAfterUpdateClientFailure() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); when(pluginMetrics.counter(CLIENT_REFRESH_ERRORS)).thenReturn(clientRefreshErrorsCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); @@ -240,4 +247,4 @@ void testGetAfterUpdateClientFailure() { verify(clientRefreshErrorsCounter).increment(); verify(clientFunction, times(2)).apply(any()); } -} \ No newline at end of file +}