Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] POI editable layer #195

Merged
merged 13 commits into from
Oct 25, 2017
65 changes: 41 additions & 24 deletions app/routes/scenarios--create.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ function handler (params, payload, reply) {
const rnSource = payload.roadNetworkSource;
const rnSourceScenarioId = payload.roadNetworkSourceScenario;
const roadNetworkFile = payload.roadNetworkFile;
const poiSource = payload.poiSource;
const poiSourceScenarioId = payload.poiSourceScenario;

return db('projects')
.select('status')
Expand Down Expand Up @@ -97,19 +99,31 @@ function handler (params, payload, reply) {
});
})
// Start operation and return data to continue.
.then(scenario => startOperation(params.projId, scenario.id).then(op => [op, scenario]))
.then(data => {
let [op, scenario] = data;
if (rnSource === 'clone') {
return createScenario(params.projId, scenario.id, op.getId(), rnSource, {rnSourceScenarioId})
.then(() => scenario);
} else if (rnSource === 'new') {
return handleRoadNetworkUpload(scenario, op.getId(), rnSource, roadNetworkFile)
.then(() => scenario);
} else if (rnSource === 'osm') {
return createScenario(params.projId, scenario.id, op.getId(), rnSource)
.then(() => scenario);
.then(scenario => Promise.all([
startOperation(params.projId, scenario.id),
scenario
]))
.then(([op, scenario]) => {
let action = Promise.resolve({ rnSource, poiSource });

// Upload and process file. Add the filename to the data object.
if (rnSource === 'new') {
action = action.then(data => handleRoadNetworkUpload(scenario, op.getId(), rnSource, roadNetworkFile)
.then(res => Object.assign({}, data, res))
);
// Add the rn source id to the data object.
} else if (rnSource === 'clone') {
action = action.then(data => Object.assign({}, data, {rnSourceScenarioId}));
}

// Add the poi source id to the data object.
if (poiSource === 'clone') {
action = action.then(data => Object.assign({}, data, {poiSourceScenarioId}));
}

return action
.then(data => createScenario(params.projId, scenario.id, op.getId(), data))
.then(() => scenario);
})
.then(scenario => reply(scenario))
.catch(err => {
Expand Down Expand Up @@ -165,13 +179,21 @@ export default [
if (result.files.roadNetworkFile) {
payload.roadNetworkFile = result.files.roadNetworkFile[0];
}
if (result.fields.poiSource) {
payload.poiSource = result.fields.poiSource[0];
}
if (result.fields.poiSourceScenario) {
payload.poiSourceScenario = result.fields.poiSourceScenario[0];
}

let validation = Joi.validate(payload, Joi.object().keys({
name: Joi.string().required(),
description: Joi.string(),
roadNetworkSource: Joi.string().valid('clone', 'new', 'osm').required(),
roadNetworkSourceScenario: Joi.number().when('roadNetworkSource', {is: 'clone', then: Joi.required()}),
roadNetworkFile: Joi.object().when('roadNetworkSource', {is: 'new', then: Joi.required()})
roadNetworkFile: Joi.object().when('roadNetworkSource', {is: 'new', then: Joi.required()}),
poiSource: Joi.string().valid('clone').required(),
poiSourceScenario: Joi.number().when('poiSource', {is: 'clone', then: Joi.required()})
}));

if (validation.error) {
Expand Down Expand Up @@ -235,7 +257,9 @@ export default [
name: data.name,
description: data.description,
roadNetworkSource: 'clone',
roadNetworkSourceScenario: request.params.scId
roadNetworkSourceScenario: request.params.scId,
poiSource: 'clone',
poiSourceScenario: request.params.scId
};
return handler(request.params, payload, reply);
})
Expand Down Expand Up @@ -267,20 +291,13 @@ function startOperation (projId, scId) {
});
}

function createScenario (projId, scId, opId, rnSource, data = {}) {
function createScenario (projId, scId, opId, data = {}) {
let action = Promise.resolve();
// In test mode we don't want to start the generation.
// It will be tested in the appropriate place.
if (process.env.DS_ENV === 'test') { return action; }

if (rnSource === 'clone') {
// We need to close the connection to the source scenario before cloning
// the database. This needs to be done in this process. The process ran by
// the service runner won't have access to it.
// action = closeDatabase(projId, data.rnSourceScenarioId);
}

let serviceData = Object.assign({}, {projId, scId, opId, rnSource}, data);
let serviceData = Object.assign({}, {projId, scId, opId}, data);

action.then(() => {
console.log(`p${projId} s${scId}`, 'createScenario');
Expand Down Expand Up @@ -321,5 +338,5 @@ function handleRoadNetworkUpload (scenario, opId, source, roadNetworkFile) {
.then(() => putFileToS3(filePath, roadNetworkFile.path))
// Delete temp file.
.then(() => removeLocalFile(roadNetworkFile.path, true))
.then(() => createScenario(scenario.project_id, scenario.id, opId, source, {roadNetworkFile: fileName}));
.then(() => ({roadNetworkFile: fileName}));
}
2 changes: 1 addition & 1 deletion app/s3/utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
import fs from 'fs';
import fs from 'fs-extra';
import Promise from 'bluebird';

import s3, { bucket } from './';
Expand Down
204 changes: 173 additions & 31 deletions app/services/export-road-network/export-road-network.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import path from 'path';
import obj2osm from 'obj2osm';
import osmP2PApi from 'osm-p2p-server/api/index';
import through2 from 'through2';
import osmtogeojson from 'osmtogeojson';

import config from '../../config';
import { getDatabase } from '../rra-osm-p2p';
Expand Down Expand Up @@ -51,17 +52,22 @@ export function exportRoadNetwork (e) {

let op = new Operation(db);
op.loadById(opId)
.then(op => op.log('road-network', {message: 'Updating road network'}))
.then(op => {
.then(op => op.log('road-network', {message: 'Updating road network and pois'}))
// Load scenario poi types.
.then(() => db('scenarios_files')
.select('subtype')
.where('type', 'poi')
.where('project_id', projId)
.where('scenario_id', scId)
.then(types => types.map(o => o.subtype))
)
.then(poiTypes => {
const bbox = [-180, -90, 180, 90];
const toOsmOptions = {
bounds: {minlon: bbox[0], minlat: bbox[1], maxlon: bbox[2], maxlat: bbox[3]}
};
const osmDb = getDatabase(projId, scId);
const fileName = `road-network_${Date.now()}`;
const filePath = `scenario-${scId}/${fileName}`;

let formatTransform = obj2osm(toOsmOptions);
const formatTransform = obj2osm(toOsmOptions);

formatTransform.on('error', (err) => {
throw err;
Expand All @@ -70,32 +76,99 @@ export function exportRoadNetwork (e) {
logger && logger.log('Exporting data from osm-p2p');

let stream = osmP2PApi(osmDb).getMap(bbox, {order: 'type'})
.pipe(processOSMP2PExport())
.pipe(formatTransform);

return putFileStream(filePath, stream)
.then(() => logger && logger.log('Exporting data from osm-p2p... done'))
// Get previous file.
.then(() => db('scenarios_files')
.select('path')
.where('type', 'road-network')
.where('project_id', projId)
.where('scenario_id', scId)
.first()
)
// Delete from storage.
.then(file => removeFile(file.path))
// Add entry to the database
.then(() => db('scenarios_files')
.update({
name: fileName,
path: filePath,
updated_at: (new Date())
.pipe(processOSMP2PExport());

// Extract the POI into a promise and continue with the road network.
let splitting = collectPOIs(stream, poiTypes);

stream = splitting.stream.pipe(formatTransform);

function processRN () {
const fileName = `road-network_${Date.now()}`;
const filePath = `scenario-${scId}/${fileName}`;

return putFileStream(filePath, stream)
// Get previous file.
.then(() => db('scenarios_files')
.select('path')
.where('type', 'road-network')
.where('project_id', projId)
.where('scenario_id', scId)
.first()
)
// Delete from storage.
.then(file => removeFile(file.path))
// Add entry to the database
.then(() => db('scenarios_files')
.update({
name: fileName,
path: filePath,
updated_at: (new Date())
})
.where('type', 'road-network')
.where('project_id', projId)
.where('scenario_id', scId)
);
}

function processPOI () {
return splitting.deferred
// Convert to Feature Collection from Overpass style nodes.
.then(data => {
let fc = osmtogeojson({elements: data});
// Group features by its ram_poi_type.
let groups = fc.features.reduce((acc, feat) => {
let type = feat.properties.ram_poi_type;
if (!acc[type]) {
acc[type] = {
type: 'FeatureCollection',
features: []
};
}
acc[type].features.push(feat);
return acc;
}, {});

return groups;
})
.where('type', 'road-network')
.where('project_id', projId)
.where('scenario_id', scId)
);
.then(groups => Promise.all(Object.keys(groups).map(key => {
const fileName = `poi_${key}_${Date.now()}`;
const filePath = `scenario-${scId}/${fileName}`;

let data = JSON.stringify(groups[key]);

return putFileStream(filePath, data)
// Get previous file.
.then(() => db('scenarios_files')
.select('id', 'path')
.where('type', 'poi')
.where('subtype', key)
.where('project_id', projId)
.where('scenario_id', scId)
.first()
)
// Delete from storage.
.then(file => removeFile(file.path)
.then(() => file.id)
)
// Add entry to the database
.then(id => db('scenarios_files')
.update({
name: fileName,
path: filePath,
updated_at: (new Date())
})
.where('type', 'poi')
.where('id', id)
.where('project_id', projId)
.where('scenario_id', scId)
);
})));
}

return processRN()
.then(() => processPOI())
.then(() => logger && logger.log('Exporting data from osm-p2p... done'));
})
// Note: There's no need to close the osm-p2p-db because when the process
// terminates the connection is automatically closed.
Expand Down Expand Up @@ -140,3 +213,72 @@ function processOSMP2PExport () {
cb(null, data);
});
}

function collectPOIs (stream, poiTypes) {
let rn = [];
let pois = [];
let nodeStack = {};

// Create a sort of deferred.
// This promise will collect the POI and return them for
// later processing.
let _resolve;
const deferred = new Promise((resolve) => {
_resolve = resolve;
});

let dbgSkipped = 0;

const write = (data, enc, next) => {
if (data.type === 'node') {
if (data.tags && data.tags.amenity) {
// Discard nodes with ram_poi_type different than what was uploaded.
if (data.tags.ram_poi_type && poiTypes.indexOf(data.tags.ram_poi_type) !== -1) {
pois.push(data);
} else {
dbgSkipped++;
}
} else {
nodeStack[data.id] = data;
}
} else if (data.type === 'way') {
if (data.tags && data.tags.amenity) {
// Discard ways with ram_poi_type different than what was uploaded.
if (data.tags.ram_poi_type && poiTypes.indexOf(data.tags.ram_poi_type) !== -1) {
pois.push(data);
data.nodes.forEach(n => {
if (nodeStack[n]) {
pois.push(nodeStack[n]);
delete nodeStack[n];
}
});
} else {
dbgSkipped++;
}
} else {
rn.push(data);
data.nodes.forEach(n => {
if (nodeStack[n]) {
rn.push(nodeStack[n]);
delete nodeStack[n];
}
});
}
}
next();
};

const end = function (next) {
DEBUG && console.log('collectPOIs', 'missing/invalid ram_poi_type', dbgSkipped);
// Sort.
pois.sort(a => a.type === 'node' ? -1 : 1);
setImmediate(() => _resolve(pois));

rn.sort(a => a.type === 'node' ? -1 : 1);
rn.forEach(o => this.push(o));
next();
};

stream = stream.pipe(through2.obj(write, end));
return {stream, deferred};
}
Loading