Skip to content

Commit

Permalink
refactor: basic support for versioned functions
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrokiefer committed Jul 23, 2020
1 parent fba2e42 commit 63993dd
Show file tree
Hide file tree
Showing 21 changed files with 2,834 additions and 1,539 deletions.
235 changes: 235 additions & 0 deletions lib/domain/Functions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
const crypto = require('crypto');
const semver = require('semver');

const log = require('../support/log');
const { StdoutLogStorage, DefaultLogStorage } = require('./LogStorage');

const Metric = require('./Metric');

const Pipeline = require('./Pipeline');
const ErrorTracker = require('./ErrorTracker');

function codeFileName(namespace, codeId, version) {
if (version === undefined || version === 'latest') {
return `${namespace}/${codeId}.js`;
}
return `${namespace}/${codeId}/${version}.js`;
}

class Functions {
constructor(storage, sandbox, req) {
this.storage = storage;
this.sandbox = sandbox;
this.req = req;
}

async updateVersion(namespace, id, version) {
if (!version) {
return;
}

// Latest versions are saved to namespace
let latest = {};
const ns = await this.storage.getNamespace(namespace) || { namespace };

if (ns.latest) {
latest = ns.latest;
}

if (Object.prototype.hasOwnProperty.call(latest, id)) {
const curVersion = latest[id];
if (semver.gt(version, curVersion)) {
latest[id] = version;
}
} else {
latest[id] = version;
}

ns.latest = latest;
await this.storage.putNamespace(namespace, ns);
}

async create(namespace, id, version, code, env) {
let v = version;
const filename = codeFileName(namespace, id, version);

if (version === 'latest') {
v = null;
}

const invalid = this.sandbox.testSyntaxError(filename, code, {
console: new StdoutLogStorage(namespace, id, v).console,
});
if (invalid) {
this.req.log.error(`Failed to post code: ${invalid.error}`);
return {
status: 400,
body: invalid,
};
}

const hash = crypto.createHash('sha1').update(code).digest('hex');
const data = { id, version, code, hash };

if (env) {
data.env = env;
}

try {
await this.storage.putCode(namespace, id, v, data);

await this.updateVersion(namespace, id, v);

return {
status: 200,
body: data,
};
} catch (err) {
log.error(`[${namespace}:${id}:${version}] ${err}`);
return {
status: 500,
body: { error: err.message },
};
}
}

async run(namespace, id, version) {
let v = version;
const filename = codeFileName(namespace, id, version);
const metric = new Metric('function-run');

const ns = await this.storage.getNamespace(namespace);
if (!ns) {
v = null;
} else if (ns.latest && version === 'latest') {
v = ns.latest[id];
}

const logStorage = new DefaultLogStorage(namespace, id, v, this.req);

let code;
try {
code = await this.storage.getCodeByCache(namespace, id, v, {
preCache: (preCode) => {
preCode.script = this.sandbox.compileCode(filename, preCode.code);
return preCode;
},
});

if (!code) {
const errMsg = v ? `Code '${namespace}/${id}/${v}' was not found` : `Code '${namespace}/${id}' was not found`;
return {
status: 404,
body: { error: errMsg },
};
}
} catch (err) {
return {
status: err.statusCode || 500,
body: { error: err.message },
};
}

try {
const options = {
console: logStorage.console,
env: code.env,
};
const result = await this.sandbox.runScript(code.script, this.req, options);

const spent = metric.observeFunctionRun({ namespace, id, version, status: result.status });
logStorage.flush({
status: result.status,
requestTime: spent,
});
return result;
} catch (err) {
logStorage.console.error(`Failed to run function: ${err}`);
logStorage.console.error(err.stack);
const status = err.statusCode || 500;
const errResult = {
status,
body: { error: err.message },
};

const spent = metric.observeFunctionRun({ namespace, id, version, status });

const logResult = logStorage.flush({
status,
requestTime: spent,
});

const { sentryDSN } = code;

const extra = Object.assign({ body: this.req.body }, logResult || {});
const errTracker = new ErrorTracker({
sentryDSN,
filename,
extra,
tags: { codeHash: code.hash },
code: code.code,
});
errTracker.notify(err);
return errResult;
}
}

async runPipeline(stepsInput) {
const metric = new Metric('pipeline-run');

const stepsPromises = stepsInput.map(async (step) => {
const [namespace, id, version] = step.split('/', 3);
const ns = await this.storage.getNamespace(namespace);

// Return versioned function
if (version !== undefined || version !== 'latest') {
return { namespace, id, version };
}

// Handle latest and unversioned functions
let v = version;
if (!ns) {
v = null;
} else if (ns.latest && version === 'latest') {
v = ns.latest[id];
}
return { namespace, id, version: v };
});

const steps = await Promise.all(stepsPromises);

try {
const codes = await this.storage.getCodesByCache(steps, {
preCache: (code) => {
const filename = codeFileName(code.namespace, code.id, code.version);
code.script = this.sandbox.compileCode(filename, code.code);
return code;
},
});

for (let i = 0; i < codes.length; i += 1) {
if (!codes[i]) {
const { namespace, id, version } = steps[i];
const codeName = version ? `${namespace}/${id}/${version}` : `${namespace}/${id}`;
const e = new Error(`Code '${codeName}' was not found`);
e.statusCode = 404;
throw e;
}
}

const result = await new Pipeline(this.sandbox, this.req, codes).run();
metric.observePipelineRun(result.status);
return result;
} catch (err) {
const result = {
status: err.statusCode || 500,
body: { error: err.message },
};

metric.observePipelineRun(result.status);
return result;
}
}
}

