[Dataset Quality] Apply chunking strategy for data stream stats retrieval (elastic#194816)

## 📓 Summary

Closes elastic#192169 

This work fixes the issue where some requests hit the HTTP line length limit once all the dataset names are combined into a single request.

We had a suggested strategy from the work done with elastic#171735, but it presented a couple of problems.
- The HTTP line length issue occurs when the request URL exceeds 4096 bytes (4096 characters). This limit covers the whole URL (protocol, domain, path, and any other parameters), so assuming the full 4096 characters are available for the `index` parameter is incorrect: in the worst case, a chunk of 16 values of 255 characters each already takes 16 × 255 + 15 commas = 4095 characters, leaving essentially no room for the rest of the URL.
- Always chunking the requests in groups of 16 items might not be optimal in the most common scenario, where data stream patterns are short.

I opted for a different chunking strategy that fills each chunk as much as possible, keeping the number of requests triggered on the cluster to a minimum.

I'll leave more notes in the code to help with the review.
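
For review context, here is a minimal sketch of how the new `reduceAsyncChunks` helper is meant to be used, mirroring the data stream stats call site in the diff below (the import path and the wrapper function are illustrative, not actual file contents):

```ts
import type { ElasticsearchClient } from '@kbn/core/server';
import { reduceAsyncChunks } from './utils/reduce_async_chunks'; // illustrative path

// Sketch: fetch stats for an arbitrarily long list of data stream names.
// reduceAsyncChunks splits the names into chunks that fit the URL length budget,
// runs the callback once per chunk, and deep-merges the partial responses.
async function getStreamsStats(esClient: ElasticsearchClient, dataStreams: string[]) {
  const { data_streams: dataStreamsStats } = await reduceAsyncChunks(
    dataStreams,
    (dataStreamsChunk) =>
      esClient.indices.dataStreamsStats({ name: dataStreamsChunk.join(','), human: true })
  );

  return dataStreamsStats;
}
```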

---------

Co-authored-by: Marco Antonio Ghiani <[email protected]>
tonyghiani and Marco Antonio Ghiani authored Oct 7, 2024
1 parent 3a3f130 commit bff69e2
Showing 6 changed files with 137 additions and 14 deletions.
@@ -119,7 +119,7 @@ export async function getDataStreamDetails({
 }
 
 async function getDataStreamCreatedOn(esClient: ElasticsearchClient, dataStream: string) {
-  const indexSettings = await dataStreamService.getDataSteamIndexSettings(esClient, dataStream);
+  const indexSettings = await dataStreamService.getDataStreamIndexSettings(esClient, dataStream);
 
   const indexesList = Object.values(indexSettings);
 
@@ -6,6 +6,7 @@
  */
 
 import type { ElasticsearchClient } from '@kbn/core/server';
+import { reduceAsyncChunks } from '../../../utils/reduce_async_chunks';
 
 export interface MeteringStatsResponse {
   datastreams: Array<{
@@ -26,11 +27,13 @@ export async function getDataStreamsMeteringStats({
     return {};
   }
 
-  const { datastreams: dataStreamsStats } = await esClient.transport.request<MeteringStatsResponse>(
-    {
-      method: 'GET',
-      path: `/_metering/stats/` + dataStreams.join(','),
-    }
+  const { datastreams: dataStreamsStats } = await reduceAsyncChunks(
+    dataStreams,
+    (dataStreamsChunk) =>
+      esClient.transport.request<MeteringStatsResponse>({
+        method: 'GET',
+        path: `/_metering/stats/` + dataStreamsChunk.join(','),
+      })
   );
 
   return dataStreamsStats.reduce(
@@ -10,6 +10,7 @@ import type {
   IndicesDataStreamsStatsDataStreamsStatsItem,
 } from '@elastic/elasticsearch/lib/api/types';
 import type { ElasticsearchClient } from '@kbn/core/server';
+import { reduceAsyncChunks } from '../utils/reduce_async_chunks';
 
 class DataStreamService {
   public async getMatchingDataStreams(
@@ -37,10 +38,11 @@
     dataStreams: string[]
   ): Promise<IndicesDataStreamsStatsDataStreamsStatsItem[]> {
     try {
-      const { data_streams: dataStreamsStats } = await esClient.indices.dataStreamsStats({
-        name: dataStreams.join(','),
-        human: true,
-      });
+      const { data_streams: dataStreamsStats } = await reduceAsyncChunks(
+        dataStreams,
+        (dataStreamsChunk) =>
+          esClient.indices.dataStreamsStats({ name: dataStreamsChunk.join(','), human: true })
+      );
 
       return dataStreamsStats;
     } catch (e) {
@@ -51,7 +53,7 @@
     }
   }
 
-  public async getDataSteamIndexSettings(
+  public async getDataStreamIndexSettings(
     esClient: ElasticsearchClient,
     dataStream: string
   ): Promise<Awaited<ReturnType<ElasticsearchClient['indices']['getSettings']>>> {
@@ -8,6 +8,7 @@
 import { chain, sumBy } from 'lodash';
 import type { ElasticsearchClient } from '@kbn/core/server';
 import { extractIndexNameFromBackingIndex } from '../../common/utils';
+import { reduceAsyncChunks } from '../utils/reduce_async_chunks';
 
 interface IndexStatsResponse {
   docsCountPerDataStream: { [indexName: string]: number };
@@ -19,9 +20,9 @@ class IndexStatsService {
     dataStreams: string[]
   ): Promise<IndexStatsResponse> {
     try {
-      const index = dataStreams;
-
-      const { indices } = await esClient.indices.stats({ index, metric: ['docs'] });
+      const { indices } = await reduceAsyncChunks(dataStreams, (indexChunk) =>
+        esClient.indices.stats({ index: indexChunk, metric: ['docs'] })
+      );
 
       const docsCountPerDataStream = chain(indices || {})
         .map((indexStats, indexName) => ({
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { reduceAsyncChunks } from './reduce_async_chunks';

describe('reduceAsyncChunks', () => {
const spyChunkExecutor = jest
.fn()
.mockImplementation((chunk: string[]) =>
Promise.resolve(chunk.map((str) => str.toUpperCase()))
);

afterEach(() => {
spyChunkExecutor.mockClear();
});

it('should run an iterator mapping callback on each chunk and merge the result', async () => {
const input = Array(20).fill('logs-dataset-default');
const expected = Array(20).fill('LOGS-DATASET-DEFAULT');

const res = await reduceAsyncChunks(input, spyChunkExecutor);

expect(res).toEqual(expected);
expect(spyChunkExecutor).toHaveBeenCalledTimes(1);
});

it('should create chunks where the total strings length does not exceed the allowed maximum', async () => {
const input = Array(1000).fill('logs-dataset-default'); // 20k chars => 20k/3072 => Expected 7 chunks
const expected = Array(1000).fill('LOGS-DATASET-DEFAULT');
const expectedChunks = 7;

const res = await reduceAsyncChunks(input, spyChunkExecutor);

expect(res).toEqual(expected);
expect(spyChunkExecutor).toHaveBeenCalledTimes(expectedChunks);
});

it('should maximize the chunk length to minimize the chunk count', async () => {
const input = [
...Array(1000).fill('logs-dataset_30letters-default'),
...Array(1000).fill('logs-dataset-default'),
]; // 30k chars + 20k chars + ~2k commas => 52k/3072 => Expected 17 chunks
const expected = [
...Array(1000).fill('LOGS-DATASET_30LETTERS-DEFAULT'),
...Array(1000).fill('LOGS-DATASET-DEFAULT'),
];
const expectedChunks = 17;

const res = await reduceAsyncChunks(input, spyChunkExecutor);

expect(res).toEqual(expected);
expect(spyChunkExecutor).toHaveBeenCalledTimes(expectedChunks);
});
});
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Observable, OperatorFunction, from, lastValueFrom, mergeMap, reduce } from 'rxjs';
import deepmerge from 'deepmerge';

type CallbackFn<TResult> = (chunk: string[], id: number) => Promise<TResult>;

const MAX_HTTP_LINE_LENGTH = 4096;
// Apply a 75% threshold to the HTTP line max length to guarantee enough space for the URL and potentially other parameters.
// This value might need to vary as it's an estimate of how much we can reserve for the chunked list length.
const MAX_CHUNK_LENGTH = MAX_HTTP_LINE_LENGTH * 0.75; // 4096 * 0.75 === 3072 characters, as 1 char === 1 byte

export const reduceAsyncChunks = <TResult>(list: string[], chunkExecutor: CallbackFn<TResult>) => {
const result$ = from(list).pipe(
bufferUntil(isLessThanMaxChunkLength),
mergeMap((chunk, id) => from(chunkExecutor(chunk, id))),
reduce((result, chunkResult) => deepmerge(result, chunkResult))
);

return lastValueFrom(result$);
};

/**
* Support functions for reduceAsyncChunks
*/
const bufferUntil = <TItem>(
predicate: (chunk: TItem[], currentItem: TItem) => boolean
): OperatorFunction<TItem, TItem[]> => {
return (source) =>
new Observable((observer) => {
let chunk: TItem[] = [];

return source.subscribe({
next(currentItem) {
if (predicate(chunk, currentItem)) {
chunk.push(currentItem);
} else {
// Emit the current chunk, start a new one
if (chunk.length > 0) observer.next(chunk);
chunk = [currentItem]; // Reset the chunk with the current item
}
},
complete() {
// Emit the final chunk if it has any items
if (chunk.length > 0) observer.next(chunk);
observer.complete();
},
});
});
};

const isLessThanMaxChunkLength = (chunk: string[], currentItem: string) => {
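  // Note: join() defaults to a ',' separator, so the commas that will appear in the URL list are counted as part of the length.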
const totalLength = [...chunk, currentItem].join().length;
return totalLength <= MAX_CHUNK_LENGTH; // Allow the chunk until it exceeds the max chunk length
};
