Skip to content

Commit

Permalink
[Task Manager] Handles case where buffer receives multiple entities w…
Browse files Browse the repository at this point in the history
…ith the same ID (#74943) (#75150)

Handles the case where two operations for the same entity make it into a single batched bulk operation and avoid the clashing ID issue that could cause the poller to hang and stop poling for work).
  • Loading branch information
gmmorris authored Aug 18, 2020
1 parent 17cf06e commit 02d92cd
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 11 deletions.
33 changes: 33 additions & 0 deletions x-pack/plugins/task_manager/server/buffered_task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,39 @@ describe('Buffered Task Store', () => {
);
expect(await results[2]).toMatchObject(tasks[2]);
});

test('handles multiple items with the same id', async () => {
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
const bufferedStore = new BufferedTaskStore(taskStore, {});

const duplicateIdTask = mockTask();
const tasks = [
duplicateIdTask,
mockTask(),
mockTask(),
{ ...mockTask(), id: duplicateIdTask.id },
];

taskStore.bulkUpdate.mockResolvedValueOnce([
asOk(tasks[0]),
asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
asOk(tasks[2]),
asOk(tasks[3]),
]);

const results = [
bufferedStore.update(tasks[0]),
bufferedStore.update(tasks[1]),
bufferedStore.update(tasks[2]),
bufferedStore.update(tasks[3]),
];
expect(await results[0]).toMatchObject(tasks[0]);
expect(results[1]).rejects.toMatchInlineSnapshot(
`[Error: Oh no, something went terribly wrong]`
);
expect(await results[2]).toMatchObject(tasks[2]);
expect(await results[3]).toMatchObject(tasks[3]);
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer';
import { mapErr, asOk, asErr, Ok, Err } from './result_type';
import { mockLogger } from '../test_utils';

interface TaskInstance extends Entity {
attempts: number;
Expand Down Expand Up @@ -227,5 +228,38 @@ describe('Bulk Operation Buffer', () => {
done();
});
});

test('logs unknown bulk operation results', async (done) => {
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
([task1, task2, task3]) => {
return Promise.resolve([
incrementAttempts(task1),
errorAttempts(createTask()),
incrementAttempts(createTask()),
]);
}
);

const logger = mockLogger();

const bufferedUpdate = createBuffer(bulkUpdate, { logger });

const task1 = createTask();
const task2 = createTask();
const task3 = createTask();

return Promise.all([
expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)),
expect(bufferedUpdate(task2)).rejects.toMatchObject(
asErr(new Error(`Unhandled buffered operation for entity: ${task2.id}`))
),
expect(bufferedUpdate(task3)).rejects.toMatchObject(
asErr(new Error(`Unhandled buffered operation for entity: ${task3.id}`))
),
]).then(() => {
expect(logger.warn).toHaveBeenCalledTimes(2);
done();
});
});
});
});
64 changes: 55 additions & 9 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { keyBy, map } from 'lodash';
import { map } from 'lodash';
import { Subject, race, from } from 'rxjs';
import { bufferWhen, filter, bufferCount, flatMap, mapTo, first } from 'rxjs/operators';
import { either, Result, asOk, asErr, Ok, Err } from './result_type';
import { Logger } from '../types';

export interface BufferOptions {
bufferMaxDuration?: number;
bufferMaxOperations?: number;
logger?: Logger;
}

export interface Entity {
Expand Down Expand Up @@ -41,39 +43,76 @@ const FLUSH = true;

export function createBuffer<Input extends Entity, ErrorOutput, Output extends Entity = Input>(
bulkOperation: BulkOperation<Input, ErrorOutput, Output>,
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE }: BufferOptions = {}
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE, logger }: BufferOptions = {}
): Operation<Input, ErrorOutput, Output> {
const flushBuffer = new Subject<void>();

const storeUpdateBuffer = new Subject<{
entity: Input;
onSuccess: (entity: Ok<Output>) => void;
onFailure: (error: Err<ErrorOutput>) => void;
onFailure: (error: Err<ErrorOutput | Error>) => void;
}>();

storeUpdateBuffer
.pipe(
bufferWhen(() => flushBuffer),
filter((tasks) => tasks.length > 0)
)
.subscribe((entities) => {
const entityById = keyBy(entities, ({ entity: { id } }) => id);
bulkOperation(map(entities, 'entity'))
.subscribe((bufferedEntities) => {
bulkOperation(map(bufferedEntities, 'entity'))
.then((results) => {
results.forEach((result) =>
either(
result,
(entity) => {
entityById[entity.id].onSuccess(asOk(entity));
either(
pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === entity.id),
({ onSuccess }) => {
onSuccess(asOk(entity));
},
() => {
if (logger) {
logger.warn(
`Unhandled successful Bulk Operation result: ${
entity?.id ? entity.id : entity
}`
);
}
}
);
},
({ entity, error }: OperationError<Input, ErrorOutput>) => {
entityById[entity.id].onFailure(asErr(error));
either(
pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === entity.id),
({ onFailure }) => {
onFailure(asErr(error));
},
() => {
if (logger) {
logger.warn(
`Unhandled failed Bulk Operation result: ${entity?.id ? entity.id : entity}`
);
}
}
);
}
)
);

// if any `bufferedEntities` remain in the array then there was no result we could map to them in the bulkOperation
// call their failure handler to avoid hanging the promise returned to the call site
bufferedEntities.forEach((unhandledBufferedEntity) => {
unhandledBufferedEntity.onFailure(
asErr(
new Error(
`Unhandled buffered operation for entity: ${unhandledBufferedEntity.entity.id}`
)
)
);
});
})
.catch((ex) => {
entities.forEach(({ onFailure }) => onFailure(asErr(ex)));
bufferedEntities.forEach(({ onFailure }) => onFailure(asErr(ex)));
});
});

Expand Down Expand Up @@ -120,3 +159,10 @@ function resolveIn(ms: number) {
setTimeout(resolve, ms);
});
}

function pullFirstWhere<T>(collection: T[], predicate: (entity: T) => boolean): Result<T, void> {
const indexOfFirstEntity = collection.findIndex(predicate);
return indexOfFirstEntity >= 0
? asOk(collection.splice(indexOfFirstEntity, 1)[0])
: asErr(undefined);
}
5 changes: 3 additions & 2 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ export class TaskManager {

this.bufferedStore = new BufferedTaskStore(this.store, {
bufferMaxOperations: opts.config.max_workers,
logger: this.logger,
});

this.pool = new TaskPool({
Expand Down Expand Up @@ -283,7 +284,7 @@ export class TaskManager {
*/
public async schedule(
taskInstance: TaskInstanceWithDeprecatedFields,
options?: object
options?: Record<string, unknown>
): Promise<ConcreteTaskInstance> {
await this.waitUntilStarted();
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
Expand Down Expand Up @@ -318,7 +319,7 @@ export class TaskManager {
*/
public async ensureScheduled(
taskInstance: TaskInstanceWithId,
options?: object
options?: Record<string, unknown>
): Promise<TaskInstanceWithId> {
try {
return await this.schedule(taskInstance, options);
Expand Down

0 comments on commit 02d92cd

Please sign in to comment.