diff --git a/src/common/utils.js b/src/common/utils.js index 6cd4163de..8ca0f1f33 100644 --- a/src/common/utils.js +++ b/src/common/utils.js @@ -84,9 +84,37 @@ return lines; }; + const defer = function() { + const deferred = {resolve: null, reject: null}; + deferred.promise = new Promise((resolve, reject) => { + deferred.resolve = resolve; + deferred.reject = reject; + }); + return deferred; + }; + + const withTimeout = function(fn, err, time=1500) { + return async function() { + let deferred = defer(); + let result = null; + + setTimeout(() => { + if (!result) { + deferred.reject(err); + } + }, time); + + result = await fn.call(this); + deferred.resolve(result); + return deferred.promise; + }; + }; + return { - getSetterSchema: getSetterSchema, - resolveCarriageReturns: resolveCarriageReturns, - abbr: abbr + getSetterSchema, + resolveCarriageReturns, + abbr, + withTimeout, + defer, }; })); diff --git a/src/common/viz/Execute.js b/src/common/viz/Execute.js index 899ee0a66..34cc3a960 100644 --- a/src/common/viz/Execute.js +++ b/src/common/viz/Execute.js @@ -93,8 +93,9 @@ define([ const onPluginInitiated = (sender, event) => { this.client.removeEventListener(this._client.CONSTANTS.PLUGIN_INITIATED, onPluginInitiated); - this.client.setAttribute(node.getId(), 'executionId', event.executionId); - deferred.resolve(event.executionId); + const {executionId} = event; + this.client.sendMessageToPlugin(executionId, 'executionId', executionId); + deferred.resolve(executionId); }; this.client.addEventListener( diff --git a/src/plugins/ExecuteJob/ExecuteJob.js b/src/plugins/ExecuteJob/ExecuteJob.js index e66e9faed..243e68924 100644 --- a/src/plugins/ExecuteJob/ExecuteJob.js +++ b/src/plugins/ExecuteJob/ExecuteJob.js @@ -71,6 +71,10 @@ define([ this.deletions = []; this.createIdToMetadataId = {}; this.logManager = null; + + const deferred = Q.defer(); + this.executionId = deferred.promise; + this.setExecutionId = deferred.resolve; }; ExecuteJob.metadata = pluginMetadata; @@ -114,15 +118,16 @@ define([ ExecuteJob.prototype.main = async function (callback) { // Check the activeNode to make sure it is a valid node var type = this.core.getMetaType(this.activeNode), - typeName = type && this.getAttribute(type, 'name'), - execNode; + typeName = type && this.getAttribute(type, 'name'); if (typeName !== 'Job') { return callback(new Error(`Cannot execute ${typeName} (expected Job)`), this.result); } + this.setAttribute(this.activeNode, 'executionId', await this.getExecutionId()); + // Set the parent execution to 'running' - execNode = this.core.getParent(this.activeNode); + const execNode = this.core.getParent(this.activeNode); this.setAttribute(execNode, 'status', 'running'); this._callback = callback; @@ -187,6 +192,16 @@ define([ return JSON.parse(this.getAttribute(node, 'jobInfo')).hash; }; + ExecuteJob.prototype.getExecutionId = utils.withTimeout(async function() { + return await this.executionId; + }, new Error('Timeout: Did not receive execution ID')); + + ExecuteJob.prototype.onMessage = function(messageId, content) { + if (messageId === 'executionId') { + this.setExecutionId(content); + } + }; + ExecuteJob.prototype.onAbort = ExecuteJob.prototype.onUserCancelDetected = function () { this.logger.info('Received Abort. Canceling jobs.'); diff --git a/src/plugins/ExecutePipeline/ExecutePipeline.js b/src/plugins/ExecutePipeline/ExecutePipeline.js index 0853980b3..44326b20d 100644 --- a/src/plugins/ExecutePipeline/ExecutePipeline.js +++ b/src/plugins/ExecutePipeline/ExecutePipeline.js @@ -122,6 +122,7 @@ define([ return callback('Current node is not a Pipeline or Execution!', this.result); } + this.setAttribute(this.activeNode, 'executionId', await this.getExecutionId()); this._callback = callback; this.currentForkName = null;