From bff69e218aa0b776be0c90e9f1a91ba7e51551c2 Mon Sep 17 00:00:00 2001 From: Marco Antonio Ghiani Date: Mon, 7 Oct 2024 14:24:53 +0200 Subject: [PATCH] [Dataset Quality ] Apply chunking strategy for data stream stats retrieval (#194816) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 📓 Summary Closes #192169 This work fixes the issue with some requests hitting the too-long HTTP line once we combine all the dataset names into a single request. We had a suggested strategy from the work done with #171735 , but it presented a couple of problems. - The HTTP line length issue occurs for an exceeding length of the request URL, which goes over 4096 bytes (4096 characters.) This also includes the whole URL protocol, domain, path and any other parameters, so assuming that we have 4096 characters for the `index` parameter is incorrect, as we would exceed the maximum anyway in a worst-case scenario, where we have a chunk of 16 values with length 255 chars. - Always chunking the requests in groups of 16 items might not be optimal in the most common scenario where we have short data stream patterns. I opted to adopt a different chunking strategy that optimizes each chunk so that we reduce the requests triggered on the cluster to a minimum. I'll leave more notes in the code to help with the review. --------- Co-authored-by: Marco Antonio Ghiani --- .../get_data_stream_details/index.ts | 2 +- .../get_data_streams_metering_stats/index.ts | 13 ++-- .../server/services/data_stream.ts | 12 ++-- .../server/services/index_stats.ts | 7 ++- .../server/utils/reduce_async_chunks.test.ts | 58 ++++++++++++++++++ .../server/utils/reduce_async_chunks.ts | 59 +++++++++++++++++++ 6 files changed, 137 insertions(+), 14 deletions(-) create mode 100644 x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.test.ts create mode 100644 x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.ts diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_data_stream_details/index.ts b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_data_stream_details/index.ts index fd117d65ac99d..eb1d70b867dc4 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_data_stream_details/index.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_data_stream_details/index.ts @@ -119,7 +119,7 @@ export async function getDataStreamDetails({ } async function getDataStreamCreatedOn(esClient: ElasticsearchClient, dataStream: string) { - const indexSettings = await dataStreamService.getDataSteamIndexSettings(esClient, dataStream); + const indexSettings = await dataStreamService.getDataStreamIndexSettings(esClient, dataStream); const indexesList = Object.values(indexSettings); diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_data_streams_metering_stats/index.ts b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_data_streams_metering_stats/index.ts index bdf30533cbed9..5bd6f8b15f1c1 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_data_streams_metering_stats/index.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/routes/data_streams/get_data_streams_metering_stats/index.ts @@ -6,6 +6,7 @@ */ import type { ElasticsearchClient } from '@kbn/core/server'; +import { reduceAsyncChunks } from '../../../utils/reduce_async_chunks'; export interface MeteringStatsResponse { datastreams: Array<{ @@ -26,11 +27,13 @@ export async function getDataStreamsMeteringStats({ return {}; } - const { datastreams: dataStreamsStats } = await esClient.transport.request( - { - method: 'GET', - path: `/_metering/stats/` + dataStreams.join(','), - } + const { datastreams: dataStreamsStats } = await reduceAsyncChunks( + dataStreams, + (dataStreamsChunk) => + esClient.transport.request({ + method: 'GET', + path: `/_metering/stats/` + dataStreamsChunk.join(','), + }) ); return dataStreamsStats.reduce( diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_stream.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_stream.ts index 16b283d583fd3..1157b40936a6d 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/services/data_stream.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/data_stream.ts @@ -10,6 +10,7 @@ import type { IndicesDataStreamsStatsDataStreamsStatsItem, } from '@elastic/elasticsearch/lib/api/types'; import type { ElasticsearchClient } from '@kbn/core/server'; +import { reduceAsyncChunks } from '../utils/reduce_async_chunks'; class DataStreamService { public async getMatchingDataStreams( @@ -37,10 +38,11 @@ class DataStreamService { dataStreams: string[] ): Promise { try { - const { data_streams: dataStreamsStats } = await esClient.indices.dataStreamsStats({ - name: dataStreams.join(','), - human: true, - }); + const { data_streams: dataStreamsStats } = await reduceAsyncChunks( + dataStreams, + (dataStreamsChunk) => + esClient.indices.dataStreamsStats({ name: dataStreamsChunk.join(','), human: true }) + ); return dataStreamsStats; } catch (e) { @@ -51,7 +53,7 @@ class DataStreamService { } } - public async getDataSteamIndexSettings( + public async getDataStreamIndexSettings( esClient: ElasticsearchClient, dataStream: string ): Promise>> { diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/services/index_stats.ts b/x-pack/plugins/observability_solution/dataset_quality/server/services/index_stats.ts index 967a8c393bcc8..70cdb6d260afb 100644 --- a/x-pack/plugins/observability_solution/dataset_quality/server/services/index_stats.ts +++ b/x-pack/plugins/observability_solution/dataset_quality/server/services/index_stats.ts @@ -8,6 +8,7 @@ import { chain, sumBy } from 'lodash'; import type { ElasticsearchClient } from '@kbn/core/server'; import { extractIndexNameFromBackingIndex } from '../../common/utils'; +import { reduceAsyncChunks } from '../utils/reduce_async_chunks'; interface IndexStatsResponse { docsCountPerDataStream: { [indexName: string]: number }; @@ -19,9 +20,9 @@ class IndexStatsService { dataStreams: string[] ): Promise { try { - const index = dataStreams; - - const { indices } = await esClient.indices.stats({ index, metric: ['docs'] }); + const { indices } = await reduceAsyncChunks(dataStreams, (indexChunk) => + esClient.indices.stats({ index: indexChunk, metric: ['docs'] }) + ); const docsCountPerDataStream = chain(indices || {}) .map((indexStats, indexName) => ({ diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.test.ts b/x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.test.ts new file mode 100644 index 0000000000000..49a2ddf672d1d --- /dev/null +++ b/x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.test.ts @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { reduceAsyncChunks } from './reduce_async_chunks'; + +describe('reduceAsyncChunks', () => { + const spyChunkExecutor = jest + .fn() + .mockImplementation((chunk: string[]) => + Promise.resolve(chunk.map((str) => str.toUpperCase())) + ); + + afterEach(() => { + spyChunkExecutor.mockClear(); + }); + + it('should run a iterator mapping callback on each chunk and merge the result', async () => { + const input = Array(20).fill('logs-dataset-default'); + const expected = Array(20).fill('LOGS-DATASET-DEFAULT'); + + const res = await reduceAsyncChunks(input, spyChunkExecutor); + + expect(res).toEqual(expected); + expect(spyChunkExecutor).toHaveBeenCalledTimes(1); + }); + + it('should create chunks where the total strings length does not exceed the allowed maximum', async () => { + const input = Array(1000).fill('logs-dataset-default'); // 20k chars => 20k/3072 => Expected 7 chunks + const expected = Array(1000).fill('LOGS-DATASET-DEFAULT'); + const expectedChunks = 7; + + const res = await reduceAsyncChunks(input, spyChunkExecutor); + + expect(res).toEqual(expected); + expect(spyChunkExecutor).toHaveBeenCalledTimes(expectedChunks); + }); + + it('should maximize the chunks length the chunks count', async () => { + const input = [ + ...Array(1000).fill('logs-dataset_30letters-default'), + ...Array(1000).fill('logs-dataset-default'), + ]; // 30k chars + 20k chars + ~2k commas => 52k/3072 => Expected 17 chunks + const expected = [ + ...Array(1000).fill('LOGS-DATASET_30LETTERS-DEFAULT'), + ...Array(1000).fill('LOGS-DATASET-DEFAULT'), + ]; + const expectedChunks = 17; + + const res = await reduceAsyncChunks(input, spyChunkExecutor); + + expect(res).toEqual(expected); + expect(spyChunkExecutor).toHaveBeenCalledTimes(expectedChunks); + }); +}); diff --git a/x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.ts b/x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.ts new file mode 100644 index 0000000000000..cd47f577b46bf --- /dev/null +++ b/x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.ts @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { Observable, OperatorFunction, from, lastValueFrom, mergeMap, reduce } from 'rxjs'; +import deepmerge from 'deepmerge'; + +type CallbackFn = (chunk: string[], id: number) => Promise; + +const MAX_HTTP_LINE_LENGTH = 4096; +// Apply an 80% threshold to the http line max length to guarantee enough space for url and potentially other parameters. +// This value might need to vary as it's an estimate of how much we can reserve for the chunked list length. +const MAX_CHUNK_LENGTH = MAX_HTTP_LINE_LENGTH * 0.75; // 4096 *0.75 === 3072 characters, as 1 chars = 1 byte + +export const reduceAsyncChunks = (list: string[], chunkExecutor: CallbackFn) => { + const result$ = from(list).pipe( + bufferUntil(isLessThanMaxChunkLength), + mergeMap((chunk, id) => from(chunkExecutor(chunk, id))), + reduce((result, chunkResult) => deepmerge(result, chunkResult)) + ); + + return lastValueFrom(result$); +}; + +/** + * Support functions for reduceAsyncChunks + */ +const bufferUntil = ( + predicate: (chunk: TItem[], currentItem: TItem) => boolean +): OperatorFunction => { + return (source) => + new Observable((observer) => { + let chunk: TItem[] = []; + + return source.subscribe({ + next(currentItem) { + if (predicate(chunk, currentItem)) { + chunk.push(currentItem); + } else { + // Emit the current chunk, start a new one + if (chunk.length > 0) observer.next(chunk); + chunk = [currentItem]; // Reset the chunk with the current item + } + }, + complete() { + // Emit the final chunk if it has any items + if (chunk.length > 0) observer.next(chunk); + observer.complete(); + }, + }); + }); +}; + +const isLessThanMaxChunkLength = (chunk: string[], currentItem: string) => { + const totalLength = [...chunk, currentItem].join().length; + return totalLength <= MAX_CHUNK_LENGTH; // Allow the chunk until it exceeds the max chunk length +};