Skip to content

Commit

Permalink
Add support for dataset append operation
Browse files — Browse the repository at this point in the history
  • Loading branch information
tiagojsag committed May 30, 2019
1 parent ed5f0a8 commit 7cacdb6
Show file tree
Hide file tree
Showing 9 changed files with 485 additions and 52 deletions.
28 changes: 17 additions & 11 deletions app/src/services/status-queue.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,23 @@ class StatusQueueService extends QueueService {
}

async importConfirmed() {
if (this.currentTask.type === task.MESSAGE_TYPES.TASK_OVERWRITE) {
await this.sendExecutionTask(execution.MESSAGE_TYPES.EXECUTION_DELETE_INDEX, [{ index: 'message.index' }]);
} else if (this.currentTask.type === task.MESSAGE_TYPES.TASK_CONCAT) {
await this.sendExecutionTask(execution.MESSAGE_TYPES.EXECUTION_REINDEX, [{ sourceIndex: 'message.index' }, { targetIndex: 'index' }]);
} else {
await TaskService.update(this.currentTask._id, {
status: TASK_STATUS.SAVED
});
await DatasetService.update(this.currentTask.datasetId, {
status: DATASET_STATUS.SAVED,
});
switch (this.currentTask.type) {

case task.MESSAGE_TYPES.TASK_OVERWRITE:
await this.sendExecutionTask(execution.MESSAGE_TYPES.EXECUTION_DELETE_INDEX, [{ index: 'message.index' }]);
break;
case task.MESSAGE_TYPES.TASK_CONCAT:
await this.sendExecutionTask(execution.MESSAGE_TYPES.EXECUTION_REINDEX, [{ sourceIndex: 'message.index' }, { targetIndex: 'index' }]);
break;
default:
await TaskService.update(this.currentTask._id, {
status: TASK_STATUS.SAVED
});
await DatasetService.update(this.currentTask.datasetId, {
status: DATASET_STATUS.SAVED
});
break;

}
}

Expand Down
3 changes: 3 additions & 0 deletions app/src/services/tasks-queue.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class TasksQueueService extends QueueService {
case task.MESSAGE_TYPES.TASK_CONCAT:
executorTaskMessage = execution.createMessage(execution.MESSAGE_TYPES.EXECUTION_CONCAT, this.taskMsg);
break;
case task.MESSAGE_TYPES.TASK_APPEND:
executorTaskMessage = execution.createMessage(execution.MESSAGE_TYPES.EXECUTION_APPEND, this.taskMsg);
break;
case task.MESSAGE_TYPES.TASK_DELETE:
executorTaskMessage = execution.createMessage(execution.MESSAGE_TYPES.EXECUTION_DELETE, this.taskMsg);
break;
Expand Down
19 changes: 1 addition & 18 deletions app/test/e2e/queue-status-finished-reindex.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ describe('STATUS_FINISHED_REINDEX handling process', () => {
.patch(`/v1/dataset/${fakeTask1.datasetId}`, { status: 1, tableName: fakeTask1.index })
.reply(200, {
data: {
id: '6a994bd1-6f88-48dc-a08e-d8c1c90272c4',
id: fakeTask1.datasetId,
type: 'dataset',
attributes: {
name: 'Resource Watch datasets list',
Expand Down Expand Up @@ -168,23 +168,6 @@ describe('STATUS_FINISHED_REINDEX handling process', () => {
log.should.have.property('id').and.equal(message.id);
log.should.have.property('taskId').and.equal(message.taskId);
log.should.have.property('type').and.equal(message.type);


const validateExecutorTasksQueueMessages = async (msg) => {
const content = JSON.parse(msg.content.toString());
content.should.have.property('id');
content.should.have.property('type').and.equal(execution.MESSAGE_TYPES.EXECUTION_DELETE_INDEX);
content.should.have.property('taskId').and.equal(message.taskId);
content.should.have.property('index').and.equal(fakeTask1.message.index);

await channel.ack(msg);
};

process.on('unhandledRejection', (error) => {
should.fail(error);
});

await channel.consume(config.get('queues.executorTasks'), validateExecutorTasksQueueMessages);
});

afterEach(async () => {
Expand Down
100 changes: 96 additions & 4 deletions app/test/e2e/queue-status-import-confirmed.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,6 @@ describe('STATUS_IMPORT_CONFIRMED handling process', () => {
createdTask.should.have.property('datasetId').and.equal(fakeTask1.datasetId);
createdTask.should.have.property('createdAt').and.be.a('date');
createdTask.should.have.property('updatedAt').and.be.a('date');

process.on('unhandledRejection', (error) => {
should.fail(error);
});
});

it('Consume a STATUS_IMPORT_CONFIRMED message for a TASK_CONCAT should create a EXECUTION_REINDEX message (happy case)', async () => {
Expand Down Expand Up @@ -223,6 +219,102 @@ describe('STATUS_IMPORT_CONFIRMED handling process', () => {
await channel.consume(config.get('queues.executorTasks'), validateExecutorTasksQueueMessages);
});

it('Consume a STATUS_IMPORT_CONFIRMED message for a TASK_APPEND should set the dataset and task to SAVED status (happy case)', async () => {
    // Seed a TASK_APPEND task in INIT state — importConfirmed() should take the
    // default branch and mark it SAVED without emitting an executor message.
    const fakeTask1 = await new Task(createTask(appConstants.TASK_STATUS.INIT, task.MESSAGE_TYPES.TASK_APPEND)).save();

    const message = {
        id: 'e492cef7-e287-4bd8-9128-f034a3b531ef',
        type: 'STATUS_IMPORT_CONFIRMED',
        taskId: fakeTask1.id,
        lastCheckedDate: '2019-03-29T08:43:08.091Z'
    };

    // Expect the dataset microservice to be PATCHed with the SAVED status.
    // Use the seeded task's datasetId (not a hard-coded id) so the mocked reply
    // is consistent with the request being intercepted — mirrors the fix applied
    // to the reindex spec in this same changeset.
    nock(process.env.CT_URL)
        .patch(`/v1/dataset/${fakeTask1.datasetId}`, { status: 1 })
        .reply(200, {
            data: {
                id: fakeTask1.datasetId,
                type: 'dataset',
                attributes: {
                    name: 'Resource Watch datasets list',
                    slug: 'Resource-Watch-datasets-list_25',
                    type: null,
                    subtitle: null,
                    application: ['rw'],
                    dataPath: 'data',
                    attributesPath: null,
                    connectorType: 'document',
                    provider: 'json',
                    userId: '1a10d7c6e0a37126611fd7a7',
                    connectorUrl: 'http://api.resourcewatch.org/dataset',
                    tableName: fakeTask1.index,
                    status: 'saved',
                    published: true,
                    overwrite: false,
                    verified: false,
                    blockchain: {},
                    mainDateField: null,
                    env: 'production',
                    geoInfo: false,
                    protected: false,
                    legend: {
                        nested: [],
                        country: [],
                        region: [],
                        date: [],
                        integer: [],
                        short: [],
                        byte: [],
                        double: [],
                        float: [],
                        half_float: [],
                        scaled_float: [],
                        boolean: [],
                        binary: [],
                        string: [],
                        text: [],
                        keyword: []
                    },
                    clonedHost: {},
                    errorMessage: '',
                    taskId: '/v1/doc-importer/task/4e451d0e-a464-448f-9dc3-68cc493f0193',
                    updatedAt: '2019-03-30T06:15:26.762Z',
                    dataLastUpdated: null,
                    widgetRelevantProps: [],
                    layerRelevantProps: []
                }
            }
        });

    // Sanity-check preconditions: empty status queue and exactly one task stored.
    const preStatusQueueStatus = await channel.assertQueue(config.get('queues.status'));
    preStatusQueueStatus.messageCount.should.equal(0);
    const existingTaskList = await Task.find({}).exec();
    existingTaskList.should.be.an('array').and.have.lengthOf(1);

    await channel.sendToQueue(config.get('queues.status'), Buffer.from(JSON.stringify(message)));

    // Give the code 3 seconds to do its thing
    await new Promise(resolve => setTimeout(resolve, 3000));

    // The status message must have been consumed (queue drained again).
    const postQueueStatus = await channel.assertQueue(config.get('queues.status'));
    postQueueStatus.messageCount.should.equal(0);

    const createdTasks = await Task.find({}).exec();

    // The seeded task (and only it) should now be SAVED, with one log entry
    // appended for the consumed status message.
    createdTasks.should.be.an('array').and.have.lengthOf(1);
    const createdTask = createdTasks[0];
    createdTask.should.have.property('status').and.equal(appConstants.TASK_STATUS.SAVED);
    createdTask.should.have.property('reads').and.equal(0);
    createdTask.should.have.property('writes').and.equal(0);
    createdTask.should.have.property('logs').and.be.an('array').and.have.lengthOf(1);
    createdTask.should.have.property('_id').and.equal(fakeTask1.id);
    createdTask.should.have.property('type').and.equal(task.MESSAGE_TYPES.TASK_APPEND);
    createdTask.should.have.property('message').and.be.an('object');
    createdTask.should.have.property('datasetId').and.equal(fakeTask1.datasetId);
    createdTask.should.have.property('createdAt').and.be.a('date');
    createdTask.should.have.property('updatedAt').and.be.a('date');
});

afterEach(async () => {
Task.remove({}).exec();

Expand Down
Loading

0 comments on commit 7cacdb6

Please sign in to comment.