Skip to content
This repository has been archived by the owner on Oct 5, 2022. It is now read-only.

Commit

Permalink
Merge pull request #27 from nearform/fix-parallel-runs
Browse files Browse the repository at this point in the history
Fix parallel runs
  • Loading branch information
temsa authored Jun 15, 2018
2 parents 5c3e0a9 + f9db24f commit 57865ef
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 108 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,8 @@ typings/
# next.js build output
.next

# orig files
*.orig

# test results
.ci-test-results/jest/results.xml
2 changes: 1 addition & 1 deletion example.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const esLog = function () {
this.trace = (...args) => console.log(...args)
this.debug = (...args) => console.debug(...args)
this.info = (...args) => console.info(...args)
this.warn = (...args) => console.warn(...args)
this.warning = (...args) => console.warn(...args)
this.error = (...args) => console.error(...args)
this.fatal = (...args) => console.error('💀', ...args)
}
Expand Down
7 changes: 7 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const start = require('./server')
const lib = require('./lib/indices')
const {connect} = require('./lib/es')
const {log} = require('./lib/logger')
const {store} = require('./lib/state')

// start the micro service
exports.start = start
Expand All @@ -11,5 +12,11 @@ exports.create = (name, {indexTemplate} = {}) => lib.create(name, {body: indexTe
exports.update = (name, {indexTemplate} = {}) => lib.update(name, {body: indexTemplate})
exports.delete = name => lib.delete(name)

// override ES client or get a reference to it
exports.connect = connect

// override the logger or use it
exports.logger = log

// override the store or use it
exports.store = store
207 changes: 119 additions & 88 deletions lib/indices.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,130 +23,161 @@ const {
rollbackAliasSwitch
} = require('./rollbacks')

const {
startIndexCreation,
startIndexUpdate,
startIndexDeletion,
finishIndexCreation,
finishIndexUpdate,
finishIndexDeletion
} = require('./state')

const suffix = (name, date) => `${name}-${(date || new Date()).toISOString().toLowerCase()}`
exports.suffix = suffix

const getOps = async (opsFn) => {
const {ops} = await opsFn()
return ops
}

exports.create = async (name, { body } = { body: {} }) => {
const ops = {}
const index = suffix(name)
await startIndexCreation(name)
try {
ops.preChecks = await Promise.all([
getOps((ops) => checkAliasDoesNotExists(name, ops)),
getOps((ops) => checkIndexDoesNotExist(index, ops))
])
} catch (error) {
error.message = `The index (${index}) or alias (${name}) were already existing: ` + error.message
error.ops = ops
throw Boom.boomify(error, {statusCode: 409})
}
try {
ops.preChecks = await Promise.all([
getOps((ops) => checkAliasDoesNotExists(name, ops)),
getOps((ops) => checkIndexDoesNotExist(index, ops))
])
} catch (error) {
error.message = `The index (${index}) or alias (${name}) were already existing: ` + error.message
error.ops = ops
throw Boom.boomify(error, {statusCode: 409})
}

if (shouldUpdateTemplate(body)) {
await updateTemplate(name, body, ops)
}
if (shouldUpdateTemplate(body)) {
await updateTemplate(name, body, ops)
}

const rollbackFromIndexCreation = async (name, index, error, origin) => {
try {
await rollbackIndexCreation({index}, error, origin)
} finally {
if (ops.template) {
await rollbackIndexTemplateCreation({name}, error, origin)
const rollbackFromIndexCreation = name => async (index, error, origin) => {
try {
await rollbackIndexCreation({index}, error, origin)
} finally {
if (ops.template) {
await rollbackIndexTemplateCreation({name}, error, origin)
}
}
}
}

await createIndex(index, rollbackFromIndexCreation, ops)
const noRollback = () => {}
await checkAliasDoesNotExists(name, noRollback, ops)
await createIndex(index, rollbackFromIndexCreation(name), ops)

const rollbackFromAliasCreation = async (name, index, error, origin) => {
try {
await rollbackAliasCreation({index, name}, error, origin)
} finally {
await rollbackFromIndexCreation(name, index, error, origin)
const rollbackFromAliasCreation = async (name, index, error, origin) => {
try {
await rollbackAliasCreation({index, name}, error, origin)
} finally {
await rollbackFromIndexCreation(name, index, error, origin)
}
}
}

await createAlias(name, index, rollbackFromAliasCreation, ops)

return {name, index, ops}
await checkAliasDoesNotExists(name, rollbackFromAliasCreation, ops)
await createAlias(name, index, rollbackFromAliasCreation, ops)
const {sourceIndex} = await findAliasIndex(name, ops)
if (index !== sourceIndex) {
const origin = `"${name}" After Index "${index}" creation, the Alias was already bound to "${sourceIndex}", if you want to enforce a new index, either 'delete' the existing one, or update it`
const error = Boom.conflict()
await rollbackFromAliasCreation(name, index, error, origin)
}
return {name, index, ops}
} finally {
await finishIndexCreation(name)
}
}

exports.update = async (name, { body } = { body: {} }) => {
const ops = {}
await checkAliasAlreadyExists(name, ops)
const {sourceIndex} = await findAliasIndex(name, ops)
await startIndexUpdate(name)
try {
await checkAliasAlreadyExists(name, ops)
const {sourceIndex} = await findAliasIndex(name, ops)
if (shouldUpdateTemplate(body)) {
await updateTemplate(name, body, ops)
}

if (shouldUpdateTemplate(body)) {
await updateTemplate(name, body, ops)
}
const index = suffix(name)
const rollbackFromIndexCreation = async (name, index, error, origin) => {
await rollbackIndexCreation({index}, error, origin)
if (ops.template) {
await rollbackIndexTemplateCreation({name}, error, origin)
}
}

const index = suffix(name)
const rollbackFromIndexCreation = async (name, index, error, origin) => {
await rollbackIndexCreation({index}, error, origin)
if (ops.template) {
await rollbackIndexTemplateCreation({name}, error, origin)
await createIndex(index, rollbackFromIndexCreation, ops)

const rollbackFromReindex = async (name, index, error, origin) => {
await rollbackIndexCreation({index}, error, origin)
if (ops.template) {
await rollbackIndexTemplateCreation({name}, error, origin)
}
}
}

await createIndex(index, rollbackFromIndexCreation, ops)
await reindex(name, sourceIndex, index, rollbackFromReindex, ops)
try {
ops.postReindex = await Promise.all([
await getOps((ops) => checkIndexAlreadyExists(sourceIndex, ops)),
await getOps((ops) => checkIndexAlreadyExists(index, ops)),
await getOps((ops) => checkAliasAlreadyExists(name, ops))
])
} catch (error) {
const origin = `the checks after ${sourceIndex} reindexation to ${index}, and before the switch of ${name} alias`
error.message = `The original index (${sourceIndex})/destination index (${index})/alias (${name}) status after reindexation was not consistent and has probably been altered by a third party: ` + error.message
error.ops = ops
await rollbackFromReindex(name, index, error, origin)
throw error
}

const rollbackFromReindex = async (name, index, error, origin) => {
await rollbackIndexCreation({index}, error, origin)
if (ops.template) {
await rollbackIndexTemplateCreation({name}, error, origin)
const rollbackFromAliasSwitch = async (name, sourceIndex, index, error, origin) => {
await rollbackFromReindex(name, index, error, origin) // is this really a good idea ?
await rollbackAliasSwitch({name, sourceIndex, index}, error, origin)
}
}
await switchAlias(name, sourceIndex, index, rollbackFromAliasSwitch, ops)

await reindex(name, sourceIndex, index, rollbackFromReindex, ops)
try {
ops.postReindex = await Promise.all([
await getOps((ops) => checkIndexAlreadyExists(sourceIndex, ops)),
await getOps((ops) => checkIndexAlreadyExists(index, ops)),
await getOps((ops) => checkAliasAlreadyExists(name, ops))
ops.postAliasSwitch = await Promise.all([
getOps((ops) => deleteIndex(sourceIndex, () => { throw Boom.failedDependency(`Source Index "${sourceIndex}" could not be deleted`) }, ops)),
getOps((ops) => checkAliasAlreadyExists(name, ops))
])
} catch (error) {
const origin = `the checks after ${sourceIndex} reindexation to ${index}, and before the switch of ${name} alias`
error.message = `The original index (${sourceIndex})/destination index (${index})/alias (${name}) status after reindexation was not consistent and has probably been altered by a third party: ` + error.message
error.ops = ops
await rollbackFromReindex(name, index, error, origin)
throw error
}

const rollbackFromAliasSwitch = async (name, sourceIndex, index, error, origin) => {
await rollbackFromReindex(name, index, error, origin) // is this really a good idea ?
await rollbackAliasSwitch({name, sourceIndex, index}, error, origin)
return {name, sourceIndex, index, ops}
} finally {
await finishIndexUpdate(name)
}
await switchAlias(name, sourceIndex, index, rollbackFromAliasSwitch, ops)

ops.postAliasSwitch = await Promise.all([
getOps((ops) => deleteIndex(sourceIndex, () => { throw Boom.failedDependency(`Source Index "${sourceIndex}" could not be deleted`) }, ops)),
getOps((ops) => checkAliasAlreadyExists(name, ops))
])

return {name, sourceIndex, index, ops}
}

exports.delete = async (name) => {
const ops = {}
await startIndexDeletion(name)
try {
const {sourceIndex: index} = await findAliasIndex(name, ops)
ops.preChecks = await Promise.all([
getOps((ops) => checkAliasAlreadyExists(name, ops)),
getOps((ops) => checkIndexAlreadyExists(index, ops))
])
const cantRollbackAlias = (name, index, e, origin) => { throw e }
const cantRollbackIndex = (index, e, origin) => { throw e }
ops.deletions = await Promise.all([
getOps((ops) => deleteAlias(name, index, cantRollbackAlias, ops)),
getOps((ops) => deleteIndex(index, cantRollbackIndex, ops))
])

return {name, index, ops}
} catch (error) {
error.message = `The alias (${name}) or the index it is pointing to, was missing: ` + error.message
error.ops = ops
throw Boom.boomify(error, {statusCode: 404})
try {
const {sourceIndex: index} = await findAliasIndex(name, ops)
ops.preChecks = await Promise.all([
getOps((ops) => checkAliasAlreadyExists(name, ops)),
getOps((ops) => checkIndexAlreadyExists(index, ops))
])
const cantRollbackAlias = (name, index, e, origin) => { throw e }
const cantRollbackIndex = (index, e, origin) => { throw e }
ops.deletions = await Promise.all([
getOps((ops) => deleteAlias(name, index, cantRollbackAlias, ops)),
getOps((ops) => deleteIndex(index, cantRollbackIndex, ops))
])

return {name, index, ops}
} catch (error) {
error.message = `The alias (${name}) or the index it is pointing to, was missing: ` + error.message
error.ops = ops
throw Boom.boomify(error, {statusCode: 404})
}
} finally {
await finishIndexDeletion(name)
}
}
Loading

0 comments on commit 57865ef

Please sign in to comment.