From 424233e02645c5aecc4e03b01c2bd34377a96f4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Thu, 31 Oct 2024 09:34:42 -0400 Subject: [PATCH] Apply task manager backpressure whenever a 500 error is returned in the task store (#198418) In this PR, I'm making the task manager apply backpressure whenever a 500 error is returned in the task store (msearch or other SO I/O). ## To verify 1. Apply the following diff, run Kibana and notice logs about poll interval and capacity configuration changing ``` diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 2b3440e87c0..d2ffaa2f50f 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -574,6 +574,8 @@ export class TaskStore { const versionMap = this.createVersionMap([]); let allTasks = new Array(); + responses[0].status = 500; + for (const response of responses) { if (response.status !== 200) { const err = new MsearchError(response.status); ``` 2. Undo previous changes, apply the following diff, run Kibana and notice logs about poll interval and capacity configuration changing ``` diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 2b3440e87c0..95d14152e1d 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -12,6 +12,7 @@ import murmurhash from 'murmurhash'; import { v4 } from 'uuid'; import { Subject } from 'rxjs'; import { omit, defaults, get } from 'lodash'; +import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import { SavedObjectError } from '@kbn/core-saved-objects-common'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; @@ -474,6 +475,7 @@ export class TaskStore { public async bulkGet(ids: string[]): Promise { let result; try { + throw SavedObjectsErrorHelpers.decorateGeneralError(new Error('foo')); result = await this.savedObjectsRepository.bulkGet( ids.map((id) => ({ type: 'task', id })) ); ``` --- .../lib/create_managed_configuration.test.ts | 32 +++++++++++++++++++ .../lib/create_managed_configuration.ts | 2 ++ 2 files changed, 34 insertions(+) diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts index 5e0a5ed4f2e67..1da1bb11d1c5d 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts @@ -185,6 +185,17 @@ describe('createManagedConfiguration()', () => { expect(subscription).toHaveBeenNthCalledWith(2, 8); }); + test('should decrease configuration at the next interval when a 500 error is emitted', async () => { + const { subscription, errors$ } = setupScenario(10); + errors$.next(SavedObjectsErrorHelpers.decorateGeneralError(new Error('a'), 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + expect(subscription).toHaveBeenNthCalledWith(1, 10); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(2); + expect(subscription).toHaveBeenNthCalledWith(2, 8); + }); + test('should decrease configuration at the next interval when a 503 error is emitted', async () => { const { subscription, errors$ } = setupScenario(10); errors$.next(SavedObjectsErrorHelpers.createGenericNotFoundEsUnavailableError('a', 'b')); @@ -247,6 +258,17 @@ describe('createManagedConfiguration()', () => { expect(subscription).toHaveBeenNthCalledWith(2, 8); }); + test('should decrease configuration at the next interval when an msearch 500 error is emitted', async () => { + const { subscription, errors$ } = setupScenario(10); + errors$.next(new MsearchError(500)); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + expect(subscription).toHaveBeenNthCalledWith(1, 10); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(2); + expect(subscription).toHaveBeenNthCalledWith(2, 8); + }); + test('should decrease configuration at the next interval when an msearch 503 error is emitted', async () => { const { subscription, errors$ } = setupScenario(10); errors$.next(new MsearchError(503)); @@ -338,6 +360,16 @@ describe('createManagedConfiguration()', () => { expect(subscription).toHaveBeenNthCalledWith(2, 120); }); + test('should increase configuration at the next interval when a 500 error is emitted', async () => { + const { subscription, errors$ } = setupScenario(100); + errors$.next(SavedObjectsErrorHelpers.decorateGeneralError(new Error('a'), 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(2); + expect(subscription).toHaveBeenNthCalledWith(2, 120); + }); + test('should increase configuration at the next interval when a 503 error is emitted', async () => { const { subscription, errors$ } = setupScenario(100); errors$.next(SavedObjectsErrorHelpers.createGenericNotFoundEsUnavailableError('a', 'b')); diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts index 8a76029efb8eb..d13c2511a2a2b 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts @@ -165,8 +165,10 @@ function countErrors(errors$: Observable, countInterval: number): Observa (e) => SavedObjectsErrorHelpers.isTooManyRequestsError(e) || SavedObjectsErrorHelpers.isEsUnavailableError(e) || + SavedObjectsErrorHelpers.isGeneralError(e) || isEsCannotExecuteScriptError(e) || getMsearchStatusCode(e) === 429 || + getMsearchStatusCode(e) === 500 || getMsearchStatusCode(e) === 503 ) )