-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjobfile.js
181 lines (177 loc) · 6.37 KB
/
jobfile.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import _ from 'lodash'
import moment from 'moment'
import { featureEach, getType, multiLineString, getCoords, cleanCoords, envelope, flatten } from '@turf/turf'
import makeDebug from 'debug'
const debug = makeDebug('k-vigicrues')
const dbUrl = process.env.DB_URL || 'mongodb://127.0.0.1:27017/vigicrues'
const ttl = +process.env.TTL || (7 * 24 * 60 * 60) // duration in seconds
function validateFeature (feature) {
// Check required id/gid properties
if (!_.has(feature, 'properties.id') && !_.has(feature, 'properties.gid')) {
console.error('Invalid feature: missing id or gid property')
return false
}
_.set(feature, 'properties.gid', _.toNumber(feature.properties.id || feature.properties.gid))
_.unset(feature, 'id') // Remove unused ID if any
// Check required LbEntCru property
if (!_.has(feature, 'properties.LbEntCru')) {
console.warn('Invalid feature: missing \'LbEntCru\' property')
return false
}
_.set(feature, 'properties.name', feature.properties.LbEntCru) // needed for timeseries
// Check required NivInfViCr property
if (!_.has(feature, 'properties.NivInfViCr' )) {
console.warn('Invalid feature: missing \'NivInfViCr\' property')
return false
}
// Ensure the value is clamped to [1-4]
// see https://github.com/kalisio/k-vigicrues/issues/34
if (feature.properties.NivInfViCr < 1) {
console.warn(`Value for ${feature.properties.name} is invalid: ${nivSituVigiCruEnt}. Setting the value to '1'`)
_.set(feature, 'properties.NivInfViCr', 1)
}
if (feature.properties.NivInfViCr > 4) {
console.warn(`Value for ${feature.properties.name} is invalid: ${nivSituVigiCruEnt}. Setting the value to '3'`)
_.set(feature, 'properties.NivInfViCr', 4)
}
// Ensure the geometry is clean as some line strings have degenerated lines
if (getType(feature) === 'MultiLineString') {
let validLines = []
let nbInvalidGeometries = 0
featureEach(flatten(feature), line => {
try {
cleanCoords(line, { mutate: true })
validLines.push(getCoords(line))
} catch (error) {
nbInvalidGeometries++
}
})
if (nbInvalidGeometries > 0) debug(`Filtering ${nbInvalidGeometries} invalid line(s) for ${feature.properties.LbEntCru}`)
// Rebuild geometry from the clean line
feature.geometry = multiLineString(validLines).geometry
} else if (getType(feature) === 'LineString') {
try {
cleanCoords(feature, { mutate: true })
} catch (error) {
console.warn(`Invalid geometry for ${feature.properties.LbEntCru}`)
return false
}
}
return true
}
export default {
id: 'vigicrues',
store: 'memory',
options: {
workersLimit: 1,
faultTolerant: true
},
tasks: [{
id: 'vigicrues',
type: 'http',
options: {
url: 'https://www.vigicrues.gouv.fr/services/1/InfoVigiCru.geojson'
}
}],
hooks: {
tasks: {
after: {
readJson: {},
generateStations: {
hook: 'apply',
function: (item) => {
const features = _.get(item, 'data.features', [])
let validFeatures = []
_.forEach(features, feature => {
if (validateFeature(feature)) validFeatures.push(feature)
})
console.log(`Found ${validFeatures.length} features`)
_.set(item, 'data.features', validFeatures)
}
},
writeSections: {
hook: 'updateMongoCollection',
collection: 'vigicrues-sections',
filter: { 'properties.gid': '<%= properties.gid %>' },
upsert : true,
transform: {
pick: [
'type',
'geometry',
'properties.gid',
'properties.name',
'properties.CdEntCru',
'properties.CdTCC',
],
inPlace: false
},
chunkSize: 256
},
generateForecasts: {
hook: 'apply',
function: (item) => {
let forecastFeatures = []
const features = _.get(item, 'data.features', [])
_.forEach(features, feature => {
let forecastFeature = envelope(feature)
_.set(forecastFeature, 'time', moment.utc().toDate())
_.set(forecastFeature, 'properties.gid', feature.properties.gid) // needed for timeseries
_.set(forecastFeature, 'properties.name', feature.properties.LbEntCru) // needed for timeseries
_.set(forecastFeature, 'properties.NivSituVigiCruEnt', feature.properties.NivInfViCr) // backward compatibilty
_.set(forecastFeature, 'properties.risk', feature.properties.NivInfViCr)
forecastFeatures.push(forecastFeature)
})
item.data = forecastFeatures
}
},
writeForecasts: {
hook: 'writeMongoCollection',
collection: 'vigicrues-forecasts',
},
clearData: {}
}
},
jobs: {
before: {
createStores: { id: 'memory' },
connectMongo: {
url: dbUrl,
// Required so that client is forwarded from job to tasks
clientPath: 'taskTemplate.client'
},
createSectionsCollection: {
hook: 'createMongoCollection',
clientPath: 'taskTemplate.client',
collection: 'vigicrues-sections',
indices: [
[{ 'properties.gid': 1 }, { unique: true }],
{ geometry: '2dsphere' }
]
},
createForecastsCollection: {
hook: 'createMongoCollection',
clientPath: 'taskTemplate.client',
collection: 'vigicrues-forecasts',
indices: [
[{ time: 1, 'properties.gid': 1 }, { unique: true }],
{ 'properties.NivSituVigiCruEnt': 1 },
{ 'properties.id': 1, 'properties.NivSituVigiCruEnt': 1, time: -1 },
[{ time: 1 }, { expireAfterSeconds: ttl }] // days in secs
]
}
},
after: {
disconnectMongo: {
clientPath: 'taskTemplate.client'
},
removeStores: ['memory']
},
error: {
disconnectMongo: {
clientPath: 'taskTemplate.client'
},
removeStores: ['memory']
}
}
}
}