Skip to content

Commit

Permalink
Run the Get Connector request in a stashed threadcontext (#3492) (#3496)
Browse files Browse the repository at this point in the history
* Run the Get Connector request in a stashed threadcontext

Signed-off-by: Craig Perkins <[email protected]>

* Add restore context

Signed-off-by: Craig Perkins <[email protected]>

---------

Signed-off-by: Craig Perkins <[email protected]>
(cherry picked from commit 09bf858)

Co-authored-by: Craig Perkins <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and cwperks authored Feb 3, 2025
1 parent 5a81701 commit ee2ab75
Showing 1 changed file with 34 additions and 30 deletions.
64 changes: 34 additions & 30 deletions plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -2074,40 +2074,44 @@ public void getConnector(String connectorId, String tenantId, ActionListener<Con
.tenantId(tenantId)
.build();

sdkClient.getDataObjectAsync(getDataObjectRequest).whenComplete((r, throwable) -> {
log.debug("Completed Get Connector Request, id:{}", connectorId);
if (throwable != null) {
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable);
if (ExceptionsHelper.unwrap(cause, IndexNotFoundException.class) != null) {
log.error("Failed to get connector index", cause);
listener.onFailure(new OpenSearchStatusException("Failed to find connector", RestStatus.NOT_FOUND));
try (ThreadContext.StoredContext ctx = client.threadPool().getThreadContext().stashContext()) {
sdkClient.getDataObjectAsync(getDataObjectRequest).whenComplete((r, throwable) -> {
log.debug("Completed Get Connector Request, id:{}", connectorId);
ctx.restore();
if (throwable != null) {
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable);
if (ExceptionsHelper.unwrap(cause, IndexNotFoundException.class) != null) {
log.error("Failed to get connector index", cause);
listener.onFailure(new OpenSearchStatusException("Failed to find connector", RestStatus.NOT_FOUND));
} else {
log.error("Failed to get ML connector {}", connectorId, cause);
listener.onFailure(cause);
}
} else {
log.error("Failed to get ML connector {}", connectorId, cause);
listener.onFailure(cause);
}
} else {
try {
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
if (gr != null && gr.isExists()) {
try (
XContentParser parser = MLNodeUtils
.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, gr.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Connector connector = Connector.createConnector(parser);
listener.onResponse(connector);
} catch (Exception e) {
log.error("Failed to parse connector:{}", connectorId);
listener.onFailure(e);
try {
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
if (gr != null && gr.isExists()) {
try (
XContentParser parser = MLNodeUtils
.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, gr.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Connector connector = Connector.createConnector(parser);
listener.onResponse(connector);
} catch (Exception e) {
log.error("Failed to parse connector:{}", connectorId);
listener.onFailure(e);
}
} else {
listener
.onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND));
}
} else {
listener.onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND));
} catch (Exception e) {
listener.onFailure(e);
}
} catch (Exception e) {
listener.onFailure(e);
}
}
});
});
}
}

/**
Expand Down

0 comments on commit ee2ab75

Please sign in to comment.