Skip to content

Commit

Permalink
Merge pull request #195 from WorldBank-Transport/feature/poi-import
Browse files Browse the repository at this point in the history
[WIP] POI editable layer
  • Loading branch information
olafveerman authored Oct 25, 2017
2 parents 9389bef + 3a814a6 commit 591c328
Show file tree
Hide file tree
Showing 11 changed files with 523 additions and 184 deletions.
65 changes: 41 additions & 24 deletions app/routes/scenarios--create.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ function handler (params, payload, reply) {
const rnSource = payload.roadNetworkSource;
const rnSourceScenarioId = payload.roadNetworkSourceScenario;
const roadNetworkFile = payload.roadNetworkFile;
const poiSource = payload.poiSource;
const poiSourceScenarioId = payload.poiSourceScenario;

return db('projects')
.select('status')
Expand Down Expand Up @@ -97,19 +99,31 @@ function handler (params, payload, reply) {
});
})
// Start operation and return data to continue.
.then(scenario => startOperation(params.projId, scenario.id).then(op => [op, scenario]))
.then(data => {
let [op, scenario] = data;
if (rnSource === 'clone') {
return createScenario(params.projId, scenario.id, op.getId(), rnSource, {rnSourceScenarioId})
.then(() => scenario);
} else if (rnSource === 'new') {
return handleRoadNetworkUpload(scenario, op.getId(), rnSource, roadNetworkFile)
.then(() => scenario);
} else if (rnSource === 'osm') {
return createScenario(params.projId, scenario.id, op.getId(), rnSource)
.then(() => scenario);
.then(scenario => Promise.all([
startOperation(params.projId, scenario.id),
scenario
]))
.then(([op, scenario]) => {
let action = Promise.resolve({ rnSource, poiSource });

// Upload and process file. Add the filename to the data object.
if (rnSource === 'new') {
action = action.then(data => handleRoadNetworkUpload(scenario, op.getId(), rnSource, roadNetworkFile)
.then(res => Object.assign({}, data, res))
);
// Add the rn source id to the data object.
} else if (rnSource === 'clone') {
action = action.then(data => Object.assign({}, data, {rnSourceScenarioId}));
}

// Add the poi source id to the data object.
if (poiSource === 'clone') {
action = action.then(data => Object.assign({}, data, {poiSourceScenarioId}));
}

return action
.then(data => createScenario(params.projId, scenario.id, op.getId(), data))
.then(() => scenario);
})
.then(scenario => reply(scenario))
.catch(err => {
Expand Down Expand Up @@ -165,13 +179,21 @@ export default [
if (result.files.roadNetworkFile) {
payload.roadNetworkFile = result.files.roadNetworkFile[0];
}
if (result.fields.poiSource) {
payload.poiSource = result.fields.poiSource[0];
}
if (result.fields.poiSourceScenario) {
payload.poiSourceScenario = result.fields.poiSourceScenario[0];
}

let validation = Joi.validate(payload, Joi.object().keys({
name: Joi.string().required(),
description: Joi.string(),
roadNetworkSource: Joi.string().valid('clone', 'new', 'osm').required(),
roadNetworkSourceScenario: Joi.number().when('roadNetworkSource', {is: 'clone', then: Joi.required()}),
roadNetworkFile: Joi.object().when('roadNetworkSource', {is: 'new', then: Joi.required()})
roadNetworkFile: Joi.object().when('roadNetworkSource', {is: 'new', then: Joi.required()}),
poiSource: Joi.string().valid('clone').required(),
poiSourceScenario: Joi.number().when('poiSource', {is: 'clone', then: Joi.required()})
}));

if (validation.error) {
Expand Down Expand Up @@ -235,7 +257,9 @@ export default [
name: data.name,
description: data.description,
roadNetworkSource: 'clone',
roadNetworkSourceScenario: request.params.scId
roadNetworkSourceScenario: request.params.scId,
poiSource: 'clone',
poiSourceScenario: request.params.scId
};
return handler(request.params, payload, reply);
})
Expand Down Expand Up @@ -267,20 +291,13 @@ function startOperation (projId, scId) {
});
}

function createScenario (projId, scId, opId, rnSource, data = {}) {
function createScenario (projId, scId, opId, data = {}) {
let action = Promise.resolve();
// In test mode we don't want to start the generation.
// It will be tested in the appropriate place.
if (process.env.DS_ENV === 'test') { return action; }

if (rnSource === 'clone') {
// We need to close the connection to the source scenario before cloning
// the database. This needs to be done in this process. The process ran by
// the service runner won't have access to it.
// action = closeDatabase(projId, data.rnSourceScenarioId);
}

let serviceData = Object.assign({}, {projId, scId, opId, rnSource}, data);
let serviceData = Object.assign({}, {projId, scId, opId}, data);

action.then(() => {
console.log(`p${projId} s${scId}`, 'createScenario');
Expand Down Expand Up @@ -321,5 +338,5 @@ function handleRoadNetworkUpload (scenario, opId, source, roadNetworkFile) {
.then(() => putFileToS3(filePath, roadNetworkFile.path))
// Delete temp file.
.then(() => removeLocalFile(roadNetworkFile.path, true))
.then(() => createScenario(scenario.project_id, scenario.id, opId, source, {roadNetworkFile: fileName}));
.then(() => ({roadNetworkFile: fileName}));
}
2 changes: 1 addition & 1 deletion app/s3/utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
import fs from 'fs';
import fs from 'fs-extra';
import Promise from 'bluebird';

import s3, { bucket } from './';
Expand Down
204 changes: 173 additions & 31 deletions app/services/export-road-network/export-road-network.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import path from 'path';
import obj2osm from 'obj2osm';
import osmP2PApi from 'osm-p2p-server/api/index';
import through2 from 'through2';
import osmtogeojson from 'osmtogeojson';

import config from '../../config';
import { getDatabase } from '../rra-osm-p2p';
Expand Down Expand Up @@ -51,17 +52,22 @@ export function exportRoadNetwork (e) {

let op = new Operation(db);
op.loadById(opId)
.then(op => op.log('road-network', {message: 'Updating road network'}))
.then(op => {
.then(op => op.log('road-network', {message: 'Updating road network and pois'}))
// Load scenario poi types.
.then(() => db('scenarios_files')
.select('subtype')
.where('type', 'poi')
.where('project_id', projId)
.where('scenario_id', scId)
.then(types => types.map(o => o.subtype))
)
.then(poiTypes => {
const bbox = [-180, -90, 180, 90];
const toOsmOptions = {
bounds: {minlon: bbox[0], minlat: bbox[1], maxlon: bbox[2], maxlat: bbox[3]}
};
const osmDb = getDatabase(projId, scId);
const fileName = `road-network_${Date.now()}`;
const filePath = `scenario-${scId}/${fileName}`;

let formatTransform = obj2osm(toOsmOptions);
const formatTransform = obj2osm(toOsmOptions);

formatTransform.on('error', (err) => {
throw err;
Expand All @@ -70,32 +76,99 @@ export function exportRoadNetwork (e) {
logger && logger.log('Exporting data from osm-p2p');

let stream = osmP2PApi(osmDb).getMap(bbox, {order: 'type'})
.pipe(processOSMP2PExport())
.pipe(formatTransform);

return putFileStream(filePath, stream)
.then(() => logger && logger.log('Exporting data from osm-p2p... done'))
// Get previous file.
.then(() => db('scenarios_files')
.select('path')
.where('type', 'road-network')
.where('project_id', projId)
.where('scenario_id', scId)
.first()
)
// Delete from storage.
.then(file => removeFile(file.path))
// Add entry to the database
.then(() => db('scenarios_files')
.update({
name: fileName,
path: filePath,
updated_at: (new Date())
.pipe(processOSMP2PExport());

// Extract the POI into a promise and continue with the road network.
let splitting = collectPOIs(stream, poiTypes);

stream = splitting.stream.pipe(formatTransform);

function processRN () {
const fileName = `road-network_${Date.now()}`;
const filePath = `scenario-${scId}/${fileName}`;

return putFileStream(filePath, stream)
// Get previous file.
.then(() => db('scenarios_files')
.select('path')
.where('type', 'road-network')
.where('project_id', projId)
.where('scenario_id', scId)
.first()
)
// Delete from storage.
.then(file => removeFile(file.path))
// Add entry to the database
.then(() => db('scenarios_files')
.update({
name: fileName,
path: filePath,
updated_at: (new Date())
})
.where('type', 'road-network')
.where('project_id', projId)
.where('scenario_id', scId)
);
}

function processPOI () {
return splitting.deferred
// Convert to Feature Collection from Overpass style nodes.
.then(data => {
let fc = osmtogeojson({elements: data});
// Group features by its ram_poi_type.
let groups = fc.features.reduce((acc, feat) => {
let type = feat.properties.ram_poi_type;
if (!acc[type]) {
acc[type] = {
type: 'FeatureCollection',
features: []
};
}
acc[type].features.push(feat);
return acc;
}, {});

return groups;
})
.where('type', 'road-network')
.where('project_id', projId)
.where('scenario_id', scId)
);
.then(groups => Promise.all(Object.keys(groups).map(key => {
const fileName = `poi_${key}_${Date.now()}`;
const filePath = `scenario-${scId}/${fileName}`;

let data = JSON.stringify(groups[key]);

return putFileStream(filePath, data)
// Get previous file.
.then(() => db('scenarios_files')
.select('id', 'path')
.where('type', 'poi')
.where('subtype', key)
.where('project_id', projId)
.where('scenario_id', scId)
.first()
)
// Delete from storage.
.then(file => removeFile(file.path)
.then(() => file.id)
)
// Add entry to the database
.then(id => db('scenarios_files')
.update({
name: fileName,
path: filePath,
updated_at: (new Date())
})
.where('type', 'poi')
.where('id', id)
.where('project_id', projId)
.where('scenario_id', scId)
);
})));
}

return processRN()
.then(() => processPOI())
.then(() => logger && logger.log('Exporting data from osm-p2p... done'));
})
// Note: There's no need to close the osm-p2p-db because when the process
// terminates the connection is automatically closed.
Expand Down Expand Up @@ -140,3 +213,72 @@ function processOSMP2PExport () {
cb(null, data);
});
}

function collectPOIs (stream, poiTypes) {
let rn = [];
let pois = [];
let nodeStack = {};

// Create a sort of deferred.
// This promise will collect the POI and return them for
// later processing.
let _resolve;
const deferred = new Promise((resolve) => {
_resolve = resolve;
});

let dbgSkipped = 0;

const write = (data, enc, next) => {
if (data.type === 'node') {
if (data.tags && data.tags.amenity) {
// Discard nodes with ram_poi_type different than what was uploaded.
if (data.tags.ram_poi_type && poiTypes.indexOf(data.tags.ram_poi_type) !== -1) {
pois.push(data);
} else {
dbgSkipped++;
}
} else {
nodeStack[data.id] = data;
}
} else if (data.type === 'way') {
if (data.tags && data.tags.amenity) {
// Discard ways with ram_poi_type different than what was uploaded.
if (data.tags.ram_poi_type && poiTypes.indexOf(data.tags.ram_poi_type) !== -1) {
pois.push(data);
data.nodes.forEach(n => {
if (nodeStack[n]) {
pois.push(nodeStack[n]);
delete nodeStack[n];
}
});
} else {
dbgSkipped++;
}
} else {
rn.push(data);
data.nodes.forEach(n => {
if (nodeStack[n]) {
rn.push(nodeStack[n]);
delete nodeStack[n];
}
});
}
}
next();
};

const end = function (next) {
DEBUG && console.log('collectPOIs', 'missing/invalid ram_poi_type', dbgSkipped);
// Sort.
pois.sort(a => a.type === 'node' ? -1 : 1);
setImmediate(() => _resolve(pois));

rn.sort(a => a.type === 'node' ? -1 : 1);
rn.forEach(o => this.push(o));
next();
};

stream = stream.pipe(through2.obj(write, end));
return {stream, deferred};
}
Loading

0 comments on commit 591c328

Please sign in to comment.