Skip to content

Commit

Permalink
rebased to latest
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jul 23, 2024
1 parent 36a3e59 commit 3a70f10
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public OpenSearchClientRefresher(final PluginMetrics pluginMetrics,
final Function<ConnectionConfiguration, OpenSearchClient> clientFunction) {
this.clientFunction = clientFunction;
this.currentConfig = connectionConfiguration;
this.currentClient = clientFunction.apply(connectionConfiguration);
this.currentClient = null;
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS);
}
Expand All @@ -44,6 +44,9 @@ public Class<OpenSearchClient> getComponentClass() {
public OpenSearchClient get() {
readWriteLock.readLock().lock();
try {
if (currentClient == null) {
currentClient = clientFunction.apply(currentConfig);
}
return currentClient;
} finally {
readWriteLock.readLock().unlock();
Expand All @@ -57,8 +60,8 @@ public void update(PluginSetting pluginSetting) {
credentialsChangeCounter.increment();
readWriteLock.writeLock().lock();
try {
currentClient = clientFunction.apply(newConfig);
currentConfig = newConfig;
currentClient = clientFunction.apply(currentConfig);
} catch (Exception e) {
clientRefreshErrorsCounter.increment();
LOG.error("Refreshing {} failed.", getComponentClass(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.bulkRequestMap = new ConcurrentHashMap<>();
this.lastFlushTimeMap = new ConcurrentHashMap<>();
this.pluginConfigObservable = pluginConfigObservable;
this.objectMapper = new ObjectMapper();

final Optional<PluginModel> dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq();
if (dlqConfig.isPresent()) {
Expand All @@ -201,7 +202,7 @@ public void doInitialize() {
doInitializeInternal();
} catch (IOException e) {
LOG.warn("Failed to initialize OpenSearch sink, retrying: {} ", e.getMessage());
closeFiles();
this.shutdown();
} catch (InvalidPluginConfigurationException e) {
LOG.error("Failed to initialize OpenSearch sink due to a configuration error.", e);
this.shutdown();
Expand All @@ -212,7 +213,7 @@ public void doInitialize() {
throw e;
} catch (Exception e) {
LOG.warn("Failed to initialize OpenSearch sink with a retryable exception. ", e);
closeFiles();
this.shutdown();
}
}

Expand Down Expand Up @@ -279,7 +280,6 @@ private void doInitializeInternal() throws IOException {
bulkRequestSupplier,
pluginSetting);

objectMapper = new ObjectMapper();
this.initialized = true;
LOG.info("Initialized OpenSearch sink");
}
Expand Down Expand Up @@ -615,6 +615,7 @@ private void closeFiles() {
public void shutdown() {
super.shutdown();
closeFiles();
openSearchClient.shutdown();
}

private void maybeUpdateServerlessNetworkPolicy() {
Expand Down

0 comments on commit 3a70f10

Please sign in to comment.