module.exports = Functions;
8 changes: 4 additions & 4 deletions lib/domain/LogStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ function extractHTTPFields(req) {

class StdoutLogStorage {
/* eslint class-methods-use-this: ["error", { "exceptMethods": ["flush"] }] */
constructor(namespace, id) {
this.console = new PrefixLog(`namespace:${namespace}, id:${id}`);
constructor(namespace, id, version) {
this.console = new PrefixLog(`namespace:${namespace}, id:${id}, version:${version}`);
}

flush() {}
}

class GelfLogStorage {
constructor(namespace, id, req) {
constructor(namespace, id, version, req) {
this.stream = new MemoryStream(config.log.maxFullMessage);
this.console = new PrefixLog(null, this.stream, this.stream);
this.file = `${namespace}/${id}.js`;
this.file = `${namespace}/${id}/${version}.js`;
this.namespace = namespace;
this.fields = extractHTTPFields(req);
this.gelfClients = GelfLogStorage.prepareGelfClients();
Expand Down
10 changes: 5 additions & 5 deletions lib/domain/Metric.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const functionRunHistogram = new prometheusClient.Histogram({
name: 'backstage_functions_function_run_duration_seconds',
help: 'How many time spent to run a function in seconds',
buckets,
labelNames: ['namespace', 'id'],
labelNames: ['namespace', 'id', 'version'],
});

const functionOverviewRunHistogram = new prometheusClient.Histogram({
Expand Down Expand Up @@ -61,7 +61,7 @@ const functionOverviewRunCounter = new prometheusClient.Counter({
const functionRunCounter = new prometheusClient.Counter({
name: 'backstage_functions_function_run_total',
help: 'What is the status code of a function',
labelNames: ['namespace', 'id', 'status'],
labelNames: ['namespace', 'id', 'version', 'status'],
});

const functionPipelineCounter = new prometheusClient.Counter({
Expand All @@ -81,15 +81,15 @@ class Metric {
this.start = Date.now();
}

observeFunctionRun({ namespace, id, status }) {
observeFunctionRun({ namespace, id, version, status }) {
const spent = (Date.now() - this.start) / 1000;
const normalizedStatusCode = normalizeStatusCode(status);

functionOverviewRunHistogram.observe(spent);
functionOverviewRunCounter.labels(normalizedStatusCode).inc();

functionRunHistogram.labels(namespace, id).observe(spent);
functionRunCounter.labels(namespace, id, normalizedStatusCode).inc();
functionRunHistogram.labels(namespace, id, version).observe(spent);
functionRunCounter.labels(namespace, id, version, normalizedStatusCode).inc();

return spent;
}
Expand Down
11 changes: 7 additions & 4 deletions lib/domain/Pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ const Metric = require('./Metric');
const { DefaultLogStorage } = require('./LogStorage');
const ErrorTracker = require('./ErrorTracker');

function codeFileName(namespace, codeId) {
return `${namespace}/${codeId}.js`;
function codeFileName(namespace, codeId, version) {
return `${namespace}/${codeId}/${version}.js`;
}

class Pipeline {
Expand All @@ -25,8 +25,8 @@ class Pipeline {
this.steps = nextSteps;

const metric = new Metric('function-run');
const filename = codeFileName(step.namespace, step.id);
const logStorage = new DefaultLogStorage(step.namespace, step.id, this.req);
const filename = codeFileName(step.namespace, step.id, step.version);
const logStorage = new DefaultLogStorage(step.namespace, step.id, step.version, this.req);

const options = {
console: logStorage.console,
Expand All @@ -38,6 +38,7 @@ class Pipeline {
const spent = metric.observeFunctionRun({
namespace: step.namespace,
id: step.id,
version: step.version,
status: result.status,
});

Expand All @@ -59,6 +60,7 @@ class Pipeline {
} else {
result.body.namespace = step.namespace;
result.body.functionId = step.id;
result.body.version = step.version;

return result;
}
Expand All @@ -74,6 +76,7 @@ class Pipeline {
const spent = metric.observeFunctionRun({
namespace: step.namespace,
id: step.id,
version: step.version,
status,
});

Expand Down
Loading

0 comments on commit 63993dd

Please sign in to comment.