From 811ee28ff0cef34f071ad6c3186461718b53e451 Mon Sep 17 00:00:00 2001 From: Tiago Garcia Date: Wed, 13 Mar 2019 15:17:27 +0100 Subject: [PATCH] Use config value for status queue name; rename var for tasks queue --- app/src/services/status-queue.service.js | 84 ++++++++++++------------ app/src/services/tasks-queue.service.js | 6 +- config/default.json | 3 +- config/test.json | 1 + 4 files changed, 48 insertions(+), 46 deletions(-) diff --git a/app/src/services/status-queue.service.js b/app/src/services/status-queue.service.js index 0d20401..5d682a4 100644 --- a/app/src/services/status-queue.service.js +++ b/app/src/services/status-queue.service.js @@ -4,13 +4,13 @@ const TaskService = require('services/task.service'); const DatasetService = require('services/dataset.service'); const { execution, status, task } = require('rw-doc-importer-messages'); const ExecutorTaskQueueService = require('services/executor-task-queue.service'); -const { STATUS_QUEUE } = require('app.constants'); -const STATUS = require('app.constants').STATUS; +const config = require('config'); +const { STATUS } = require('app.constants'); class StatusQueueService extends QueueService { constructor() { - super(STATUS_QUEUE, true); + super(config.get('queues.status'), true); this.statusMsg = {}; this.currentTask = {}; } @@ -20,7 +20,7 @@ class StatusQueueService extends QueueService { taskId: this.currentTask._id }; this.currentTask = await TaskService.get(this.statusMsg.taskId); - props.forEach(prop => { + props.forEach((prop) => { const field = Object.keys(prop)[0]; const dbField = prop[field]; // message prop cases @@ -174,44 +174,44 @@ class StatusQueueService extends QueueService { switch (this.statusMsg.type) { - case status.MESSAGE_TYPES.STATUS_INDEX_CREATED: - await this.indexCreated(); - break; - case status.MESSAGE_TYPES.STATUS_READ_DATA: - await this.readData(); - break; - case status.MESSAGE_TYPES.STATUS_BLOCKCHAIN_GENERATED: - await this.blockchainGenerated(); - break; - case status.MESSAGE_TYPES.STATUS_READ_FILE: - await this.readFile(); - break; - case status.MESSAGE_TYPES.STATUS_WRITTEN_DATA: - await this.writtenData(); - break; - case status.MESSAGE_TYPES.STATUS_INDEX_DELETED: - await this.indexDeleted(); - break; - case status.MESSAGE_TYPES.STATUS_PERFORMED_DELETE_QUERY: - await this.performedDeleteQuery(); - break; - case status.MESSAGE_TYPES.STATUS_FINISHED_DELETE_QUERY: - await this.finishedDeleteQuery(); - break; - case status.MESSAGE_TYPES.STATUS_PERFORMED_REINDEX: - await this.performedReindex(); - break; - case status.MESSAGE_TYPES.STATUS_FINISHED_REINDEX: - await this.finishedReindex(); - break; - case status.MESSAGE_TYPES.STATUS_IMPORT_CONFIRMED: - await this.importConfirmed(); - break; - case status.MESSAGE_TYPES.STATUS_ERROR: - await this.error(); - break; - default: - logger.error('Status Message Type not valid'); + case status.MESSAGE_TYPES.STATUS_INDEX_CREATED: + await this.indexCreated(); + break; + case status.MESSAGE_TYPES.STATUS_READ_DATA: + await this.readData(); + break; + case status.MESSAGE_TYPES.STATUS_BLOCKCHAIN_GENERATED: + await this.blockchainGenerated(); + break; + case status.MESSAGE_TYPES.STATUS_READ_FILE: + await this.readFile(); + break; + case status.MESSAGE_TYPES.STATUS_WRITTEN_DATA: + await this.writtenData(); + break; + case status.MESSAGE_TYPES.STATUS_INDEX_DELETED: + await this.indexDeleted(); + break; + case status.MESSAGE_TYPES.STATUS_PERFORMED_DELETE_QUERY: + await this.performedDeleteQuery(); + break; + case status.MESSAGE_TYPES.STATUS_FINISHED_DELETE_QUERY: + await this.finishedDeleteQuery(); + break; + case status.MESSAGE_TYPES.STATUS_PERFORMED_REINDEX: + await this.performedReindex(); + break; + case status.MESSAGE_TYPES.STATUS_FINISHED_REINDEX: + await this.finishedReindex(); + break; + case status.MESSAGE_TYPES.STATUS_IMPORT_CONFIRMED: + await this.importConfirmed(); + break; + case status.MESSAGE_TYPES.STATUS_ERROR: + await this.error(); + break; + default: + logger.error('Status Message Type not valid'); } } diff --git a/app/src/services/tasks-queue.service.js b/app/src/services/tasks-queue.service.js index 6d9af68..b1a0e2c 100644 --- a/app/src/services/tasks-queue.service.js +++ b/app/src/services/tasks-queue.service.js @@ -10,7 +10,7 @@ const config = require('config'); class TasksQueueService extends QueueService { constructor() { - super(config.get('queues.docTasks'), true); + super(config.get('queues.tasks'), true); this.taskMsg = {}; this.task = {}; } @@ -47,7 +47,7 @@ class TasksQueueService extends QueueService { } async consume(msg) { - logger.info(`Message received in ${config.get('queues.docTasks')}`); + logger.info(`Message received in ${config.get('queues.tasks')}`); this.taskMsg = JSON.parse(msg.content.toString()); try { // check if any task is currently running for this dataset @@ -62,7 +62,7 @@ class TasksQueueService extends QueueService { await this.processMessage(); // All OK -> msg sent, so ack emitted await this.channel.ack(msg); - logger.debug(`${config.get('queues.docTasks')} queue message acknowledged`); + logger.debug(`${config.get('queues.tasks')} queue message acknowledged`); } catch (err) { // Error creating entity or sending to queue logger.error(err); diff --git a/config/default.json b/config/default.json index f02fa28..4bcd566 100644 --- a/config/default.json +++ b/config/default.json @@ -9,7 +9,8 @@ "name": "Doc Importer Orchestrator" }, "queues": { - "docTasks": "DOC-TASKS", + "tasks": "DOC-TASKS", + "status": "DOC-STATUS", "executorTasks": "DOC-EXECUTOR-TASKS" } } diff --git a/config/test.json b/config/test.json index 30de0d1..9ff6059 100644 --- a/config/test.json +++ b/config/test.json @@ -15,6 +15,7 @@ }, "queues": { "docTasks": "DOC-TASKS-TEST", + "status": "DOC-STATUS-TEST", "executorTasks": "DOC-EXECUTOR-TASKS-TEST" } }