Skip to content

Commit

Permalink
Cleanup resources properly when Opensearch sink fails to initialize (#…
Browse files Browse the repository at this point in the history
…4758)

* dplive1.yaml

Signed-off-by: Krishna Kondaka <[email protected]>

* rebased to latest

Signed-off-by: Krishna Kondaka <[email protected]>

* removed unnecessary file

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Jul 30, 2024
1 parent 944681f commit e66f9bc
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public OpenSearchClientRefresher(final PluginMetrics pluginMetrics,
final Function<ConnectionConfiguration, OpenSearchClient> clientFunction) {
this.clientFunction = clientFunction;
this.currentConfig = connectionConfiguration;
this.currentClient = clientFunction.apply(connectionConfiguration);
this.currentClient = null;
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS);
}
Expand All @@ -44,6 +44,9 @@ public Class<OpenSearchClient> getComponentClass() {
public OpenSearchClient get() {
readWriteLock.readLock().lock();
try {
if (currentClient == null) {
currentClient = clientFunction.apply(currentConfig);
}
return currentClient;
} finally {
readWriteLock.readLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.bulkRequestMap = new ConcurrentHashMap<>();
this.lastFlushTimeMap = new ConcurrentHashMap<>();
this.pluginConfigObservable = pluginConfigObservable;
this.objectMapper = new ObjectMapper();

final Optional<PluginModel> dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq();
if (dlqConfig.isPresent()) {
Expand All @@ -201,7 +202,7 @@ public void doInitialize() {
doInitializeInternal();
} catch (IOException e) {
LOG.warn("Failed to initialize OpenSearch sink, retrying: {} ", e.getMessage());
closeFiles();
this.shutdown();
} catch (InvalidPluginConfigurationException e) {
LOG.error("Failed to initialize OpenSearch sink due to a configuration error.", e);
this.shutdown();
Expand All @@ -212,7 +213,7 @@ public void doInitialize() {
throw e;
} catch (Exception e) {
LOG.warn("Failed to initialize OpenSearch sink with a retryable exception. ", e);
closeFiles();
this.shutdown();
}
}

Expand Down Expand Up @@ -279,7 +280,6 @@ private void doInitializeInternal() throws IOException {
bulkRequestSupplier,
pluginSetting);

objectMapper = new ObjectMapper();
this.initialized = true;
LOG.info("Initialized OpenSearch sink");
}
Expand Down Expand Up @@ -615,6 +615,7 @@ private void closeFiles() {
public void shutdown() {
super.shutdown();
closeFiles();
openSearchClient.shutdown();
}

private void maybeUpdateServerlessNetworkPolicy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ void testGet() {
@Test
void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() {
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME);
when(connectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD);
Expand All @@ -91,6 +92,7 @@ void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() {
@Test
void testGetAfterUpdateWithBasicAuthUnchanged() {
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
when(authConfig.getUsername()).thenReturn(TEST_USERNAME);
Expand All @@ -115,6 +117,7 @@ void testGetAfterUpdateWithBasicAuthUnchanged() {
void testGetAfterUpdateWithDeprecatedUsernameChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME);
Expand All @@ -138,6 +141,7 @@ void testGetAfterUpdateWithDeprecatedUsernameChanged() {
void testGetAfterUpdateWithUsernameChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
Expand Down Expand Up @@ -165,6 +169,7 @@ void testGetAfterUpdateWithUsernameChanged() {
void testGetAfterUpdateWithDeprecatedPasswordChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME);
Expand All @@ -190,6 +195,7 @@ void testGetAfterUpdateWithDeprecatedPasswordChanged() {
void testGetAfterUpdateWithPasswordChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
Expand Down Expand Up @@ -219,6 +225,7 @@ void testGetAfterUpdateClientFailure() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
when(pluginMetrics.counter(CLIENT_REFRESH_ERRORS)).thenReturn(clientRefreshErrorsCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME);
Expand All @@ -240,4 +247,4 @@ void testGetAfterUpdateClientFailure() {
verify(clientRefreshErrorsCounter).increment();
verify(clientFunction, times(2)).apply(any());
}
}
}

0 comments on commit e66f9bc

Please sign in to comment.