From 67cc4aa1bbf07f089388fda091b8898204e4c41e Mon Sep 17 00:00:00 2001 From: Jennings Anderson Date: Tue, 9 Oct 2018 17:52:40 -0600 Subject: [PATCH] added a readme and described example use case --- LICENSE | 2 +- README.md | 48 ++ examples/file.jsonseq | 5 + examples/index.js | 16 + examples/map.js | 8 + .../map-geom-reconstruction.js | 280 ----------- .../node-history-builder.js | 78 ---- geometry-reconstruction/old-index.js | 332 -------------- .../relation-history-builder.js | 69 --- .../way-history-builder.js | 433 ------------------ index-geom.js | 45 -- mbview.html | 349 -------------- package-lock.json | 93 +--- package.json | 36 +- src/cover.js | 33 -- src/index.js | 92 +--- src/mbtiles.js | 41 -- src/remote.js | 26 -- src/vt.js | 30 -- src/worker.js | 43 +- 20 files changed, 129 insertions(+), 1930 deletions(-) create mode 100644 README.md create mode 100644 examples/file.jsonseq create mode 100644 examples/index.js create mode 100644 examples/map.js delete mode 100644 geometry-reconstruction/map-geom-reconstruction.js delete mode 100644 geometry-reconstruction/node-history-builder.js delete mode 100644 geometry-reconstruction/old-index.js delete mode 100644 geometry-reconstruction/relation-history-builder.js delete mode 100644 geometry-reconstruction/way-history-builder.js delete mode 100644 index-geom.js delete mode 100644 mbview.html delete mode 100644 src/cover.js delete mode 100644 src/mbtiles.js delete mode 100644 src/remote.js delete mode 100644 src/vt.js diff --git a/LICENSE b/LICENSE index 9ba35cf..0ea1287 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -This utility is built on a modified version of tile-reduce by Mapbox, original license and +This utility is built on a modified version of tile-reduce by Mapbox, original license and copyright below: ISC License diff --git a/README.md b/README.md new file mode 100644 index 0000000..ef2b9d3 --- /dev/null +++ b/README.md @@ -0,0 +1,48 @@ +# Stream-Reduce + +This is a simplified framework built 
off Mapbox's [tile-reduce](//github.com/mapbox/tile-reduce) to perform map-reduce functions against large files of line-delimited JSON. This simply removes all of the `tile` processing and instead passes each new line in the file to the map script. + +This is currently used for geometry-reconstruction of historical OSM objects as output from the [OSM-Wayback](//github.com/osmlab/osm-wayback) utility. + +### Example Implementation + +#### File: `examples/index.js` + + var count = 0; + + streamReduce({ + map: path.join(__dirname, 'map.js'), //Map function + file: path.join(__dirname, 'file.jsonseq'), //Input file (lines of JSON) + maxWorkers:10 // The number of cpus you'd like to use + }) + .on('reduce', function(res) { + count+=res + }) + .on('end', function() { + console.log("Finished, value of count: "+count) + }); + +#### File: `examples/map.js` + + module.exports = function(line, writeData, done) { + var object = JSON.parse(line.toString()); + done(null, object.count) + } + +#### file.jsonseq + + {"count":10} + {"count":10} + {"count":10} + {"count":10} + {"count":10} + +#### Use + $ node examples/index.js + + >>Starting up 2 workers... Job started. + >>Processing lines from file: examples/file.jsonseq + >>5 lines processed in 0s. 
+ >>Finished, value of count: 50 + + diff --git a/examples/file.jsonseq b/examples/file.jsonseq new file mode 100644 index 0000000..ba8accf --- /dev/null +++ b/examples/file.jsonseq @@ -0,0 +1,5 @@ +{"count":10} +{"count":10} +{"count":10} +{"count":10} +{"count":10} diff --git a/examples/index.js b/examples/index.js new file mode 100644 index 0000000..32a2737 --- /dev/null +++ b/examples/index.js @@ -0,0 +1,16 @@ +var streamReduce = require('../src'); +var path = require('path') + +var count = 0; + +streamReduce({ + map: path.join(__dirname, 'map.js'), + file: path.join(__dirname, 'file.jsonseq'), + maxWorkers:5 // The number of cpus you'd like to use +}) +.on('reduce', function(res) { + count+=res +}) +.on('end', function() { + console.log("Finished, value of count: "+count) +}); diff --git a/examples/map.js b/examples/map.js new file mode 100644 index 0000000..2d0e4ae --- /dev/null +++ b/examples/map.js @@ -0,0 +1,8 @@ +'use strict'; + +module.exports = function(line, writeData, done) { + + var object = JSON.parse(line.toString()); + + done(null, object.count) +} diff --git a/geometry-reconstruction/map-geom-reconstruction.js b/geometry-reconstruction/map-geom-reconstruction.js deleted file mode 100644 index d147018..0000000 --- a/geometry-reconstruction/map-geom-reconstruction.js +++ /dev/null @@ -1,280 +0,0 @@ -'use strict'; - -var topojson = require("topojson"); -var WayGeometryBuilder = require('./way-history-builder.js') -var NodeGeometryBuilder = require('./node-history-builder.js') -var RelationGeometryBuilder = require('./relation-history-builder.js') - -/* -* Helper function to reconstruct properties between major Versions from diffs -*/ -function reconstructMajorOSMTags(baseObject,newObject){ - if (newObject.hasOwnProperty('aA') && newObject.aA){ - Object.keys(newObject.aA).forEach(function(key){ - baseObject[key] = newObject.aA[key] - }) - } - if (newObject.hasOwnProperty('aM') && newObject.aM){ - Object.keys(newObject.aM).forEach(function(key){ - 
baseObject[key] = newObject.aM[key][1] - }) - } - if (newObject.hasOwnProperty('aD') && newObject.aD){ - Object.keys(newObject.aD).forEach(function(key){ - delete baseObject[key] - }) - } - return baseObject -} - -const DEBUG = true; - -//TODO: abstract this out to INDEX -const CONFIG = { - //Choose this... - 'GEOMETRY_ONLY' : false, //Only @validSince, @validUntil on ALL objects - - //OR - 'INCLUDE_DIFFS_ON_MAJOR_VERSIONS' : true,//DIFFS don't go on minor versions - 'INCLUDE_FULL_PROPERTIES_ON_MAJOR_VERSIONS' : false, - - //Optional - 'INCLUDE_FULL_PROPERTIES_ON_MINOR_VERSIONS' : false, - - //ONLY ONE OF THESE SHOULD BE SET... - 'WRITE_HISTORY_COMPLETE_OBJECT' : false, - 'WRITE_EVERY_GEOMETRY' : false, - 'WRITE_TOPOJSON_HISTORY' : true -} - -module.exports = function(line, writeData, done) { - var status = { - lineProcessed : false, - noHistory : false, - jsonParsingError : false, - noNodeLocations : 0, - geometryBuilderFailedToDefine : false, - totalGeometries : 0, - processLineFailures : false, - topoJSONEncodingError : false, - allGeometriesByteSize :0, - historyCompleteSingleObjectByteSize :0, - topojsonHistoryByteSize :0 - } - - var geometryBuilder; - var string - - try{ - var object = JSON.parse(line.toString()); - - // All objects should have a `@history` property when they get to this stage - if (object.properties.hasOwnProperty('@history')){ - - //If it's a node, initialize a simpler geometry builder - if (object.properties['@type']==='node'){ - - geometryBuilder = new NodeGeometryBuilder({ - 'history' : object.properties['@history'], - 'osmID' : object.properties['@id'] - }, CONFIG) - - //If it's not a node, then it should have a nodeLocations - }else if (object.hasOwnProperty('nodeLocations')) { - geometryBuilder = new WayGeometryBuilder({ - 'nodeLocations' : object.nodeLocations, - 'history' : object.properties['@history'], - 'osmID' : object.properties['@id'] - }, CONFIG) - }else if (object.properties['@type']==='relation'){ - geometryBuilder = new 
RelationGeometryBuilder({ - 'history' : object.properties['@history'], - 'osmID' : object.properties['@id'], - 'geometry' : object.geometry - }) - }else{ - status.noNodeLocations++; - } - - //if Geometry Builder was defined, keep going! - if (geometryBuilder){ - /* Populates geometryBuilder.historicalGeometries object: - historicalGeometries = { - : [minorVersion0, minorVersion1, minorVersion1, .. ], - : [minorVersion0, minorVersion1, minorVersion1, .. ], - = validSince){ - filteredNodes.push(node) - }else{ - //Not the same changeset and not >= validSince - prevNodeNotAdded = Object.assign({},node); - } - - prevNode = node; - }) - //Safety condition 1: If there are now NO NODES, return prevNode - if(filteredNodes.length==0){ - return [prevNode]; - } - - //If the first node in the list is TOO new, add prevNodeNotAdded - if(prevNodeNotAdded){ - if(filteredNodes[0].t > validSince+CHANGESET_THRESHOLD){ - filteredNodes.unshift(prevNodeNotAdded) - } - } - - }else{ - filteredNodes = nodeVersions; - } - - //filteredNodes now only has nodes greater than given timestamp; - - //stay safe from atomic changes below... - var filterable = JSON.parse(JSON.stringify(filteredNodes)); - - //If we have a validUntil, then filter the future nodes out - if (validUntil){ - - //OVERRIDE 2: IF there is a matching changeset, be sure it doesn't get abandoned - filterable = filterable.filter(function(v){return (v.t < validUntil || v.c==changeset)}) - - if(filterable.length==0){ - //If this removed all nodes, then return the most recent version; there is likely a _deleted_ version later - if (prevNode){ - return [prevNode] - }else{ - return false; - } - } - } - if (filterable.length==1){ - return filterable; //Only 1 possible case, return it - }else{ - - //OVER RIDE 2: If there aren't any different geometries, don't return it... 
too expensive - try{ - var diffGeoms = [filterable[0]]; //basic case, just 1 - - var prev = filterable[0].p - - for(var i=1;i ")) - } - var minorVersions; - - //Expand out the versions array - var maxLen = _.max(versions.map(function(a){return a.length})) - - if(DEBUG){console.warn("\n" + maxLen + "\n")} - - if(maxLen>1){ //There are minor versions! - minorVersions = [[]]; - - //Iterate through each of the nodes, building geometries as they exist. - for(var i=0; i maxNode.t){ - maxNode = mV[idx] - } - } - mapped.push([maxNode, mV]) - }) - - minorVersions = undefined; - - sortedMinorVersions = _.sortBy(mapped, function(x){return x[0].t}) - - if(DEBUG){ - console.warn("Minor Versions: ") - } - - var countableMinorVersions = []; - var prevTimestamp = sortedMinorVersions[0][0].t; - var minorVersionIdx = 1; - sortedMinorVersions.forEach(function(sorted){ - var mV = sorted[1] - // console.warn("PREV TIMESTAMP: " + (new Date(prevTimestamp*1000)).toISOString()) - if (sorted[0].t > prevTimestamp + MINOR_VERSION_SECOND_THRESHOLD){ - if(DEBUG){ - console.warn(mV.map(function(n){return n.h}).join(" > ")) - console.warn(sorted[0].h, (new Date(sorted[0].t*1000)).toISOString()) - console.warn() - } - countableMinorVersions.push({ - minorVersion: minorVersionIdx, - changeset: sorted[0].c, - validSince: sorted[0].t, - user: sorted[0].h, - uid: sorted[0].u, - coordinates:mV.map(function(p){return p.p}) - }) - minorVersionIdx++; - } - /* else{ - if(DEBUG){ - console.warn("SKIPPED: ") - console.warn(mV.map(function(n){return n.h}).join(" > ")) - console.warn(sorted[0].h, (new Date(sorted[0].t*1000)).toISOString()) - } - } */ - prevTimestamp = sorted[0].t; - }) - } - - return { - majorVersion: majorVersion.map(function(g){return g.p}), - minorVersions: countableMinorVersions - } -} - - -/** - * Iterate through an OSM object's history and construct all possible geometries - * - * Expects: Nothing, call on Object. 
- * - * Returns: Nothing, populates the ``historicalGeometries`` attribute. -*/ -this.buildGeometries = function(){ - var that = this; - var validSince, validUntil; - - if(DEBUG){ - console.warn(`\n\nReconstructing Geometries for ID: ${that.osmID}\n==================`) - } - - //Versions is an object's history, length should correspond to current 'v' - for(var i=0; i0){ - validSince = that.versions[i].t - } - // //If there's another version to come, set validUntil to the next version - if(i < that.versions.length-1){ - validUntil = that.versions[i+1].t - CHANGESET_THRESHOLD - } - - //Now construct all possible geometries for this Major Version: - //Breaking Case: If it's not 'visible' and version has no nodes, then don't try to create a geometry... - if( that.versions[i].hasOwnProperty('n') ){ - //Construct all possible geometries for this version, based on the nodeRefs. - - var majorVersionNumber = that.versions[i]['i'] - - var geometries = that.buildAllPossibleVersionGeometries({ - nodeRefs: that.versions[i].n, - validSince: validSince, - validUntil: validUntil, - changeset: that.versions[i].c - }) - - if(geometries.majorVersion){ - that.historicalGeometries[majorVersionNumber] = [{ - type:"Feature", - properties:{ - '@version': majorVersionNumber, - '@minorVersion': 0, - '@user' : that.versions[i].h, - '@changeset' : that.versions[i].c, - '@uid' : that.versions[i].u, - '@validSince': that.versions[i].t, - '@validUntil': (i0){ - //Iterate through the minorVersions, amending the validUntil fields... 
- //Reset the validUntil of the major Version with minorVersion_1 - that.historicalGeometries[majorVersionNumber][0].properties['@validUntil'] = geometries.minorVersions[0]["validSince"] - - for(var j=0; j < geometries.minorVersions.length; j++){ - var mV = geometries.minorVersions[j]; - that.historicalGeometries[majorVersionNumber].push({ - type:"Feature", - geometry:{ - type:"LineString", - coordinates: mV.coordinates - }, - properties:{ - '@version':majorVersionNumber, - '@minorVersion':mV.minorVersion, - '@changeset':mV.changeset, - '@user':mV.user, - '@uid' :mV.uid, - '@validSince':mV.validSince, - '@validUntil': (j - - - - mbview - vector - - - - - - - - - - - - - - - - - -
- - - - - diff --git a/package-lock.json b/package-lock.json index dbcf9f1..0cae1ca 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4,11 +4,6 @@ "lockfileVersion": 1, "requires": true, "dependencies": { - "back": { - "version": "0.1.5", - "resolved": "https://registry.npmjs.org/back/-/back-0.1.5.tgz", - "integrity": "sha1-NCuWuARlewPsmjHySKEfIAYI3MI=" - }, "binary-split": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/binary-split/-/binary-split-1.0.3.tgz", @@ -22,36 +17,21 @@ "resolved": "https://registry.npmjs.org/buffer-shims/-/buffer-shims-1.0.0.tgz", "integrity": "sha1-mXjOMXOIxkmth5MCjDR37wRKi1E=" }, - "commander": { - "version": "2.19.0", - "resolved": "https://registry.npmjs.org/commander/-/commander-2.19.0.tgz", - "integrity": "sha512-6tvAOO+D6OENvRAh524Dh9jcfKTYDQAqvqezbCW82xj5X0pSrcpxtvRKHLG0yBY6SD7PSDrJaj+0AiOcKVd1Xg==" + "child_process": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/child_process/-/child_process-1.0.2.tgz", + "integrity": "sha1-sffn/HPSXn/R1FWtyU4UODAYK1o=" }, "core-util-is": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", "integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=" }, - "errs": { - "version": "0.3.2", - "resolved": "https://registry.npmjs.org/errs/-/errs-0.3.2.tgz", - "integrity": "sha1-eYCZstvTfKK8dJ5TinwTB9C1BJk=" - }, "events": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/events/-/events-3.0.0.tgz", "integrity": "sha512-Dc381HFWJzEOhQ+d8pkNon++bk9h6cdAoAj4iE6Q4y6xgTzySWXlKn05/TVNpjnfRqi/X0EpJEJohPjNI3zpVA==" }, - "fork": { - "version": "1.3.1", - "resolved": "https://registry.npmjs.org/fork/-/fork-1.3.1.tgz", - "integrity": "sha1-OQKKz5MXjKCJ+N0tnvbAUN/8GIU=", - "requires": { - "back": "~0.1.5", - "errs": "^0.3.2", - "killer": "^0.1.0" - } - }, "fs": { "version": "0.0.1-security", "resolved": "https://registry.npmjs.org/fs/-/fs-0.0.1-security.tgz", @@ -67,15 +47,10 @@ "resolved": 
"https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz", "integrity": "sha1-u5NdSFgsuhaMBoNJV6VKPgcSTxE=" }, - "killer": { - "version": "0.1.0", - "resolved": "https://registry.npmjs.org/killer/-/killer-0.1.0.tgz", - "integrity": "sha1-l6Lz9PqRc7qRvSFzI2pdXyiI9go=" - }, - "lodash": { - "version": "4.17.11", - "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.11.tgz", - "integrity": "sha512-cQKh8igo5QUhZ7lg38DYWAxMvjSAKG0A8wGSVimP07SIUEK2UO+arSRKbRZWtelMtN5V0Hkwh5ryOto/SshYIg==" + "os": { + "version": "0.1.1", + "resolved": "https://registry.npmjs.org/os/-/os-0.1.1.tgz", + "integrity": "sha1-IIhF6J4ZOtTZcUdLk5R3NqVtE/M=" }, "path": { "version": "0.12.7", @@ -120,21 +95,6 @@ "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" }, - "split": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/split/-/split-1.0.1.tgz", - "integrity": "sha1-YFvZvjA6pZ+zX5Ip++oN3snqB9k=", - "requires": { - "through": "2" - }, - "dependencies": { - "through": { - "version": "2.3.8", - "resolved": "https://registry.npmjs.org/through/-/through-2.3.8.tgz", - "integrity": "sha1-DdTJ/6q8NXlgsbckEV1+Doai4fU=" - } - } - }, "stream-array": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/stream-array/-/stream-array-1.1.2.tgz", @@ -186,43 +146,6 @@ "xtend": "~4.0.1" } }, - "topojson": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/topojson/-/topojson-3.0.2.tgz", - "integrity": "sha512-u3zeuL6WEVL0dmsRn7uHZKc4Ao4gpW3sORUv+N3ezLTvY3JdCuyg0hvpWiIfFw8p/JwVN++SvAsFgcFEeR15rQ==", - "requires": { - "topojson-client": "3.0.0", - "topojson-server": "3.0.0", - "topojson-simplify": "3.0.2" - }, - "dependencies": { - "topojson-client": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/topojson-client/-/topojson-client-3.0.0.tgz", - "integrity": "sha1-H5kpOnfvQqRI0DKoGqmCtz82DS8=", - "requires": { - 
"commander": "2" - } - }, - "topojson-server": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/topojson-server/-/topojson-server-3.0.0.tgz", - "integrity": "sha1-N4546Hw5cqe1vixdYENptrrmnF4=", - "requires": { - "commander": "2" - } - }, - "topojson-simplify": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/topojson-simplify/-/topojson-simplify-3.0.2.tgz", - "integrity": "sha512-gyYSVRt4jO/0RJXKZQPzTDQRWV+D/nOfiljNUv0HBXslFLtq3yxRHrl7jbrjdbda5Ytdr7M8BZUI4OxU7tnbRQ==", - "requires": { - "commander": "2", - "topojson-client": "3" - } - } - } - }, "util": { "version": "0.10.4", "resolved": "https://registry.npmjs.org/util/-/util-0.10.4.tgz", diff --git a/package.json b/package.json index 49791ac..1bcb89d 100644 --- a/package.json +++ b/package.json @@ -1,33 +1,31 @@ { "name": "stream-reduce", "version": "1.0.0", - "description": "Based on Mapbox's tile-reduce, this is an extensible streaming-based map-reduce framework", - "main": "index.js", + "description": "Based on Mapbox's tile-reduce, a framework for map-reduce jobs in Node from streaming input", + "main": "src/index.js", + "dependencies": { + "binary-split": "^1.0.3", + "child_process": "^1.0.2", + "events": "^3.0.0", + "fs": "0.0.1-security", + "os": "^0.1.1", + "path": "^0.12.7", + "queue-async": "^1.2.1", + "stream-array": "^1.1.2", + "vm": "^0.1.0" + }, + "devDependencies": {}, "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "repository": { "type": "git", - "url": "git+https://github.com/jenningsanderson/geometry-reconstruction.git" + "url": "git+https://github.com/jenningsanderson/stream-reduce.git" }, "author": "", "license": "ISC", "bugs": { - "url": "https://github.com/jenningsanderson/geometry-reconstruction/issues" + "url": "https://github.com/jenningsanderson/stream-reduce/issues" }, - "homepage": "https://github.com/jenningsanderson/geometry-reconstruction#readme", - "dependencies": { - "binary-split": "^1.0.3", - "events": "^3.0.0", - "fork": 
"^1.3.1", - "fs": "0.0.1-security", - "lodash": "^4.17.11", - "path": "^0.12.7", - "queue-async": "^1.2.1", - "split": "^1.0.1", - "stream-array": "^1.1.2", - "through2": "^2.0.3", - "topojson": "^3.0.2", - "vm": "^0.1.0" - } + "homepage": "https://github.com/jenningsanderson/stream-reduce#readme" } diff --git a/src/cover.js b/src/cover.js deleted file mode 100644 index 680836b..0000000 --- a/src/cover.js +++ /dev/null @@ -1,33 +0,0 @@ -'use strict'; - -module.exports = cover; - -var tilecover = require('@mapbox/tile-cover'); -var bboxPolygon = require('@turf/bbox-polygon'); - -function cover(options) { - if (Array.isArray(options.tiles)) return zoomTiles(options.tiles, options.zoom); - - var area = options.bbox ? bboxPolygon(options.bbox).geometry : - options.geojson ? options.geojson.geometry || options.geojson : null; - - return area ? tilecover.tiles(area, {min_zoom: options.zoom, max_zoom: options.zoom}) : null; -} - -function zoomTiles(tiles, zoom) { - var zoomed = []; - for (var i = 0; i < tiles.length; i++) { - var tile = tiles[i]; - - if (tile[2] === zoom) zoomed.push(tile); - else if (tile[2] > zoom) throw new Error('Tile zoom is higher than expected.'); - else { - var z2 = Math.pow(2, zoom - tile[2]); - - for (var x = tile[0] * z2; x < (tile[0] + 1) * z2; x++) { - for (var y = tile[1] * z2; y < (tile[1] + 1) * z2; y++) zoomed.push([x, y, zoom]); - } - } - } - return zoomed; -} diff --git a/src/index.js b/src/index.js index 7302747..73bedb8 100644 --- a/src/index.js +++ b/src/index.js @@ -3,16 +3,13 @@ module.exports = streamReduce; var EventEmitter = require('events').EventEmitter; -var cpus = require('os').cpus().length; -var vm = require('vm'); -var fs = require('fs'); -var fork = require('child_process').fork; -var path = require('path'); -var binarysplit = require('binary-split'); -// var cover = require('./cover'); -var streamArray = require('stream-array'); -// var MBTiles = require('@mapbox/mbtiles'); -var through = require('through2'); +var cpus 
= require('os').cpus().length; +var vm = require('vm'); +var fs = require('fs'); +var fork = require('child_process').fork; +var path = require('path'); +var streamArray = require('stream-array'); +var split = require('binary-split'); // Suppress max listener warnings. We need at least 1 listener per worker. process.stderr.setMaxListeners(0); @@ -51,15 +48,15 @@ function streamReduce(options) { var mapOptions = options.mapOptions || {}; for (var i = 0; i < maxWorkers; i++) { - var worker = fork(path.join(__dirname, 'worker.js'), [options.map, 1, JSON.stringify(mapOptions)], {silent: true}); - worker.stdout.pipe(binarysplit('\x1e')).pipe(output); + var worker = fork(path.join(__dirname, 'worker.js'), [options.map, JSON.stringify(mapOptions)], {silent: true}); + worker.stdout.pipe(split('\x1e')).pipe(output); worker.stderr.pipe(process.stderr); worker.on('message', handleMessage); workers.push(worker); } function handleMessage(message) { - if (message.reduce) reduce(message.value, message.tile); + if (message.reduce) reduce(message.value, message.line); else if (message.ready && ++workersReady === workers.length) run(); } @@ -67,50 +64,15 @@ function streamReduce(options) { log('Job started.\n'); ee.emit('start'); - timer = setInterval(updateStatus, 64); - - // var tiles = cover(options); - - // if (tiles) { - // // JS tile array, GeoJSON or bbox - // log('Processing ' + tiles.length + ' tiles.\n'); - // lineStream = streamArray(tiles) - // .on('data', handleTile) - // .on('end', streamEnded); - // - // } else - if (options.lineStream) { - log('Processing lines from stream.\n'); - lineStream = options.lineStream; - lineStream - .on('data', handleLine) - .on('end', streamEnded) - .resume(); - } - // } else { - // // try to get tiles from mbtiles (either specified by sourceCover or first encountered) - // var source; - // for (var i = 0; i < options.sources.length; i++) { - // source = options.sources[i]; - // if (options.sources[i].mbtiles && (!options.sourceCover || 
options.sourceCover === source.name)) break; - // source = null; - // } - // if (source) { - // log('Processing tile coords from "' + source.name + '" source.\n'); - // var db = new MBTiles(source.mbtiles, function(err) { - // if (err) throw err; - // lineStream = db.createZXYStream() - // .pipe(binarysplit('\n')) - // .on('data', handleZXYLine) - // .on('end', streamEnded); - // }); - // - // } else { - // throw new Error(options.sourceCover ? - // 'Specified source for cover not found.' : - // 'No area or tiles specified for the job.'); - // } - // } + timer = setInterval(updateStatus, 100); + + log('Processing lines from file: '+ options.file +'\n'); + lineStream = fs.createReadStream(options.file).pipe(split()) + + lineStream + .on('data', handleLine) + .on('end', streamEnded) + .resume(); } var paused = false; @@ -122,7 +84,6 @@ function streamReduce(options) { } function handleLine(line) { - // console.warn(tile) line = line.toString(); var workerId = linesSent++ % workers.length; ee.emit('map', line, workerId); @@ -133,19 +94,6 @@ function streamReduce(options) { } } - // function handlelineStreamLine(line) { - // var tile = line; - // if (typeof line === 'string' || line instanceof Buffer) { - // tile = line.toString().split(' '); - // } - // handleTile(tile.map(Number)); - // } - - // function handleZXYLine(line) { - // var tile = line.toString().split('/'); - // handleTile([+tile[1], +tile[2], +tile[0]]); - // } - function reduce(value, line) { if (value !== null && value !== undefined) ee.emit('reduce', value, line); if (paused && linesSent - linesDone < (pauseLimit / 2)) { @@ -165,7 +113,6 @@ function streamReduce(options) { ee.emit('end'); } - /* istanbul ignore next */ function updateStatus() { if (options.log === false || !process.stderr.cursorTo) return; @@ -179,7 +126,6 @@ function streamReduce(options) { process.stderr.clearLine(1); } - /* istanbul ignore next */ function log(str) { if (options.log !== false) process.stderr.write(str); } diff 
--git a/src/mbtiles.js b/src/mbtiles.js deleted file mode 100644 index 07c7bc3..0000000 --- a/src/mbtiles.js +++ /dev/null @@ -1,41 +0,0 @@ -'use strict'; - -var zlib = require('zlib'); -var MBTiles = require('@mapbox/mbtiles'); -var parseVT = require('./vt'); - -module.exports = mbTilesVT; - -function mbTilesVT(source, ready) { - var db = new MBTiles(source.mbtiles, dbReady); - - function dbReady(err, db) { - if (err) ready(err); - else db.getInfo(infoReady); - } - - function infoReady(err, info) { - if (err) { - ready(err); - } else if (info.format === 'pbf' || info.format === 'application/vnd.mapbox-vector-tile') { - ready(null, getVT); - } else { - ready(new Error('Unsupported MBTiles format: ' + info.format)); - } - } - - function getVT(tile, done) { - db.getTile(tile[2], tile[0], tile[1], tileFetched); - - function tileFetched(err, data) { - if (!err) zlib.unzip(data, tileUnzipped); - else if (err.message === 'Tile does not exist') done(); - else done(err); - } - - function tileUnzipped(err, data) { - if (err) done(err); - done(null, parseVT(data, tile, source)); - } - } -} diff --git a/src/remote.js b/src/remote.js deleted file mode 100644 index 727b01c..0000000 --- a/src/remote.js +++ /dev/null @@ -1,26 +0,0 @@ -'use strict'; - -var request = require('request'); -var parseVT = require('./vt'); -var rateLimit = require('function-rate-limit'); - -module.exports = remoteVT; - -function remoteVT(source, ready) { - var getTile = function(tile, done) { - var url = source.url - .replace('{x}', tile[0]) - .replace('{y}', tile[1]) - .replace('{z}', tile[2]); - - request({url: url, gzip: true, encoding: null}, function(err, res, body) { - if (err) return done(err); - else if (res.statusCode === 200) return done(null, parseVT(body, tile, source)); - else if (res.statusCode === 401) return done(); - else return done(new Error('Server responded with status code ' + res.statusCode)); - }); - }; - - if (source.maxrate) getTile = rateLimit(source.maxrate, 1000, getTile); - 
ready(null, getTile); -} diff --git a/src/vt.js b/src/vt.js deleted file mode 100644 index 33e9084..0000000 --- a/src/vt.js +++ /dev/null @@ -1,30 +0,0 @@ -'use strict'; - -var VectorTile = require('@mapbox/vector-tile').VectorTile; -var Pbf = require('pbf'); - -module.exports = parseData; - -function parseData(data, tile, source) { - var layers = new VectorTile(new Pbf(data)).layers; - return source.raw ? layers : toGeoJSON(layers, tile, source); -} - -function toGeoJSON(layers, tile, source) { - var collections = {}; - - for (var layerId in layers) { - if (source.layers && source.layers.indexOf(layerId) === -1) continue; - - collections[layerId] = { - type: 'FeatureCollection', - features: [] - }; - for (var k = 0; k < layers[layerId].length; k++) { - collections[layerId].features.push( - layers[layerId].feature(k).toGeoJSON(tile[0], tile[1], tile[2]) - ); - } - } - return collections; -} diff --git a/src/worker.js b/src/worker.js index 972f127..576d6be 100644 --- a/src/worker.js +++ b/src/worker.js @@ -3,54 +3,25 @@ var queue = require('queue-async'); var q = queue(); var sources = []; -var tilesQueue = queue(1); +var linesQueue = queue(1); var isOldNode = process.versions.node.split('.')[0] < 4; -global.mapOptions = JSON.parse(process.argv[4]); +global.mapOptions = JSON.parse(process.argv[3]); var map = require(process.argv[2]); -// JSON.parse(process.argv[3]).forEach(function(source) { -// q.defer(loadSource, source); -// }); - -// function loadSource(source, done) { -// var loaded = {name: source.name}; -// sources.push(loaded); -// -// /*eslint global-require: 0 */ -// if (source.mbtiles) require('./mbtiles')(source, done); -// // else if (source.url) require('./remote')(source, done); -// // else throw new Error('Unknown source type'); -// } - q.awaitAll(function(err, results) { if (err) throw err; - // for (var i = 0; i < results.length; i++) sources[i].getTile = results[i]; process.send({ready: true}); }); -function processTile(tile, callback) { +function 
processLine(line, callback) { var q = queue(); - // for (var i = 0; i < sources.length; i++) { - // q.defer(sources[i].getTile, tile); - // } - q.awaitAll(gotData); function gotData(err, results) { if (err) throw err; - // var data = {}; - // for (var i = 0; i < results.length; i++) { - // data[sources[i].name] = results[i]; - // if (!results[i]) { - // callback(); - // process.send({reduce: true}); - // return; - // } - // } - var writeQueue = queue(1); function write(data) { @@ -60,12 +31,12 @@ function processTile(tile, callback) { function gotResults(err, value) { if (err) throw err; writeQueue.awaitAll(function() { - process.send({reduce: true, value: value, tile: tile}, null, callback); + process.send({reduce: true, value: value, line: line}, null, callback); if (isOldNode) callback(); // process.send is async since Node 4.0 }); } - map(tile, write, gotResults); + map(line, write, gotResults); } } @@ -73,6 +44,6 @@ function writeStdout(str, cb) { process.stdout.write(str, cb); } -process.on('message', function(tile) { - tilesQueue.defer(processTile, tile); +process.on('message', function(line) { + linesQueue.defer(processLine, line); });