Skip to content

Commit

Permalink
HARMONY-1508: Add additional tests for work-item updater service
Browse files Browse the repository at this point in the history
  • Loading branch information
indiejames committed Jul 19, 2023
1 parent cc4d84a commit 032b1c8
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 123 deletions.
2 changes: 1 addition & 1 deletion kubernetes-services/work-updater/app/workers/updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export async function batchProcessQueue(queueType: WorkItemQueueType): Promise<v
// efficient than deleting them one at a time.
const updates: WorkItemUpdateQueueItem[] = messages.map((msg) => JSON.parse(msg.body));
try {
await handleBatchWorkItemUpdates(updates, defaultLogger);
await exports.handleBatchWorkItemUpdates(updates, defaultLogger);
} catch (e) {
defaultLogger.error(`Error processing work item updates from queue: ${e}`);
}
Expand Down
210 changes: 88 additions & 122 deletions kubernetes-services/work-updater/test/updater.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
/* eslint-disable node/no-unpublished-import */
import { expect } from 'chai';
import { describe, it } from 'mocha';
import * as sinon from 'sinon';
import { SinonStub } from 'sinon';
import { SinonSpy, SinonStub } from 'sinon';
import { Logger } from 'winston';
import * as updater from '../app/workers/updater';
import * as queueFactory from '../../../app/util/queue/queue-factory';
Expand All @@ -19,6 +18,7 @@ describe('Updater Worker', async function () {
let getQueueForTypeStub: SinonStub;
let getJobIdForWorkItemStub: SinonStub;
let handleWorkItemUpdateWithJobIdStub: SinonStub;
let handleBatchWorkItemUpdatesSpy: SinonSpy;

before(function () {
getQueueForTypeStub = sinon.stub(queueFactory, 'getQueueForType').callsFake(function (type: WorkItemQueueType) {
Expand All @@ -33,27 +33,33 @@ describe('Updater Worker', async function () {
handleWorkItemUpdateWithJobIdStub = sinon.stub(wiu, 'handleWorkItemUpdateWithJobId').callsFake(async function (_jobID: string, _update: WorkItemUpdate, _operation: DataOperation, _logger: Logger): Promise<void> {
return;
});
handleBatchWorkItemUpdatesSpy = sinon.spy(updater, 'handleBatchWorkItemUpdates');
});

after(function () {
getQueueForTypeStub.restore();
getJobIdForWorkItemStub.restore();
handleWorkItemUpdateWithJobIdStub.restore();
handleBatchWorkItemUpdatesSpy.restore();
});

this.beforeEach(function () {
handleWorkItemUpdateWithJobIdStub.resetHistory();
handleBatchWorkItemUpdatesSpy.resetHistory();
});

describe('large job update', async function () {

beforeEach(async function () {
await largeUpdateQueue.purge();
await updater.batchProcessQueue(WorkItemQueueType.LARGE_ITEM_UPDATE);
});

describe('when the queue is empty', async function () {
it('should call getQueueForType', async function () {
await updater.batchProcessQueue(WorkItemQueueType.LARGE_ITEM_UPDATE);
expect(getQueueForTypeStub.called).to.be.true;
});
it('should not call handleWorkItemUpdateWithJobId', async function () {
await updater.batchProcessQueue(WorkItemQueueType.LARGE_ITEM_UPDATE);
expect(handleWorkItemUpdateWithJobIdStub.called).to.be.false;
});
});
Expand All @@ -64,136 +70,96 @@ describe('Updater Worker', async function () {
const operation = {};
await largeUpdateQueue.purge();
await largeUpdateQueue.sendMessage(JSON.stringify({ update, operation }), '', false);
handleWorkItemUpdateWithJobIdStub.resetHistory();
await updater.batchProcessQueue(WorkItemQueueType.LARGE_ITEM_UPDATE);
});

it('should call getQueueForType', async function () {
await updater.batchProcessQueue(WorkItemQueueType.LARGE_ITEM_UPDATE);
expect(getQueueForTypeStub.called).to.be.true;
});
it('should call handleWorkItemUpdateWithJobId once', async function () {
await updater.batchProcessQueue(WorkItemQueueType.LARGE_ITEM_UPDATE);
expect(handleWorkItemUpdateWithJobIdStub.callCount).to.equal(1);
});
});

describe('when the queue has two items', async function () {
this.beforeEach(async function () {
const update1 = { workItemId: 1 };
const update2 = { workItemId: 2 };
const operation = {};
await largeUpdateQueue.purge();
await largeUpdateQueue.sendMessage(JSON.stringify({ update: update1, operation }), '', false);
await largeUpdateQueue.sendMessage(JSON.stringify({ update: update2, operation }), '', false);
await updater.batchProcessQueue(WorkItemQueueType.LARGE_ITEM_UPDATE);
});

it('should call getQueueForType', async function () {
expect(getQueueForTypeStub.called).to.be.true;
});
it('should call handleWorkItemUpdateWithJobId twice', async function () {
expect(handleWorkItemUpdateWithJobIdStub.callCount).to.equal(2);
});
it('should not call handleBatchWorkItemUpdates', async function () {
expect(handleBatchWorkItemUpdatesSpy.called).to.be.false;
});
});
});

// describe('small job update', async function () {
// });
describe('small job update', async function () {

beforeEach(async function () {
await smallUpdateQueue.purge();
await updater.batchProcessQueue(WorkItemQueueType.SMALL_ITEM_UPDATE);
});

describe('when the queue is empty', async function () {
it('should call getQueueForType', async function () {
expect(getQueueForTypeStub.called).to.be.true;
});
it('should not call handleWorkItemUpdateWithJobId', async function () {
await updater.batchProcessQueue(WorkItemQueueType.SMALL_ITEM_UPDATE);
expect(handleWorkItemUpdateWithJobIdStub.called).to.be.false;
});
});

describe('when the queue has one item', async function () {
this.beforeEach(async function () {
const update = { workItemId: 1 };
const operation = {};
await smallUpdateQueue.purge();
await smallUpdateQueue.sendMessage(JSON.stringify({ update, operation }), '', false);
await updater.batchProcessQueue(WorkItemQueueType.SMALL_ITEM_UPDATE);
});

it('should call getQueueForType', async function () {
expect(getQueueForTypeStub.called).to.be.true;
});
it('should call handleWorkItemUpdateWithJobId once', async function () {
expect(handleWorkItemUpdateWithJobIdStub.callCount).to.equal(1);
});
});

describe('when the queue has two items', async function () {
this.beforeEach(async function () {
const update1 = { workItemId: 1 };
const update2 = { workItemId: 2 };
const operation = {};
await smallUpdateQueue.purge();
await smallUpdateQueue.sendMessage(JSON.stringify({ update: update1, operation }), '', false);
await smallUpdateQueue.sendMessage(JSON.stringify({ update: update2, operation }), '', false);
await updater.batchProcessQueue(WorkItemQueueType.SMALL_ITEM_UPDATE);
});

it('should call getQueueForType', async function () {
expect(getQueueForTypeStub.called).to.be.true;
});
it('should call handleWorkItemUpdateWithJobId twice', async function () {
expect(handleWorkItemUpdateWithJobIdStub.callCount).to.equal(2);
});
it('should call handleBatchWorkItemUpdates once', async function () {
expect(handleBatchWorkItemUpdatesSpy.callCount).to.equal(1);
});
});
});
});

// describe('Scheduler Worker', async function () {
// const service = 'foo:latest';

// describe('processSchedulerQueue', async function () {
// let getPodsCountForServiceStub: SinonStub;
// let getWorkFromDatabaseStub: SinonStub;
// let getSchedulerQueueStub: SinonStub;
// let getQueueUrlForServiceStub: SinonStub;
// let getQueueForUrlStub: SinonStub;
// const schedulerQueue = new MemoryQueue();
// let serviceQueues;

// before(function () {
// getPodsCountForServiceStub = sinon.stub(k8s, 'getPodsCountForService').callsFake(async function () {
// return 1;
// });
// getWorkFromDatabaseStub = sinon.stub(workItemPolling, 'getWorkFromDatabase').callsFake(async function (_serviceID: string, _logger: Logger) {
// return { workItem: new WorkItem({ id: 1 }) } as WorkItemData;
// });
// getSchedulerQueueStub = sinon.stub(queueFactory, 'getWorkSchedulerQueue').callsFake(function () {
// return schedulerQueue;
// });
// getQueueUrlForServiceStub = sinon.stub(queueFactory, 'getQueueUrlForService').callsFake(function (serviceID: string) { return serviceID; });
// getQueueForUrlStub = sinon.stub(queueFactory, 'getQueueForUrl').callsFake(function (url: string) {
// let queue = serviceQueues[url];
// if (!queue) {
// queue = new MemoryQueue();
// serviceQueues[url] = queue;
// }
// return queue;
// });
// });

// after(function () {
// getPodsCountForServiceStub.restore();
// getWorkFromDatabaseStub.restore();
// getSchedulerQueueStub.restore();
// getQueueForUrlStub.restore();
// getQueueUrlForServiceStub.restore();
// });

// describe('when there is no work on the scheduler queue', async function () {

// beforeEach(async function () {
// await schedulerQueue.purge();
// serviceQueues = {};
// serviceQueues[service] = new MemoryQueue();
// await scheduler.processSchedulerQueue(logger);
// });
// afterEach(async function () {
// await schedulerQueue.purge();
// serviceQueues = {};
// });

// it('does call getSchedulerQueue', async function () {
// expect(getSchedulerQueueStub.called).to.be.true;
// });

// it('does not call getPodsCountForService', async function () {
// expect(getPodsCountForServiceStub.called).to.be.false;
// });

// it('does not call getWorkFromDatabase', async function () {
// expect(getWorkFromDatabaseStub.called).to.be.false;
// });

// it('does not call getQueueForUrl', async function () {
// expect(getQueueForUrlStub.called).to.be.false;
// });

// it('doest not put any messages on the queue', async function () {
// const numMessages = await serviceQueues[service].getApproximateNumberOfMessages();
// expect(numMessages).to.equal(0);
// });
// });

// describe('when there is work on the scheduler queue', async function () {

// beforeEach(async function () {
// await schedulerQueue.purge();
// await schedulerQueue.sendMessage(service);
// serviceQueues = {};
// serviceQueues[service] = new MemoryQueue();
// await scheduler.processSchedulerQueue(logger);
// });
// afterEach(async function () {
// await schedulerQueue.purge();
// serviceQueues = {};
// });

// it('calls getPodsCountForService', async function () {
// expect(getPodsCountForServiceStub.called).to.be.true;
// });

// it('calls getWorkFromDatabase', async function () {
// expect(getWorkFromDatabaseStub.called).to.be.true;
// });

// it('calls getSchedulerQueue', async function () {
// expect(getSchedulerQueueStub.called).to.be.true;
// });

// it('calls getQueueForUrl', async function () {
// expect(getQueueForUrlStub.called).to.be.true;
// });

// it('puts messages on the queue', async function () {
// const numMessages = await serviceQueues[service].getApproximateNumberOfMessages();
// expect(numMessages).to.equal(1);
// });
// });
// });
// });

0 comments on commit 032b1c8

Please sign in to comment.