diff --git a/app/routes/scenarios--create.js b/app/routes/scenarios--create.js index c296b2bb..b6f2701a 100644 --- a/app/routes/scenarios--create.js +++ b/app/routes/scenarios--create.js @@ -18,6 +18,8 @@ function handler (params, payload, reply) { const rnSource = payload.roadNetworkSource; const rnSourceScenarioId = payload.roadNetworkSourceScenario; const roadNetworkFile = payload.roadNetworkFile; + const poiSource = payload.poiSource; + const poiSourceScenarioId = payload.poiSourceScenario; return db('projects') .select('status') @@ -97,19 +99,31 @@ function handler (params, payload, reply) { }); }) // Start operation and return data to continue. - .then(scenario => startOperation(params.projId, scenario.id).then(op => [op, scenario])) - .then(data => { - let [op, scenario] = data; - if (rnSource === 'clone') { - return createScenario(params.projId, scenario.id, op.getId(), rnSource, {rnSourceScenarioId}) - .then(() => scenario); - } else if (rnSource === 'new') { - return handleRoadNetworkUpload(scenario, op.getId(), rnSource, roadNetworkFile) - .then(() => scenario); - } else if (rnSource === 'osm') { - return createScenario(params.projId, scenario.id, op.getId(), rnSource) - .then(() => scenario); + .then(scenario => Promise.all([ + startOperation(params.projId, scenario.id), + scenario + ])) + .then(([op, scenario]) => { + let action = Promise.resolve({ rnSource, poiSource }); + + // Upload and process file. Add the filename to the data object. + if (rnSource === 'new') { + action = action.then(data => handleRoadNetworkUpload(scenario, op.getId(), rnSource, roadNetworkFile) + .then(res => Object.assign({}, data, res)) + ); + // Add the rn source id to the data object. + } else if (rnSource === 'clone') { + action = action.then(data => Object.assign({}, data, {rnSourceScenarioId})); + } + + // Add the poi source id to the data object. 
+ if (poiSource === 'clone') { + action = action.then(data => Object.assign({}, data, {poiSourceScenarioId})); } + + return action + .then(data => createScenario(params.projId, scenario.id, op.getId(), data)) + .then(() => scenario); }) .then(scenario => reply(scenario)) .catch(err => { @@ -165,13 +179,21 @@ export default [ if (result.files.roadNetworkFile) { payload.roadNetworkFile = result.files.roadNetworkFile[0]; } + if (result.fields.poiSource) { + payload.poiSource = result.fields.poiSource[0]; + } + if (result.fields.poiSourceScenario) { + payload.poiSourceScenario = result.fields.poiSourceScenario[0]; + } let validation = Joi.validate(payload, Joi.object().keys({ name: Joi.string().required(), description: Joi.string(), roadNetworkSource: Joi.string().valid('clone', 'new', 'osm').required(), roadNetworkSourceScenario: Joi.number().when('roadNetworkSource', {is: 'clone', then: Joi.required()}), - roadNetworkFile: Joi.object().when('roadNetworkSource', {is: 'new', then: Joi.required()}) + roadNetworkFile: Joi.object().when('roadNetworkSource', {is: 'new', then: Joi.required()}), + poiSource: Joi.string().valid('clone').required(), + poiSourceScenario: Joi.number().when('poiSource', {is: 'clone', then: Joi.required()}) })); if (validation.error) { @@ -235,7 +257,9 @@ export default [ name: data.name, description: data.description, roadNetworkSource: 'clone', - roadNetworkSourceScenario: request.params.scId + roadNetworkSourceScenario: request.params.scId, + poiSource: 'clone', + poiSourceScenario: request.params.scId }; return handler(request.params, payload, reply); }) @@ -267,20 +291,13 @@ function startOperation (projId, scId) { }); } -function createScenario (projId, scId, opId, rnSource, data = {}) { +function createScenario (projId, scId, opId, data = {}) { let action = Promise.resolve(); // In test mode we don't want to start the generation. // It will be tested in the appropriate place. 
if (process.env.DS_ENV === 'test') { return action; } - if (rnSource === 'clone') { - // We need to close the connection to the source scenario before cloning - // the database. This needs to be done in this process. The process ran by - // the service runner won't have access to it. - // action = closeDatabase(projId, data.rnSourceScenarioId); - } - - let serviceData = Object.assign({}, {projId, scId, opId, rnSource}, data); + let serviceData = Object.assign({}, {projId, scId, opId}, data); action.then(() => { console.log(`p${projId} s${scId}`, 'createScenario'); @@ -321,5 +338,5 @@ function handleRoadNetworkUpload (scenario, opId, source, roadNetworkFile) { .then(() => putFileToS3(filePath, roadNetworkFile.path)) // Delete temp file. .then(() => removeLocalFile(roadNetworkFile.path, true)) - .then(() => createScenario(scenario.project_id, scenario.id, opId, source, {roadNetworkFile: fileName})); + .then(() => ({roadNetworkFile: fileName})); } diff --git a/app/s3/utils.js b/app/s3/utils.js index 8ba68060..e035f0d1 100644 --- a/app/s3/utils.js +++ b/app/s3/utils.js @@ -1,5 +1,5 @@ 'use strict'; -import fs from 'fs'; +import fs from 'fs-extra'; import Promise from 'bluebird'; import s3, { bucket } from './'; diff --git a/app/services/export-road-network/export-road-network.js b/app/services/export-road-network/export-road-network.js index 5c6fd980..31593835 100644 --- a/app/services/export-road-network/export-road-network.js +++ b/app/services/export-road-network/export-road-network.js @@ -3,6 +3,7 @@ import path from 'path'; import obj2osm from 'obj2osm'; import osmP2PApi from 'osm-p2p-server/api/index'; import through2 from 'through2'; +import osmtogeojson from 'osmtogeojson'; import config from '../../config'; import { getDatabase } from '../rra-osm-p2p'; @@ -51,17 +52,22 @@ export function exportRoadNetwork (e) { let op = new Operation(db); op.loadById(opId) - .then(op => op.log('road-network', {message: 'Updating road network'})) - .then(op => { + .then(op => 
op.log('road-network', {message: 'Updating road network and pois'})) + // Load scenario poi types. + .then(() => db('scenarios_files') + .select('subtype') + .where('type', 'poi') + .where('project_id', projId) + .where('scenario_id', scId) + .then(types => types.map(o => o.subtype)) + ) + .then(poiTypes => { const bbox = [-180, -90, 180, 90]; const toOsmOptions = { bounds: {minlon: bbox[0], minlat: bbox[1], maxlon: bbox[2], maxlat: bbox[3]} }; const osmDb = getDatabase(projId, scId); - const fileName = `road-network_${Date.now()}`; - const filePath = `scenario-${scId}/${fileName}`; - - let formatTransform = obj2osm(toOsmOptions); + const formatTransform = obj2osm(toOsmOptions); formatTransform.on('error', (err) => { throw err; @@ -70,32 +76,99 @@ export function exportRoadNetwork (e) { logger && logger.log('Exporting data from osm-p2p'); let stream = osmP2PApi(osmDb).getMap(bbox, {order: 'type'}) - .pipe(processOSMP2PExport()) - .pipe(formatTransform); - - return putFileStream(filePath, stream) - .then(() => logger && logger.log('Exporting data from osm-p2p... done')) - // Get previous file. - .then(() => db('scenarios_files') - .select('path') - .where('type', 'road-network') - .where('project_id', projId) - .where('scenario_id', scId) - .first() - ) - // Delete from storage. - .then(file => removeFile(file.path)) - // Add entry to the database - .then(() => db('scenarios_files') - .update({ - name: fileName, - path: filePath, - updated_at: (new Date()) + .pipe(processOSMP2PExport()); + + // Extract the POI into a promise and continue with the road network. + let splitting = collectPOIs(stream, poiTypes); + + stream = splitting.stream.pipe(formatTransform); + + function processRN () { + const fileName = `road-network_${Date.now()}`; + const filePath = `scenario-${scId}/${fileName}`; + + return putFileStream(filePath, stream) + // Get previous file. 
+ .then(() => db('scenarios_files') + .select('path') + .where('type', 'road-network') + .where('project_id', projId) + .where('scenario_id', scId) + .first() + ) + // Delete from storage. + .then(file => removeFile(file.path)) + // Add entry to the database + .then(() => db('scenarios_files') + .update({ + name: fileName, + path: filePath, + updated_at: (new Date()) + }) + .where('type', 'road-network') + .where('project_id', projId) + .where('scenario_id', scId) + ); + } + + function processPOI () { + return splitting.deferred + // Convert to Feature Collection from Overpass style nodes. + .then(data => { + let fc = osmtogeojson({elements: data}); + // Group features by its ram_poi_type. + let groups = fc.features.reduce((acc, feat) => { + let type = feat.properties.ram_poi_type; + if (!acc[type]) { + acc[type] = { + type: 'FeatureCollection', + features: [] + }; + } + acc[type].features.push(feat); + return acc; + }, {}); + + return groups; }) - .where('type', 'road-network') - .where('project_id', projId) - .where('scenario_id', scId) - ); + .then(groups => Promise.all(Object.keys(groups).map(key => { + const fileName = `poi_${key}_${Date.now()}`; + const filePath = `scenario-${scId}/${fileName}`; + + let data = JSON.stringify(groups[key]); + + return putFileStream(filePath, data) + // Get previous file. + .then(() => db('scenarios_files') + .select('id', 'path') + .where('type', 'poi') + .where('subtype', key) + .where('project_id', projId) + .where('scenario_id', scId) + .first() + ) + // Delete from storage. + .then(file => removeFile(file.path) + .then(() => file.id) + ) + // Add entry to the database + .then(id => db('scenarios_files') + .update({ + name: fileName, + path: filePath, + updated_at: (new Date()) + }) + .where('type', 'poi') + .where('id', id) + .where('project_id', projId) + .where('scenario_id', scId) + ); + }))); + } + + return processRN() + .then(() => processPOI()) + .then(() => logger && logger.log('Exporting data from osm-p2p... 
done')); }) // Note: There's no need to close the osm-p2p-db because when the process // terminates the connection is automatically closed. @@ -140,3 +213,72 @@ function processOSMP2PExport () { cb(null, data); }); } + +function collectPOIs (stream, poiTypes) { + let rn = []; + let pois = []; + let nodeStack = {}; + + // Create a sort of deferred. + // This promise will collect the POI and return them for + // later processing. + let _resolve; + const deferred = new Promise((resolve) => { + _resolve = resolve; + }); + + let dbgSkipped = 0; + + const write = (data, enc, next) => { + if (data.type === 'node') { + if (data.tags && data.tags.amenity) { + // Discard nodes with ram_poi_type different than what was uploaded. + if (data.tags.ram_poi_type && poiTypes.indexOf(data.tags.ram_poi_type) !== -1) { + pois.push(data); + } else { + dbgSkipped++; + } + } else { + nodeStack[data.id] = data; + } + } else if (data.type === 'way') { + if (data.tags && data.tags.amenity) { + // Discard ways with ram_poi_type different than what was uploaded. + if (data.tags.ram_poi_type && poiTypes.indexOf(data.tags.ram_poi_type) !== -1) { + pois.push(data); + data.nodes.forEach(n => { + if (nodeStack[n]) { + pois.push(nodeStack[n]); + delete nodeStack[n]; + } + }); + } else { + dbgSkipped++; + } + } else { + rn.push(data); + data.nodes.forEach(n => { + if (nodeStack[n]) { + rn.push(nodeStack[n]); + delete nodeStack[n]; + } + }); + } + } + next(); + }; + + const end = function (next) { + DEBUG && console.log('collectPOIs', 'missing/invalid ram_poi_type', dbgSkipped); + // Sort. + pois.sort(a => a.type === 'node' ? -1 : 1); + setImmediate(() => _resolve(pois)); + + rn.sort(a => a.type === 'node' ? 
-1 : 1); + rn.forEach(o => this.push(o)); + next(); + }; + + stream = stream.pipe(through2.obj(write, end)); + return {stream, deferred}; +} diff --git a/app/services/project-setup/project-setup.js b/app/services/project-setup/project-setup.js index 9b9d06b9..67547383 100644 --- a/app/services/project-setup/project-setup.js +++ b/app/services/project-setup/project-setup.js @@ -9,14 +9,14 @@ import Promise from 'bluebird'; import config from '../../config'; import db from '../../db/'; import Operation from '../../utils/operation'; -import { setScenarioSetting, getPropInsensitive } from '../../utils/utils'; +import { setScenarioSetting, getScenarioSetting, getPropInsensitive } from '../../utils/utils'; import { createAdminBoundsVT, createRoadNetworkVT } from '../../utils/vector-tiles'; import { getFileContents, getJSONFileContents, putFileStream } from '../../s3/utils'; -import { importRoadNetwork, removeDatabase } from '../rra-osm-p2p'; +import { importRoadNetwork, importPOI, removeDatabase } from '../rra-osm-p2p'; import AppLogger from '../../utils/app-logger'; import * as overpass from '../../utils/overpass'; @@ -403,6 +403,7 @@ export function concludeProjectSetup (e) { : () => db('scenarios_files') .select('*') .where('project_id', projId) + .where('scenario_id', scId) .where('type', 'road-network') .first() .then(file => getFileContents(file.path)); @@ -411,7 +412,22 @@ export function concludeProjectSetup (e) { // Handle POI. let poiProcessPromise = poiSource.type === 'osm' ? () => importOSMPOIs(overpass.fcBbox(adminBoundsFc), poiSource.data.osmPoiTypes) - : () => Promise.resolve(); + // We'll need to get the POI contents to import to the osm-p2p-db. 
+ : () => db('scenarios_files') + .select('*') + .where('project_id', projId) + .where('scenario_id', scId) + .where('type', 'poi') + .then(files => Promise.all([ + files, + Promise.map(files, file => getJSONFileContents(file.path)) + ]) + .then(([files, filesContent]) => { + // Create an object indexed by poi type. + let pois = {}; + files.forEach((f, idx) => { pois[f.subtype] = filesContent[idx]; }); + return pois; + })); // // Handle Profile. @@ -424,10 +440,34 @@ export function concludeProjectSetup (e) { .then(() => profileProcessPromise()) // Remove anything that might be there. We're importing fresh data. .then(() => removeDatabase(projId, scId)) - .then(() => poiProcessPromise()) .then(() => rnProcessPromise() .then(roadNetwork => importRoadNetworkOsmP2Pdb(projId, scId, op, roadNetwork)) .then(roadNetwork => process.env.DS_ENV === 'test' ? null : createRoadNetworkVT(projId, scId, op, roadNetwork).promise) + ) + .then(() => poiProcessPromise() + .then(poisFC => { + // Check rn_active_editing setting to see if we need to import. + return getScenarioSetting(db, scId, 'rn_active_editing') + .then(editing => { + if (!editing) { + return; + } + // Merge all feature collection together. + // Add a property to keep track of the poi type. 
+ let fc = { + type: 'FeatureCollection', + features: Object.keys(poisFC).reduce((acc, key) => { + let feats = poisFC[key].features; + feats.forEach(f => { f.properties.ram_poi_type = key; }); + return acc.concat(feats); + }, []) + }; + + let poiLogger = appLogger.group(`p${projId} s${scId} poi import`); + poiLogger && poiLogger.log('process poi'); + return importPOI(projId, scId, op, fc, poiLogger); + }); + }) ); }) .then(() => { @@ -439,9 +479,9 @@ export function concludeProjectSetup (e) { trx('scenarios') .update({updated_at: (new Date()), status: 'active'}) .where('id', scId) - ]) - .then(() => op.log('success', {message: 'Operation complete'}).then(op => op.finish())); - }); + ]); + }) + .then(() => op.log('success', {message: 'Operation complete'}).then(op => op.finish())); }) .then(() => { logger && logger.log('process complete'); diff --git a/app/services/rra-osm-p2p.js b/app/services/rra-osm-p2p.js index cb8b0a0b..8eb3c459 100644 --- a/app/services/rra-osm-p2p.js +++ b/app/services/rra-osm-p2p.js @@ -90,17 +90,15 @@ export function removeDatabase (projId, scId) { } export function importRoadNetwork (projId, scId, op, roadNetwork, logger) { + const importPromise = Promise.promisify(importer); const basePath = path.resolve(os.tmpdir(), `road-networkP${projId}S${scId}`); - - let osmDb = getDatabase(projId, scId); - - let importPromise = Promise.promisify(importer); + const osmDb = getDatabase(projId, scId); return op.log('process:road-network', {message: 'Road network processing started'}) .then(() => convertToOSMXml(roadNetwork, 'osm', basePath, logger)) .then(() => logger && logger.log('Importing changeset into osm-p2p...')) .then(() => { - let xml = fs.createReadStream(`${basePath}.osm`); + const xml = fs.createReadStream(`${basePath}.osm`); return importPromise(osmDb, xml); }) .then(() => logger && logger.log('Importing changeset into osm-p2p... 
done')) @@ -109,6 +107,24 @@ export function importRoadNetwork (projId, scId, op, roadNetwork, logger) { .then(() => op.log('process:road-network', {message: 'Road network processing finished'})); } +export function importPOI (projId, scId, op, poiFc, logger) { + const importPromise = Promise.promisify(importer); + const basePath = path.resolve(os.tmpdir(), `poiP${projId}S${scId}`); + const osmDb = getDatabase(projId, scId); + + return op.log('process:poi', {message: 'Poi processing started'}) + .then(() => convertToOSMXml(JSON.stringify(poiFc), 'geojson', basePath, logger)) + .then(() => logger && logger.log('Importing changeset into osm-p2p...')) + .then(() => { + let xml = fs.createReadStream(`${basePath}.osm`); + return importPromise(osmDb, xml); + }) + .then(() => logger && logger.log('Importing changeset into osm-p2p... done')) + // Note: There's no need to close the osm-p2p-db because when the process + // terminates the connection is automatically closed. + .then(() => op.log('process:poi', {message: 'Poi processing finished'})); +} + function convertToOSMXml (data, dataType, basePath, logger) { // Create an OSM Change file and store it in system /tmp folder. 
return new Promise((resolve, reject) => { diff --git a/app/services/scenario-create/scenario-create.js b/app/services/scenario-create/scenario-create.js index 0eb745cc..0bf4e1e9 100644 --- a/app/services/scenario-create/scenario-create.js +++ b/app/services/scenario-create/scenario-create.js @@ -3,10 +3,10 @@ import path from 'path'; import Promise from 'bluebird'; import config from '../../config'; -import { cloneDatabase, closeDatabase, importRoadNetwork } from '../rra-osm-p2p'; +import { cloneDatabase, closeDatabase, importRoadNetwork, importPOI } from '../rra-osm-p2p'; import db from '../../db/'; -import { setScenarioSetting } from '../../utils/utils'; -import { copyFile, copyDirectory, putFileStream, getFileContents } from '../../s3/utils'; +import { setScenarioSetting, getScenarioSetting } from '../../utils/utils'; +import { copyFile, copyDirectory, putFileStream, getFileContents, getJSONFileContents } from '../../s3/utils'; import Operation from '../../utils/operation'; import AppLogger from '../../utils/app-logger'; import * as overpass from '../../utils/overpass'; @@ -60,84 +60,93 @@ export function scenarioCreate (e) { rnSource, rnSourceScenarioId, roadNetworkFile, + poiSource, + poiSourceScenarioId, callback } = e; let op = new Operation(db); op.loadById(opId) .then(op => op.log('admin-areas', {message: 'Cloning admin areas'})) - .then(() => db.transaction(function (trx) { + .then(() => { let executor = Promise.resolve(); - if (rnSource === 'clone') { + // Cache the road network content to use later. + // This is to avoid multiple downloads from s3. + let rnCache = null; + + logger && logger.log('poiSource', poiSource); + logger && logger.log('rnSource', rnSource); + + if (poiSource === 'clone') { executor = executor // Copy the scenario files. 
- .then(() => op.log('files', {message: 'Cloning files'})) - .then(() => trx('scenarios_files') + .then(() => op.log('files', {message: 'Cloning points of interest'})) + .then(() => db('scenarios_files') .select('*') - .where('scenario_id', rnSourceScenarioId) + .where('scenario_id', poiSourceScenarioId) .where('project_id', projId) - .whereIn('type', ['poi', 'road-network']) - .then(files => cloneScenarioFiles(trx, files, projId, scId)) + .where('type', 'poi') + .then(files => cloneScenarioFiles(db, files, projId, scId)) ) - .then(() => trx('scenarios_source_data') - .select('project_id', 'name', 'type', 'data') + // Set poi source to file. + .then(() => db('scenarios_source_data') + .insert({ + project_id: projId, + scenario_id: scId, + name: 'poi', + type: 'file' + }) + ); + } else { + throw new Error(`Poi source is invalid: ${poiSource}`); + } + + // Road Network: Clone. + if (rnSource === 'clone') { + executor = executor + // Copy the scenario files. + .then(() => op.log('files', {message: 'Cloning road network'})) + .then(() => db('scenarios_files') + .select('*') .where('scenario_id', rnSourceScenarioId) .where('project_id', projId) - .then(sourceData => { - // Set new id. - sourceData.forEach(o => { - o.scenario_id = scId; - }); - return sourceData; + .where('type', 'road-network') + .then(files => cloneScenarioFiles(db, files, projId, scId)) + ) + // Set road network source to file. + .then(() => db('scenarios_source_data') + .insert({ + project_id: projId, + scenario_id: scId, + name: 'road-network', + type: 'file' }) ) - .then(sourceData => trx.batchInsert('scenarios_source_data', sourceData)) // Copy the setting for road network edition. - .then(() => trx('scenarios_settings') + .then(() => db('scenarios_settings') .select('value') .where('scenario_id', rnSourceScenarioId) .where('key', 'rn_active_editing') .first() .then(res => setScenarioSetting(db, scId, 'rn_active_editing', res ? res.value : false)) ) - // Copy the osm-p2p-db. 
- .then(() => op.log('files', {message: 'Cloning road network database'})) - .then(() => closeDatabase(projId, scId)) - .then(() => cloneOsmP2Pdb(projId, rnSourceScenarioId, projId, scId)) + // Copy vector tiles. .then(() => copyDirectory(`scenario-${rnSourceScenarioId}/tiles/road-network`, `scenario-${scId}/tiles/road-network`)); - // + + // Road Network: New } else if (rnSource === 'new') { executor = executor - // Copy the scenario files. - .then(() => op.log('files', {message: 'Cloning files'})) - // When uploading a new file we do so only for the - // road-network. Since the poi file is identical for all - // scenarios of the project just clone it from the master. - .then(() => trx('scenarios_files') - .select('scenarios_files.*') - .innerJoin('scenarios', 'scenarios.id', 'scenarios_files.scenario_id') - .where('scenarios.master', true) - .where('scenarios.project_id', projId) - .where('scenarios_files.type', 'poi') - .then(files => cloneScenarioFiles(trx, files, projId, scId)) - ) - // Insert source info. - // TODO: This needs to be updated once we have osm data. - .then(() => trx.batchInsert('scenarios_source_data', [ - { + .then(() => op.log('files', {message: 'Uploading new road network'})) + // Set road network source to file. + .then(() => db('scenarios_source_data') + .insert({ project_id: projId, scenario_id: scId, name: 'road-network', type: 'file' - }, - { - project_id: projId, - scenario_id: scId, - name: 'poi', - type: 'file' - } - ])) + }) + ) // Add entry for road network file. .then(() => { let now = new Date(); @@ -151,45 +160,36 @@ export function scenarioCreate (e) { updated_at: now }; - return trx('scenarios_files') + return db('scenarios_files') .returning('*') .insert(data) .then(res => res[0]); }) .then(file => getFileContents(file.path)) - // Import to the osm-p2p-db. .then(roadNetwork => { + // Disable road network editing if size over threshold. 
+ return setScenarioSetting(db, scId, 'rn_active_editing', roadNetwork.length < config.roadNetEditThreshold) + .then(() => roadNetwork); + }) + // Create vector tiles. + .then(roadNetwork => { + rnCache = roadNetwork; logger && logger.log('process road network'); - return importRoadNetworkOsmP2Pdb(projId, scId, op, roadNetwork) - .then(roadNetwork => createRoadNetworkVT(projId, scId, op, roadNetwork).promise); + return createRoadNetworkVT(projId, scId, op, roadNetwork).promise; }); + + // Road Network: Osm } else if (rnSource === 'osm') { executor = executor .then(() => op.log('files', {message: 'Importing road network'})) - .then(() => trx.batchInsert('scenarios_source_data', [ - { + // Set road network source to osm. + .then(() => db('scenarios_source_data') + .insert({ project_id: projId, scenario_id: scId, name: 'road-network', type: 'osm' - }, - { - project_id: projId, - scenario_id: scId, - name: 'poi', - type: 'file' - } - ])) - // When uploading a new file we do so only for the - // road-network. Since the poi file is identical for all - // scenarios of the project just clone it from the master. - .then(() => trx('scenarios_files') - .select('scenarios_files.*') - .innerJoin('scenarios', 'scenarios.id', 'scenarios_files.scenario_id') - .where('scenarios.master', true) - .where('scenarios.project_id', projId) - .where('scenarios_files.type', 'poi') - .then(files => cloneScenarioFiles(trx, files, projId, scId)) + }) ) // Get the bbox for the overpass import. 
.then(() => db('projects') @@ -199,8 +199,8 @@ export function scenarioCreate (e) { .then(res => res.bbox) ) .then(bbox => overpass.importRoadNetwork(overpass.convertBbox(bbox))) + // Just to log error .catch(err => { - // Just to log error logger && logger.log('Error importing from overpass', err.message); throw err; }) @@ -220,19 +220,98 @@ export function scenarioCreate (e) { return putFileStream(filePath, osmData) .then(() => db('scenarios_files').insert(data)) - .then(() => { - logger && logger.log('process road network'); - return importRoadNetworkOsmP2Pdb(projId, scId, op, osmData) - .then(roadNetwork => createRoadNetworkVT(projId, scId, op, roadNetwork).promise); - }); + .then(() => osmData); + }) + .then(roadNetwork => { + // Disable road network editing if size over threshold. + return setScenarioSetting(db, scId, 'rn_active_editing', roadNetwork.length < config.roadNetEditThreshold) + .then(() => roadNetwork); + }) + // Create vector tiles. + .then(roadNetwork => { + rnCache = roadNetwork; + logger && logger.log('process road network'); + return createRoadNetworkVT(projId, scId, op, roadNetwork).promise; }); + } else { + throw new Error(`Road network source is invalid: ${rnSource}`); + } + + // If we're cloning both the pois and the rn from the same source + // we can just clone the database. There's no need to import. + if (rnSource === 'clone' && poiSource === 'clone' && rnSourceScenarioId === poiSourceScenarioId) { + logger && logger.log('Cloning from same source. Duplicating osm db.'); + executor = executor + // Copy the osm-p2p-db. + .then(() => op.log('files', {message: 'Cloning osm database'})) + .then(() => closeDatabase(projId, scId)) + .then(() => cloneOsmP2Pdb(projId, rnSourceScenarioId, projId, scId)); + } else { + // Is there any importing to do? + executor = executor + .then(() => getScenarioSetting(db, scId, 'rn_active_editing')) + // No import. Stop the chain with an error. 
+ .then(editing => { + if (!editing) { + logger && logger.log('Road network editing inactive.'); + throw new Error('not editing'); + } + }) + // Get the road network from cache or from the db. + .then(() => rnCache || db('scenarios_files') + .select('*') + .where('project_id', projId) + .where('scenario_id', scId) + .where('type', 'road-network') + .first() + .then(file => getFileContents(file.path)) + ) + // Import into osm db. + .then(roadNetwork => { + let rnLogger = appLogger.group(`p${projId} s${scId} rn import`); + rnLogger && rnLogger.log('process road network'); + return importRoadNetwork(projId, scId, op, roadNetwork, rnLogger); + }) + // Get all the pois and create a feature collection. + .then(() => db('scenarios_files') + .select('*') + .where('project_id', projId) + .where('scenario_id', scId) + .where('type', 'poi') + .then(files => Promise.all([ + files, + Promise.map(files, file => getJSONFileContents(file.path)) + ])) + .then(([files, filesContent]) => { + // Merge all feature collection together. + // Add a property to keep track of the poi type. + return { + type: 'FeatureCollection', + features: files.reduce((acc, file, idx) => { + let key = file.subtype; + let features = filesContent[idx].features; + features.forEach(f => { f.properties.ram_poi_type = key; }); + return acc.concat(features); + }, []) + }; + }) + ) + // Import into osm db. + .then(poiFc => { + console.log('poiFc', poiFc); + let poiLogger = appLogger.group(`p${projId} s${scId} poi import`); + poiLogger && poiLogger.log('process poi'); + return importPOI(projId, scId, op, poiFc, poiLogger); + }) + // Ignore not editing error. 
+ .catch(e => { if (e.message !== 'not editing') throw e; }); } return executor - .then(() => trx('scenarios').update({status: 'active', updated_at: (new Date())}).where('id', scId)) - .then(() => trx('projects').update({updated_at: (new Date())}).where('id', projId)) + .then(() => db('scenarios').update({status: 'active', updated_at: (new Date())}).where('id', scId)) + .then(() => db('projects').update({updated_at: (new Date())}).where('id', projId)) .then(() => op.log('success', {message: 'Operation complete'}).then(op => op.finish())); - })) + }) // Note: There's no need to close the osm-p2p-db because when the process // terminates the connection is automatically closed. .then(() => { @@ -255,7 +334,7 @@ export function scenarioCreate (e) { // Copies the given files from a to the new scenario, both the database entries // and the physical file. -function cloneScenarioFiles (trx, files, projId, scId) { +function cloneScenarioFiles (db, files, projId, scId) { logger && logger.log('cloning files'); let newFiles = files.map(file => { const fileName = file.type === 'poi' @@ -280,7 +359,7 @@ function cloneScenarioFiles (trx, files, projId, scId) { // Insert new files in the db. .then(allFiles => { let [oldFiles, newFiles] = allFiles; - return trx.batchInsert('scenarios_files', newFiles).then(() => [oldFiles, newFiles]); + return db.batchInsert('scenarios_files', newFiles).then(() => [oldFiles, newFiles]); }) // Copy files on s3. .then(allFiles => { @@ -302,19 +381,3 @@ function cloneOsmP2Pdb (srcProjId, srcScId, destProjId, destScId) { } }); } - -function importRoadNetworkOsmP2Pdb (projId, scId, op, roadNetwork) { - let rnLogger = appLogger.group(`p${projId} s${scId} rn import`); - rnLogger && rnLogger.log('process road network'); - - // Disable road network editing if size over threshold. 
- let allowImport = roadNetwork.length < config.roadNetEditThreshold; - - return setScenarioSetting(db, scId, 'rn_active_editing', allowImport) - .then(() => { - if (allowImport) { - return importRoadNetwork(projId, scId, op, roadNetwork, rnLogger); - } - }) - .then(() => roadNetwork); -} diff --git a/app/utils/utils.js b/app/utils/utils.js index 59632885..5aa8bd0b 100644 --- a/app/utils/utils.js +++ b/app/utils/utils.js @@ -185,6 +185,28 @@ export function setScenarioSetting (db, scId, key, value) { }); } +export function getScenarioSetting (db, scId, key) { + // Check if setting exists. + return db('scenarios_settings') + .select('value') + .where('scenario_id', scId) + .where('key', key) + .first() + .then(setting => { + if (setting) { + try { + // Convert objects, booleans, and integers. + return JSON.parse(setting.value); + } catch (e) { + // Fallback to strings + return setting.value; + } + } else { + return null; + } + }); +} + export function getPropInsensitive (object, prop) { // prop can be written in caps or any variant. // prop, PROP, Prop, PrOp diff --git a/app/utils/vector-tiles.js b/app/utils/vector-tiles.js index 8b3eb559..0ff66b8c 100644 --- a/app/utils/vector-tiles.js +++ b/app/utils/vector-tiles.js @@ -6,11 +6,15 @@ import { exec } from 'child_process'; import tmpDir from 'temp-dir'; import Promise from 'bluebird'; +import config from '../config'; + import { removeDir as removeS3Dir, putDirectory } from '../s3/utils'; +const DEBUG = config.debug; + /** * Create the vector tiles for the admin bounds. * Full process: @@ -28,6 +32,8 @@ import { * @return Object with a `promise` and a `kill` switch. */ export function createAdminBoundsVT (projId, scId, op, fc) { + const identifier = `p${projId} s${scId} AB VT`; + // Temporary disable vector tiles. 
return { promise: Promise.resolve(), @@ -47,6 +53,8 @@ export function createAdminBoundsVT (projId, scId, op, fc) { let killed = false; let checkKilled = () => { if (killed) throw new Error('Process manually terminated'); }; + DEBUG && console.log(identifier, 'Clean files...'); + // Clean any existing files, locally and from S3. let executor = op.log('process:admin-bounds', {message: 'Creating admin bounds vector tiles'}) // Clean up phase. @@ -58,6 +66,7 @@ export function createAdminBoundsVT (projId, scId, op, fc) { // down the road and we've to repeat. removeS3Dir(`project-${projId}/tiles/admin-bounds`) ])) + .then(() => { DEBUG && console.log(identifier, 'Clean files... done'); }) .then(() => writeJsonP(geojsonFilePath, fc)) // Check if it was killed. The docker run will throw errors but the other // processes won't. Stop the chain if it was aborted before reaching @@ -65,6 +74,7 @@ export function createAdminBoundsVT (projId, scId, op, fc) { .then(() => checkKilled()) // Create tiles. .then(() => { + DEBUG && console.log(identifier, 'Running tippecanoe...'); currentRunning = `p${projId}s${scId}-bounds`; return dockerRun([ `-v ${tmpDir}:/data`, @@ -76,10 +86,13 @@ export function createAdminBoundsVT (projId, scId, op, fc) { `/data/${geojsonName}` ]); }) + .then(() => { DEBUG && console.log(identifier, 'Running tippecanoe... done'); }) // Check if it was killed. Additional check in case docker delayed in // throwing the error. .then(() => checkKilled()) + .then(() => { DEBUG && console.log(identifier, 'Uploading to storage...'); }) .then(() => putDirectory(tilesFolderPath, `project-${projId}/tiles/admin-bounds`)) + .then(() => { DEBUG && console.log(identifier, 'Uploading to storage... done'); }) // Check if it was killed. putDirectory will not throw an error so stop the // run if the analysis was killed while putDirectory was running. 
.then(() => checkKilled()); @@ -114,6 +127,8 @@ export function createAdminBoundsVT (projId, scId, op, fc) { * @return Object with a `promise` and a `kill` switch. */ export function createRoadNetworkVT (projId, scId, op, roadNetwork) { + const identifier = `p${projId} s${scId} RN VT`; + // Temporary disable vector tiles. return { promise: Promise.resolve(), @@ -135,6 +150,8 @@ export function createRoadNetworkVT (projId, scId, op, roadNetwork) { let killed = false; let checkKilled = () => { if (killed) throw new Error('Process manually terminated'); }; + DEBUG && console.log(identifier, 'Clean files...'); + // Clean any existing files, locally and from S3. let executor = op.log('road-network', {message: 'Creating road-network vector tiles'}) // Clean up phase. @@ -145,6 +162,7 @@ export function createRoadNetworkVT (projId, scId, op, roadNetwork) { // Clean S3 directory removeS3Dir(`scenario-${scId}/tiles/road-network`) ])) + .then(() => { DEBUG && console.log(identifier, 'Clean files... done'); }) .then(() => writeFile(osmFilePath, roadNetwork)) // Check if it was killed. The docker run will throw errors but the other // processes won't. Stop the chain if it was aborted before reaching @@ -152,8 +170,8 @@ export function createRoadNetworkVT (projId, scId, op, roadNetwork) { .then(() => checkKilled()) // Convert to geojson. .then(() => { + DEBUG && console.log(identifier, 'Running osmtogeojson...'); currentRunning = `p${projId}s${scId}-rn`; - return dockerRun([ `-v ${tmpDir}:/data`, `--name ${currentRunning}`, @@ -162,11 +180,13 @@ export function createRoadNetworkVT (projId, scId, op, roadNetwork) { `/data/${osmName} > ${geojsonFilePath}` ]); }) - // Check if it was killed. Additional check in case docker delayed in + .then(() => { DEBUG && console.log(identifier, 'Running osmtogeojson... done'); }) + // Check if it was killed. Additional check in case docker delayed in + // throwing the error. .then(() => checkKilled()) // Create tiles.
.then(() => { + DEBUG && console.log(identifier, 'Running tippecanoe...'); currentRunning = `p${projId}s${scId}-tiles`; return dockerRun([ `-v ${tmpDir}:/data`, @@ -178,7 +198,13 @@ export function createRoadNetworkVT (projId, scId, op, roadNetwork) { `/data/${geojsonName}` ]); }) + .then(() => { DEBUG && console.log(identifier, 'Running tippecanoe... done'); }) + // Check if it was killed. Additional check in case docker delayed in + // throwing the error. + .then(() => checkKilled()) + .then(() => { DEBUG && console.log(identifier, 'Uploading to storage...'); }) .then(() => putDirectory(tilesFolderPath, `scenario-${scId}/tiles/road-network`)) + .then(() => { DEBUG && console.log(identifier, 'Uploading to storage... done'); }) // Check if it was killed. putDirectory will not throw an error so stop the // run if the analysis was killed while putDirectory was running. .then(() => checkKilled()); diff --git a/test/test-scenarios-create.js b/test/test-scenarios-create.js index 8be6e599..8e5ab625 100644 --- a/test/test-scenarios-create.js +++ b/test/test-scenarios-create.js @@ -171,6 +171,8 @@ describe('Scenarios', function () { form.append('name', 'Scenario name'); form.append('roadNetworkSource', 'clone'); form.append('roadNetworkSourceScenario', 1); + form.append('poiSource', 'clone'); + form.append('poiSourceScenario', 1); return streamToPromise(form) .then(payload => instance.injectThen({ @@ -190,6 +192,8 @@ describe('Scenarios', function () { form.append('name', 'Scenario name'); form.append('roadNetworkSource', 'clone'); form.append('roadNetworkSourceScenario', 1); + form.append('poiSource', 'clone'); + form.append('poiSourceScenario', 1); return streamToPromise(form) .then(payload => instance.injectThen({ @@ -209,6 +213,8 @@ describe('Scenarios', function () { form.append('name', 'Scenario name'); form.append('roadNetworkSource', 'clone'); form.append('roadNetworkSourceScenario', 1); + form.append('poiSource', 'clone'); + form.append('poiSourceScenario', 1); 
return streamToPromise(form) .then(payload => instance.injectThen({ @@ -229,6 +235,8 @@ describe('Scenarios', function () { form.append('name', 'Main scenario 1200'); form.append('roadNetworkSource', 'clone'); form.append('roadNetworkSourceScenario', 1200); + form.append('poiSource', 'clone'); + form.append('poiSourceScenario', 1200); return streamToPromise(form) .then(payload => instance.injectThen({ @@ -249,6 +257,8 @@ describe('Scenarios', function () { form.append('name', 'New scenario project 1200'); form.append('roadNetworkSource', 'clone'); form.append('roadNetworkSourceScenario', 1200); + form.append('poiSource', 'clone'); + form.append('poiSourceScenario', 1200); return streamToPromise(form) .then(payload => instance.injectThen({ @@ -278,6 +288,8 @@ describe('Scenarios', function () { form.append('name', 'New scenario with file project 1200'); form.append('roadNetworkSource', 'new'); form.append('roadNetworkFile', fs.createReadStream('./test/utils/data-sergipe/road-network.osm')); + form.append('poiSource', 'clone'); + form.append('poiSourceScenario', 1); return streamToPromise(form).then(payload => { return instance.injectThen({ diff --git a/test/test-services-project-setup.js b/test/test-services-project-setup.js index f09ff4ba..341cb652 100644 --- a/test/test-services-project-setup.js +++ b/test/test-services-project-setup.js @@ -118,22 +118,23 @@ describe('Finish Project Setup', function () { db('operations_logs') .where('operation_id', op.getId()) .then(logs => { - assert.lengthOf(logs, 6); + assert.lengthOf(logs, 8); assert.equal(logs[0].code, 'start'); assert.equal(logs[0].data.message, 'Operation started'); - - // These actions run in parallel and can actually be switched. 
- assert.oneOf(logs[1].code, ['process:admin-bounds', 'process:origins']); - assert.oneOf(logs[1].data.message, ['Processing admin areas', 'Processing origins']); - assert.oneOf(logs[2].code, ['process:admin-bounds', 'process:origins']); - assert.oneOf(logs[2].data.message, ['Processing admin areas', 'Processing origins']); - + assert.equal(logs[1].code, 'process:origins'); + assert.equal(logs[1].data.message, 'Processing origins'); + assert.equal(logs[2].code, 'process:admin-bounds'); + assert.equal(logs[2].data.message, 'Processing admin areas'); assert.equal(logs[3].code, 'process:road-network'); assert.equal(logs[3].data.message, 'Road network processing started'); assert.equal(logs[4].code, 'process:road-network'); assert.equal(logs[4].data.message, 'Road network processing finished'); - assert.equal(logs[5].code, 'success'); - assert.equal(logs[5].data.message, 'Operation complete'); + assert.equal(logs[5].code, 'process:poi'); + assert.equal(logs[5].data.message, 'Poi processing started'); + assert.equal(logs[6].code, 'process:poi'); + assert.equal(logs[6].data.message, 'Poi processing finished'); + assert.equal(logs[7].code, 'success'); + assert.equal(logs[7].data.message, 'Operation complete'); }) ]) // Delete osm p2p folder. diff --git a/yarn.lock b/yarn.lock index c21885ab..b6ccdee9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3291,9 +3291,9 @@ osm-p2p-defork@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/osm-p2p-defork/-/osm-p2p-defork-1.0.0.tgz#b9bf9a43418d0338c9ef509465a1a27bb67a1355" -osm-p2p-import@^3.0.3: - version "3.0.3" - resolved "https://registry.yarnpkg.com/osm-p2p-import/-/osm-p2p-import-3.0.3.tgz#70d6daed2ba1c3f4b769b648c3f55a3858f0b7e0" +osm-p2p-import@^3.0.4: + version "3.0.4" + resolved "https://registry.yarnpkg.com/osm-p2p-import/-/osm-p2p-import-3.0.4.tgz#ce288c03a0067b6feb7878730187cf7142292800" dependencies: minimist "^1.2.0" mkdirp "^0.5.1"