Skip to content

Commit

Permalink
Use config value for status queue name; rename var for tasks queue
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagojsag committed Mar 13, 2019
1 parent 3eed03e commit 811ee28
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 46 deletions.
84 changes: 42 additions & 42 deletions app/src/services/status-queue.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ const TaskService = require('services/task.service');
const DatasetService = require('services/dataset.service');
const { execution, status, task } = require('rw-doc-importer-messages');
const ExecutorTaskQueueService = require('services/executor-task-queue.service');
const { STATUS_QUEUE } = require('app.constants');
const STATUS = require('app.constants').STATUS;
const config = require('config');
const { STATUS } = require('app.constants');

class StatusQueueService extends QueueService {

constructor() {
super(STATUS_QUEUE, true);
super(config.get('queues.status'), true);
this.statusMsg = {};
this.currentTask = {};
}
Expand All @@ -20,7 +20,7 @@ class StatusQueueService extends QueueService {
taskId: this.currentTask._id
};
this.currentTask = await TaskService.get(this.statusMsg.taskId);
props.forEach(prop => {
props.forEach((prop) => {
const field = Object.keys(prop)[0];
const dbField = prop[field];
// message prop cases
Expand Down Expand Up @@ -174,44 +174,44 @@ class StatusQueueService extends QueueService {

switch (this.statusMsg.type) {

case status.MESSAGE_TYPES.STATUS_INDEX_CREATED:
await this.indexCreated();
break;
case status.MESSAGE_TYPES.STATUS_READ_DATA:
await this.readData();
break;
case status.MESSAGE_TYPES.STATUS_BLOCKCHAIN_GENERATED:
await this.blockchainGenerated();
break;
case status.MESSAGE_TYPES.STATUS_READ_FILE:
await this.readFile();
break;
case status.MESSAGE_TYPES.STATUS_WRITTEN_DATA:
await this.writtenData();
break;
case status.MESSAGE_TYPES.STATUS_INDEX_DELETED:
await this.indexDeleted();
break;
case status.MESSAGE_TYPES.STATUS_PERFORMED_DELETE_QUERY:
await this.performedDeleteQuery();
break;
case status.MESSAGE_TYPES.STATUS_FINISHED_DELETE_QUERY:
await this.finishedDeleteQuery();
break;
case status.MESSAGE_TYPES.STATUS_PERFORMED_REINDEX:
await this.performedReindex();
break;
case status.MESSAGE_TYPES.STATUS_FINISHED_REINDEX:
await this.finishedReindex();
break;
case status.MESSAGE_TYPES.STATUS_IMPORT_CONFIRMED:
await this.importConfirmed();
break;
case status.MESSAGE_TYPES.STATUS_ERROR:
await this.error();
break;
default:
logger.error('Status Message Type not valid');
case status.MESSAGE_TYPES.STATUS_INDEX_CREATED:
await this.indexCreated();
break;
case status.MESSAGE_TYPES.STATUS_READ_DATA:
await this.readData();
break;
case status.MESSAGE_TYPES.STATUS_BLOCKCHAIN_GENERATED:
await this.blockchainGenerated();
break;
case status.MESSAGE_TYPES.STATUS_READ_FILE:
await this.readFile();
break;
case status.MESSAGE_TYPES.STATUS_WRITTEN_DATA:
await this.writtenData();
break;
case status.MESSAGE_TYPES.STATUS_INDEX_DELETED:
await this.indexDeleted();
break;
case status.MESSAGE_TYPES.STATUS_PERFORMED_DELETE_QUERY:
await this.performedDeleteQuery();
break;
case status.MESSAGE_TYPES.STATUS_FINISHED_DELETE_QUERY:
await this.finishedDeleteQuery();
break;
case status.MESSAGE_TYPES.STATUS_PERFORMED_REINDEX:
await this.performedReindex();
break;
case status.MESSAGE_TYPES.STATUS_FINISHED_REINDEX:
await this.finishedReindex();
break;
case status.MESSAGE_TYPES.STATUS_IMPORT_CONFIRMED:
await this.importConfirmed();
break;
case status.MESSAGE_TYPES.STATUS_ERROR:
await this.error();
break;
default:
logger.error('Status Message Type not valid');

}
}
Expand Down
6 changes: 3 additions & 3 deletions app/src/services/tasks-queue.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const config = require('config');
class TasksQueueService extends QueueService {

constructor() {
super(config.get('queues.docTasks'), true);
super(config.get('queues.tasks'), true);
this.taskMsg = {};
this.task = {};
}
Expand Down Expand Up @@ -47,7 +47,7 @@ class TasksQueueService extends QueueService {
}

async consume(msg) {
logger.info(`Message received in ${config.get('queues.docTasks')}`);
logger.info(`Message received in ${config.get('queues.tasks')}`);
this.taskMsg = JSON.parse(msg.content.toString());
try {
// check if any task is currently running for this dataset
Expand All @@ -62,7 +62,7 @@ class TasksQueueService extends QueueService {
await this.processMessage();
// All OK -> msg sent, so ack emitted
await this.channel.ack(msg);
logger.debug(`${config.get('queues.docTasks')} queue message acknowledged`);
logger.debug(`${config.get('queues.tasks')} queue message acknowledged`);
} catch (err) {
// Error creating entity or sending to queue
logger.error(err);
Expand Down
3 changes: 2 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"name": "Doc Importer Orchestrator"
},
"queues": {
"docTasks": "DOC-TASKS",
"tasks": "DOC-TASKS",
"status": "DOC-STATUS",
"executorTasks": "DOC-EXECUTOR-TASKS"
}
}
1 change: 1 addition & 0 deletions config/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
},
"queues": {
"docTasks": "DOC-TASKS-TEST",
"status": "DOC-STATUS-TEST",
"executorTasks": "DOC-EXECUTOR-TASKS-TEST"
}
}

0 comments on commit 811ee28

Please sign in to comment.