Skip to content

Commit

Permalink
Pass the execution ID to the plugin. Fixes #1259 (#1293)
Browse files Browse the repository at this point in the history
* Pass the execution ID to the plugin. Fixes #1259

* WIP Updated plugin tests
  • Loading branch information
brollb authored Oct 23, 2019
1 parent b3429a3 commit d4ecd5a
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 15 deletions.
34 changes: 31 additions & 3 deletions src/common/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,37 @@
return lines;
};

const defer = function() {
const deferred = {resolve: null, reject: null};
deferred.promise = new Promise((resolve, reject) => {
deferred.resolve = resolve;
deferred.reject = reject;
});
return deferred;
};

const withTimeout = function(fn, err, time=1500) {
return async function() {
let deferred = defer();
let result = null;

setTimeout(() => {
if (!result) {
deferred.reject(err);
}
}, time);

result = await fn.call(this);
deferred.resolve(result);
return deferred.promise;
};
};

return {
getSetterSchema: getSetterSchema,
resolveCarriageReturns: resolveCarriageReturns,
abbr: abbr
getSetterSchema,
resolveCarriageReturns,
abbr,
withTimeout,
defer,
};
}));
5 changes: 3 additions & 2 deletions src/common/viz/Execute.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ define([

const onPluginInitiated = (sender, event) => {
this.client.removeEventListener(this._client.CONSTANTS.PLUGIN_INITIATED, onPluginInitiated);
this.client.setAttribute(node.getId(), 'executionId', event.executionId);
deferred.resolve(event.executionId);
const {executionId} = event;
this.client.sendMessageToPlugin(executionId, 'executionId', executionId);
deferred.resolve(executionId);
};

this.client.addEventListener(
Expand Down
21 changes: 18 additions & 3 deletions src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ define([
this.deletions = [];
this.createIdToMetadataId = {};
this.logManager = null;

const deferred = Q.defer();
this.executionId = deferred.promise;
this.setExecutionId = deferred.resolve;
};

ExecuteJob.metadata = pluginMetadata;
Expand Down Expand Up @@ -114,15 +118,16 @@ define([
ExecuteJob.prototype.main = async function (callback) {
// Check the activeNode to make sure it is a valid node
var type = this.core.getMetaType(this.activeNode),
typeName = type && this.getAttribute(type, 'name'),
execNode;
typeName = type && this.getAttribute(type, 'name');

if (typeName !== 'Job') {
return callback(new Error(`Cannot execute ${typeName} (expected Job)`), this.result);
}

this.setAttribute(this.activeNode, 'executionId', await this.getExecutionId());

// Set the parent execution to 'running'
execNode = this.core.getParent(this.activeNode);
const execNode = this.core.getParent(this.activeNode);
this.setAttribute(execNode, 'status', 'running');

this._callback = callback;
Expand Down Expand Up @@ -187,6 +192,16 @@ define([
return JSON.parse(this.getAttribute(node, 'jobInfo')).hash;
};

ExecuteJob.prototype.getExecutionId = utils.withTimeout(async function() {
return await this.executionId;
}, new Error('Timeout: Did not receive execution ID'));

ExecuteJob.prototype.onMessage = function(messageId, content) {
if (messageId === 'executionId') {
this.setExecutionId(content);
}
};

ExecuteJob.prototype.onAbort =
ExecuteJob.prototype.onUserCancelDetected = function () {
this.logger.info('Received Abort. Canceling jobs.');
Expand Down
1 change: 1 addition & 0 deletions src/plugins/ExecutePipeline/ExecutePipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ define([
return callback('Current node is not a Pipeline or Execution!', this.result);
}

this.setAttribute(this.activeNode, 'executionId', await this.getExecutionId());
this._callback = callback;
this.currentForkName = null;

Expand Down
6 changes: 4 additions & 2 deletions test/unit/plugins/ExecuteJob/ExecuteJob.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ describe('ExecuteJob', function () {
var plugin,
node,
preparePlugin = function(done) {
var context = {
const config = {compute: {id: 'gme'}};
const context = {
project: project,
commitHash: commitHash,
namespace: 'pipeline',
Expand All @@ -77,7 +78,8 @@ describe('ExecuteJob', function () {
return manager.initializePlugin(pluginName)
.then(plugin_ => {
plugin = plugin_;
return manager.configurePlugin(plugin, {}, context);
plugin.executionId = Promise.resolve('some_execution_id');
return manager.configurePlugin(plugin, config, context);
})
.then(() => node = plugin.activeNode)
.nodeify(done);
Expand Down
13 changes: 8 additions & 5 deletions test/unit/plugins/ExecutePipeline/ExecutePipeline.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* globals describe, before, after */
/* globals */
/*jshint node:true, mocha:true*/

describe('ExecutePipeline', function () {
Expand Down Expand Up @@ -62,15 +62,16 @@ describe('ExecutePipeline', function () {
});

it.skip('should execute single job', function (done) {
var context = {
const config = {compute: {id: 'gme'}};
const context = {
project: project,
commitHash: commitHash,
namespace: 'pipeline',
branchName: 'test',
activeNode: '/f/5'
};

manager.executePlugin(pluginName, {}, context, function (err, pluginResult) {
manager.executePlugin(pluginName, config, context, function (err, pluginResult) {
expect(err).to.equal(null);
expect(typeof pluginResult).to.equal('object');
expect(pluginResult.success).to.equal(true);
Expand Down Expand Up @@ -107,7 +108,8 @@ describe('ExecutePipeline', function () {
});

var preparePlugin = function(done) {
var context = {
const config = {compute: {id: 'gme'}};
const context = {
project: project,
commitHash: commitHash,
namespace: 'pipeline',
Expand All @@ -120,7 +122,8 @@ describe('ExecutePipeline', function () {
plugin = plugin_;
plugin.checkExecutionEnv = () => Q();
plugin.startExecHeartBeat = () => {};
return manager.configurePlugin(plugin, {}, context);
plugin.executionId = Promise.resolve('some_execution_id');
return manager.configurePlugin(plugin, config, context);
})
.then(() => node = plugin.activeNode)
.nodeify(done);
Expand Down

0 comments on commit d4ecd5a

Please sign in to comment.