# [Chore][Perf] Remove temp allocation from tabifyAggs (elastic#182959)
## Summary

Removes temporary allocation from `tabifyAggResponse`.

We saw encouraging performance metrics after removing temporary allocation from `tabifyDocs` (elastic#180647). This PR introduces similar improvements in the tabify function for aggregation responses.
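
The core pattern, applied across all three files below, is replacing `Array.prototype.forEach` with index-based `for` loops: each `forEach` call site allocates a fresh callback closure at runtime, and in a hot path invoked once per bucket that adds up to significant short-lived garbage. A minimal before/after illustration of the shape of the change (illustrative only, not code from this PR):

```ts
// Before: the arrow function passed to forEach is a new closure allocation
// every time sumWithForEach runs.
function sumWithForEach(values: number[]): number {
  let total = 0;
  values.forEach((v) => {
    total += v;
  });
  return total;
}

// After: the index-based loop does the same work with no callback allocation.
function sumWithForLoop(values: number[]): number {
  let total = 0;
  for (let i = 0; i < values.length; i++) {
    total += values[i];
  }
  return total;
}
```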

### For maintainers

- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
thomasneirynck authored May 10, 2024
1 parent 73864cc commit fc042bf
Showing 3 changed files with 158 additions and 134 deletions.
10 changes: 6 additions & 4 deletions src/plugins/data/common/search/tabify/buckets.ts

```diff
@@ -57,13 +57,15 @@ export class TabifyBuckets {
     const buckets = this.buckets;
 
     if (this.objectMode) {
-      this._keys.forEach((key) => {
+      for (let i = 0; i < this._keys.length; i++) {
+        const key = this._keys[i];
         fn(buckets[key], key);
-      });
+      }
     } else {
-      buckets.forEach((bucket: AggResponseBucket) => {
+      for (let i = 0; i < buckets.length; i++) {
+        const bucket = buckets[i];
         fn(bucket, bucket.key);
-      });
+      }
     }
   }
```
20 changes: 15 additions & 5 deletions src/plugins/data/common/search/tabify/response_writer.ts

```diff
@@ -50,15 +50,25 @@ export class TabbedAggResponseWriter {
   row() {
     const rowBuffer: TabbedAggRow = {};
 
-    this.bucketBuffer.forEach((bucket) => {
+    for (let i = 0; i < this.bucketBuffer.length; i++) {
+      const bucket = this.bucketBuffer[i];
       rowBuffer[bucket.id] = bucket.value;
-    });
+    }
 
-    this.metricBuffer.forEach((metric) => {
+    for (let i = 0; i < this.metricBuffer.length; i++) {
+      const metric = this.metricBuffer[i];
       rowBuffer[metric.id] = metric.value;
-    });
+    }
 
-    const isPartialRow = !this.columns.every((column) => rowBuffer.hasOwnProperty(column.id));
+    let isPartialRow = false;
+    for (let i = 0; i < this.columns.length; i++) {
+      const column = this.columns[i];
+      if (!rowBuffer.hasOwnProperty(column.id)) {
+        isPartialRow = true;
+        break;
+      }
+    }
+
     const removePartial = isPartialRow && !this.partialRows;
     if (!isEmpty(rowBuffer) && !removePartial) {
       this.rows.push(rowBuffer);
```
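
The `isPartialRow` rewrite keeps the early exit that `.every()` provided while dropping the per-call predicate closure. If you want to sanity-check this kind of change locally, a throwaway micro-benchmark along the following lines can surface the difference. This is a hypothetical sketch, not part of the PR; the workloads are stand-ins for the real `row()` call, and it must be run with `node --expose-gc` so that `gc()` exists:

```ts
// Hypothetical benchmark harness (not from this PR).
declare const gc: () => void;

function measure(label: string, fn: () => void, iterations = 1_000_000): void {
  gc(); // start each measurement from a collected heap
  const heapBefore = process.memoryUsage().heapUsed;
  const start = process.hrtime.bigint();
  for (let i = 0; i < iterations; i++) {
    fn();
  }
  const elapsedMs = Number(process.hrtime.bigint() - start) / 1e6;
  const heapDeltaKb = (process.memoryUsage().heapUsed - heapBefore) / 1024;
  console.log(`${label}: ${elapsedMs.toFixed(1)} ms, heap delta ${heapDeltaKb.toFixed(0)} KB`);
}

const buffer = Array.from({ length: 32 }, (_, i) => ({ id: `col-${i}`, value: i }));

measure('forEach', () => {
  const row: Record<string, number> = {};
  buffer.forEach((entry) => (row[entry.id] = entry.value)); // allocates a closure per call
});

measure('for loop', () => {
  const row: Record<string, number> = {};
  for (let i = 0; i < buffer.length; i++) {
    const entry = buffer[i];
    row[entry.id] = entry.value;
  }
});
```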
262 changes: 137 additions & 125 deletions src/plugins/data/common/search/tabify/tabify.ts

The `collectBucket` and `passEmptyBuckets` helpers, previously declared inside `tabifyAggResponse` and therefore re-created as fresh closures on every call, are hoisted to module scope; the `respOpts` value they used to capture is now threaded through `collectBucket` as an explicit parameter, and the root-level doc-count check becomes an early-exit loop using a small `hasDocCount` helper. The region covered by hunk `@@ -15,141 +15,153 @@`, after the change:

```ts
import { AggResponseBucket } from './types';
import { AggGroupNames, IAggConfigs } from '../aggs';

/**
 * read an aggregation from a bucket, which *might* be found at key (if
 * the response came in object form), and will recurse down the aggregation
 * tree and will pass the read values to the ResponseWriter.
 */
function collectBucket(
  aggs: IAggConfigs,
  write: TabbedAggResponseWriter,
  bucket: AggResponseBucket,
  key: string,
  aggScale: number,
  respOpts?: Partial<TabbedResponseWriterOptions>
) {
  const column = write.columns.shift();

  if (column) {
    const agg = column.aggConfig;
    if (agg.getParam('scaleMetricValues')) {
      const aggInfo = agg.write(aggs);
      aggScale *= aggInfo.metricScale || 1;
    }

    switch (agg.type.type) {
      case AggGroupNames.Buckets:
        const aggBucket = get(bucket, agg.id) as Record<string, unknown>;
        const tabifyBuckets = new TabifyBuckets(aggBucket, agg, respOpts?.timeRange);
        const precisionError = agg.type.hasPrecisionError?.(aggBucket);

        if (precisionError) {
          // "column" mutation, we have to do this here as this value is filled in based on aggBucket value
          column.hasPrecisionError = true;
        }

        if (tabifyBuckets.length) {
          tabifyBuckets.forEach((subBucket, tabifyBucketKey) => {
            // if the bucket doesn't have value don't add it to the row
            // we don't want rows like: { column1: undefined, column2: 10 }
            const bucketValue = agg.getKey(subBucket, tabifyBucketKey);
            const hasBucketValue = typeof bucketValue !== 'undefined';

            if (hasBucketValue) {
              write.bucketBuffer.push({ id: column.id, value: bucketValue });
            }

            collectBucket(
              aggs,
              write,
              subBucket,
              agg.getKey(subBucket, tabifyBucketKey),
              aggScale,
              respOpts
            );

            if (hasBucketValue) {
              write.bucketBuffer.pop();
            }
          });
        } else if (respOpts?.partialRows) {
          // we don't have any buckets, but we do have metrics at this
          // level, then pass all the empty buckets and jump back in for
          // the metrics.
          write.columns.unshift(column);
          passEmptyBuckets(aggs, write, bucket, key, aggScale);
          write.columns.shift();
        } else {
          // we don't have any buckets, and we don't have isHierarchical
          // data, so no metrics, just try to write the row
          write.row();
        }
        break;
      case AggGroupNames.Metrics:
        let value = agg.getValue(bucket);
        // since the aggregation could be a non integer (such as a max date)
        // only do the scaling calculation if it is needed.
        if (aggScale !== 1) {
          value *= aggScale;
        }
        write.metricBuffer.push({ id: column.id, value });

        if (!write.columns.length) {
          // row complete
          write.row();
        } else {
          // process the next agg at this same level
          collectBucket(aggs, write, bucket, key, aggScale, respOpts);
        }

        write.metricBuffer.pop();

        break;
    }

    write.columns.unshift(column);
  }
}

// write empty values for each bucket agg, then write
// the metrics from the initial bucket using collectBucket()
function passEmptyBuckets(
  aggs: IAggConfigs,
  write: TabbedAggResponseWriter,
  bucket: AggResponseBucket,
  key: string,
  aggScale: number
) {
  const column = write.columns.shift();

  if (column) {
    const agg = column.aggConfig;

    switch (agg.type.type) {
      case AggGroupNames.Metrics:
        // pass control back to collectBucket()
        write.columns.unshift(column);
        collectBucket(aggs, write, bucket, key, aggScale);
        return;

      case AggGroupNames.Buckets:
        passEmptyBuckets(aggs, write, bucket, key, aggScale);
    }

    write.columns.unshift(column);
  }
}

function hasDocCount(key: string) {
  return /doc_count_/.test(key);
}

/**
 * Sets up the ResponseWriter and kicks off bucket collection.
 */
export function tabifyAggResponse(
  aggConfigs: IAggConfigs,
  esResponse: Record<string, any>,
  respOpts?: Partial<TabbedResponseWriterOptions>
): Datatable {
  const write = new TabbedAggResponseWriter(aggConfigs, respOpts || {});

  // Check whether there's a time shift for a count operation at root level
  let hasMultipleDocCountAtRootWithFilters = false;
  if (esResponse.aggregations) {
    for (const key in esResponse.aggregations) {
      if (hasDocCount(key)) {
        hasMultipleDocCountAtRootWithFilters = true;
        break;
      }
    }
  }

  const topLevelBucket: AggResponseBucket = {
    ...(aggConfigs.isSamplingEnabled()
    // … (remainder of this hunk not shown in the diff view)
```

The second hunk passes `respOpts` through at the top-level call site:

```diff
@@ -167,7 +179,7 @@ export function tabifyAggResponse(
     }
   }
 
-  collectBucket(aggConfigs, write, topLevelBucket, '', 1);
+  collectBucket(aggConfigs, write, topLevelBucket, '', 1, respOpts);
 
   return {
     ...write.response(),
```
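
The restructuring in this file removes a different kind of temporary allocation than the loop rewrites in the other two files: function declarations nested inside `tabifyAggResponse` were re-created on every invocation, whereas module-level functions are allocated once, at the cost of passing the captured `respOpts` explicitly. JS engines may or may not optimize repeated closure creation away; hoisting makes the saving unconditional. A minimal sketch of the pattern, with hypothetical `tabifyNested`/`tabifyHoisted` names (not the PR's code):

```ts
// Nested function declarations are re-created as fresh closure objects
// every time the outer function runs.
function tabifyNested(rows: number[][]): number {
  // this helper is allocated on every call to tabifyNested
  function sumRow(row: number[]): number {
    let sum = 0;
    for (let i = 0; i < row.length; i++) {
      sum += row[i];
    }
    return sum;
  }

  let total = 0;
  for (let i = 0; i < rows.length; i++) {
    total += sumRow(rows[i]);
  }
  return total;
}

// Hoisted to module scope, the helper exists once for the lifetime of the
// module; anything it used to capture from the outer scope (respOpts in the
// PR) becomes an explicit parameter instead.
function sumRowHoisted(row: number[]): number {
  let sum = 0;
  for (let i = 0; i < row.length; i++) {
    sum += row[i];
  }
  return sum;
}

function tabifyHoisted(rows: number[][]): number {
  let total = 0;
  for (let i = 0; i < rows.length; i++) {
    total += sumRowHoisted(rows[i]);
  }
  return total;
}
```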
