Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Apply task manager backpressure whenever a 500 error is returned in t…
…he task store (#198418) In this PR, I'm making the task manager apply backpressure whenever a 500 error is returned in the task store (msearch or other SO I/O). ## To verify 1. Apply the following diff, run Kibana and notice logs about poll interval and capacity configuration changing ``` diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 2b3440e87c0..d2ffaa2f50f 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -574,6 +574,8 @@ export class TaskStore { const versionMap = this.createVersionMap([]); let allTasks = new Array<ConcreteTaskInstance>(); + responses[0].status = 500; + for (const response of responses) { if (response.status !== 200) { const err = new MsearchError(response.status); ``` 2. Undo previous changes, apply the following diff, run Kibana and notice logs about poll interval and capacity configuration changing ``` diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 2b3440e87c0..95d14152e1d 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -12,6 +12,7 @@ import murmurhash from 'murmurhash'; import { v4 } from 'uuid'; import { Subject } from 'rxjs'; import { omit, defaults, get } from 'lodash'; +import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import { SavedObjectError } from '@kbn/core-saved-objects-common'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; @@ -474,6 +475,7 @@ export class TaskStore { public async bulkGet(ids: string[]): Promise<BulkGetResult> { let result; try { + throw SavedObjectsErrorHelpers.decorateGeneralError(new Error('foo')); result = await this.savedObjectsRepository.bulkGet<SerializedConcreteTaskInstance>( ids.map((id) => ({ type: 'task', id })) ); ```
- Loading branch information