Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a feature that flattens custom result index when enabled #1401

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1037,14 +1037,8 @@ public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionLis
choosePrimaryShards(request, false);

adminClient.indices().create(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(response);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
logger.error(errorMsg);
actionListener.onFailure(new IllegalStateException(errorMsg));
}
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
actionListener.onResponse(response);
}, exception -> {
logger.error("Failed to create flattened result index: {}", indexName, exception);
actionListener.onFailure(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,19 +472,25 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
String configId = RestHandlerUtils.getConfigIdFromIndexResponse(createConfigResponse);
String flattenedResultIndexAlias = timeSeriesIndices
.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(configId);

timeSeriesIndices
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener.wrap(initResponse -> setupIngestPipeline(configId, ActionListener.wrap(pipelineResponse -> {
updateResultIndexSetting(
pipelineId,
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener.wrap(updateResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
);
}, listener::onFailure)), listener::onFailure)
);
ActionListener.wrap(
initResponse -> setupIngestPipeline(configId, listener, createConfigResponse),
listener::onFailure));

// timeSeriesIndices
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
// .initFlattenedResultIndex(
// flattenedResultIndexAlias,
// ActionListener.wrap(initResponse -> setupIngestPipeline(configId, ActionListener.wrap(pipelineResponse -> {
// updateResultIndexSetting(
// pipelineId,
// flattenedResultIndexAlias,
// ActionListener.wrap(updateResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
// );
// }, listener::onFailure)), listener::onFailure)
// );
} else {
listener.onResponse(createConfigResponse);
}
Expand All @@ -497,7 +503,7 @@ private boolean shouldHandleFlattening(boolean indexingDryRun) {
return !indexingDryRun && config.getCustomResultIndexOrAlias() != null && Boolean.TRUE.equals(flattenResultIndexMapping);
}

protected void setupIngestPipeline(String configId, ActionListener<T> listener) {
protected void setupIngestPipeline(String configId, ActionListener<T> listener, T createConfigResponse) {
String flattenedResultIndexAlias = timeSeriesIndices.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(configId);

Expand All @@ -506,15 +512,11 @@ protected void setupIngestPipeline(String configId, ActionListener<T> listener)

PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);

client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
listener.onResponse(null);
} else {
String errorMessage = "Ingest pipeline creation was not acknowledged for pipelineId: " + pipelineId;
logger.error(errorMessage);
listener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
}
client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(
putPipelineResponse -> {
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
bindIngestPipelineWithFlattenedResultIndex(pipelineId, configId, flattenedResultIndexAlias, listener, createConfigResponse);

}, exception -> {
logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
listener.onFailure(exception);
Expand Down Expand Up @@ -551,26 +553,15 @@ private BytesReference createPipelineDefinition(String indexName) throws IOExcep
return BytesReference.bytes(pipelineBuilder);
}

protected void updateResultIndexSetting(String pipelineId, String flattenedResultIndex, ActionListener<T> listener) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest();
updateSettingsRequest.indices(flattenedResultIndex);
protected void bindIngestPipelineWithFlattenedResultIndex(String pipelineId, String configId, String flattenedResultIndexAlias, ActionListener<T> listener, T createConfigResponse) {
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(pipelineId, configId);

Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put("index.default_pipeline", pipelineId);

updateSettingsRequest.settings(settingsBuilder);

client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId);
listener.onResponse(null);
} else {
String errorMsg = "Settings update not acknowledged for index: " + flattenedResultIndex;
logger.error(errorMsg);
listener.onFailure(new OpenSearchStatusException(errorMsg, RestStatus.INTERNAL_SERVER_ERROR));
}
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(
updateSettingsResponse -> {
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
listener.onResponse(createConfigResponse);
}, exception -> {
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId, exception);
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
listener.onFailure(exception);
}));
}
Expand Down Expand Up @@ -678,6 +669,7 @@ private UpdateSettingsRequest buildUpdateSettingsRequest(String defaultPipelineN
}

private void unbindIngestPipelineWithFlattenedResultIndex(Config existingConfig, ActionListener<T> listener, String id, boolean indexingDryRun) {
// The pipeline name _none specifies that the index does not have an ingest pipeline.
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest("_none", existingConfig.getId());
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(
updateSettingsResponse -> deleteIngestPipeline(existingConfig, listener, id, indexingDryRun),
Expand Down