From 09bf858092d71b95c6cbed3bd43665107fb8bc8f Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Mon, 3 Feb 2025 16:45:44 -0500 Subject: [PATCH] Run the Get Connector request in a stashed threadcontext (#3492) * Run the Get Connector request in a stashed threadcontext Signed-off-by: Craig Perkins * Add restore context Signed-off-by: Craig Perkins --------- Signed-off-by: Craig Perkins --- .../opensearch/ml/model/MLModelManager.java | 64 ++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java b/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java index 7cfc76ecdc..5bfc4fc4f5 100644 --- a/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java +++ b/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java @@ -2077,40 +2077,44 @@ public void getConnector(String connectorId, String tenantId, ActionListener { - log.debug("Completed Get Connector Request, id:{}", connectorId); - if (throwable != null) { - Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); - if (ExceptionsHelper.unwrap(cause, IndexNotFoundException.class) != null) { - log.error("Failed to get connector index", cause); - listener.onFailure(new OpenSearchStatusException("Failed to find connector", RestStatus.NOT_FOUND)); + try (ThreadContext.StoredContext ctx = client.threadPool().getThreadContext().stashContext()) { + sdkClient.getDataObjectAsync(getDataObjectRequest).whenComplete((r, throwable) -> { + log.debug("Completed Get Connector Request, id:{}", connectorId); + ctx.restore(); + if (throwable != null) { + Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); + if (ExceptionsHelper.unwrap(cause, IndexNotFoundException.class) != null) { + log.error("Failed to get connector index", cause); + listener.onFailure(new OpenSearchStatusException("Failed to find connector", RestStatus.NOT_FOUND)); + } else { + log.error("Failed to get ML connector {}", connectorId, cause); + listener.onFailure(cause); + } } else { - log.error("Failed to get ML connector {}", connectorId, cause); - listener.onFailure(cause); - } - } else { - try { - GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser()); - if (gr != null && gr.isExists()) { - try ( - XContentParser parser = MLNodeUtils - .createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, gr.getSourceAsBytesRef()) - ) { - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - Connector connector = Connector.createConnector(parser); - listener.onResponse(connector); - } catch (Exception e) { - log.error("Failed to parse connector:{}", connectorId); - listener.onFailure(e); + try { + GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser()); + if (gr != null && gr.isExists()) { + try ( + XContentParser parser = MLNodeUtils + .createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, gr.getSourceAsBytesRef()) + ) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + Connector connector = Connector.createConnector(parser); + listener.onResponse(connector); + } catch (Exception e) { + log.error("Failed to parse connector:{}", connectorId); + listener.onFailure(e); + } + } else { + listener + .onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND)); } - } else { - listener.onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND)); + } catch (Exception e) { + listener.onFailure(e); } - } catch (Exception e) { - listener.onFailure(e); } - } - }); + }); + } } /**