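### Shared variables. REGISTRY is a per-pipeline working directory on the datalake
### runner host; LAKE_* are the HDFS paths addressed through the MaacDo API.
### DEPLOYMENT_TYPE defaults to "null" and is overridden by the workflow rules below.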
variables:
REGISTRY_ROOT: "/home/cloudadm/.ci-python_packages"
REGISTRY_KEY: "${CI_PIPELINE_ID}"
REGISTRY: "${REGISTRY_ROOT}/${REGISTRY_KEY}"
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
PACKAGE: "python_packages.tar.gz"
LAKE_HOME: "/projets/TSF"
LAKE_MODULES_DIR: "${LAKE_HOME}/applicatifs"
LAKE_PKG_TARGET: "${LAKE_MODULES_DIR}/sf-packages-${CI_COMMIT_REF_SLUG}.tar.gz"
LAKE_ENTRYPOINT: "${LAKE_MODULES_DIR}/main.py"
DEPLOYMENT_TYPE: "null"
### Workflow: defines when branch and merge request pipelines should run.
workflow:
rules:
- if: '$CI_COMMIT_BRANCH == "main" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "main"'
variables:
DEPLOYMENT_TYPE: "prod"
- if: '$CI_COMMIT_BRANCH == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop"'
variables:
DEPLOYMENT_TYPE: "small"
- if: '$CI_COMMIT_BRANCH && $CI_OPEN_MERGE_REQUESTS'
when: never
- if: '$CI_COMMIT_TAG'
when: never
- if: '$CI_COMMIT_BRANCH != "main" && $CI_COMMIT_BRANCH != "develop"'
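### Hidden job templates. '.lake' jobs run on the 'datalake'-tagged runner, working
### from the MaacDo checkout in ${REGISTRY}/bin; '.python' jobs run in a python:3.6.8
### image and reuse a per-branch pip cache and virtualenv.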
.lake:
tags:
- datalake
before_script:
- cd ${REGISTRY}/bin && pwd
after_script:
- pwd
.python:
tags:
- ci
image: python:3.6.8
before_script:
- source venv/bin/activate
cache:
key: $CI_COMMIT_REF_SLUG
paths:
- .cache/pip
- venv/
########################
# PIPELINE STARTS HERE #
########################
stages:
- .pre
- build
- test
- deploy
- .post
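### Preparation (.pre): prune old pipeline directories, build the virtualenv, create
### this pipeline's registry and fetch the MaacDo client and its configuration.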
remove_older_working_directories:
stage: .pre
tags:
- datalake
script:
- echo "Cleaning older CI pipeline data. Keeping only the 10 most recent directories in ${REGISTRY_ROOT}."
    - ls -1QAtd ${REGISTRY_ROOT}/* | tail -n +11 | xargs rm -rf
prepare_python_venv:
stage: .pre
extends: .python
before_script: null
script:
- python -V # Print out python version for debugging
- pip install -U pip
- pip install virtualenv==20.16.2
- virtualenv venv
prepare_new_working_directory:
stage: .pre
tags:
- datalake
script:
- echo "Preparing runners shared space & configuration in ${REGISTRY}."
- "[[ -d ${REGISTRY} ]] && rm -rf ${REGISTRY}"
- mkdir -p ${REGISTRY}
- mkdir -p ${REGISTRY}/trace
- mkdir -p ${REGISTRY}/log
- mkdir -p ${REGISTRY}/bin
- mkdir -p ${REGISTRY}/results
- echo "Preparing spark configuration inside ${REGISTRY}/spark_config.json (will be set to 'null' if no prediction should be run afterwards)."
- jq --arg REF "${DEPLOYMENT_TYPE}" '.[$REF]' ${CI_PROJECT_DIR}/.ci/datalake/spark_config.json | envsubst > ${REGISTRY}/spark_config.json
- cat ${REGISTRY}/spark_config.json
- echo "Getting MaacDo version from default or ${CI_COMMIT_REF_SLUG} from .ci/datalake/maacdo_version.json "
- MAACDO_VERSION=$(jq --raw-output --arg BRANCH "${CI_COMMIT_REF_SLUG}" '.maacdo | .default + .[$BRANCH] | .version' ${CI_PROJECT_DIR}/.ci/datalake/maacdo_version.json)
- echo "${MAACDO_VERSION} > ${REGISTRY}/.maacdo_version"
- echo "Got MaacDo version as ${MAACDO_VERSION} "
- git clone --depth 1 --branch ${MAACDO_VERSION} https://${GIT_MAACDO_USERNAME}:${GIT_MAACDO_TOKEN}@forge.dgfip.finances.rie.gouv.fr/raphaelventura/maac-do.git ${REGISTRY}/bin
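# Render the MaacDo parameter file ci_job.par from the repository template, injecting
# the Spark configuration selected for this DEPLOYMENT_TYPE.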
prepare_maacdo_configuration:
stage: .pre
extends: .lake
needs:
- job: "prepare_new_working_directory"
script:
- SPARK_CONFIG=$(jq --compact-output '.sparkTask' ${REGISTRY}/spark_config.json)
    - >
      [[ -z ${SPARK_CONFIG} ]]
      && [[ ${DEPLOYMENT_TYPE} != "null" ]]
      && { echo "Error: no config for Spark task execution was provided."; exit 1; }
- export SPARK_CONFIG=${SPARK_CONFIG}
- envsubst < ${CI_PROJECT_DIR}/.ci/datalake/datalake_config_template.par > ${REGISTRY}/ci_job.par
- cat ${REGISTRY}/ci_job.par
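### Build: install the package (with its 'pack' extras) into the virtualenv, then
### archive the whole environment with venv-pack.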
install_package:
stage: build
extends: .python
needs:
- job: "prepare_new_working_directory"
- job: "prepare_python_venv"
script:
# Installing shap through wheel causes import failure when sent to the datalake servers.
- pip install -v .[pack] --no-binary=shap
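# venv-pack writes to /packages/${REGISTRY_KEY}/; /packages is presumably the docker
# runner's mount of ${REGISTRY_ROOT}, so the archive ends up in ${REGISTRY} for the
# .lake jobs (compare 'ls -al /packages' in print-python-env below).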
venv_pack:
stage: build
extends: .python
needs:
- job: "install_package"
script:
- venv-pack -o /packages/${REGISTRY_KEY}/${PACKAGE}
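# pylint only runs when python sources change; downstream jobs therefore list it as
# an optional need.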
pylint:
stage: test
extends: .python
script:
- pip install pylint
- pylint --rcfile=.pylintrc src/sf_datalake/
rules:
- changes:
- src/sf_datalake/**/*.py
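### Deploy: replace the packed environment and the entry point on the lake HDFS
### through the MaacDo API, then start and monitor the Spark job via Livy.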
send_package_to_lake:
stage: deploy
extends: .lake
needs:
- job: "venv_pack"
- job: "prepare_maacdo_configuration"
- job: "pylint"
optional: true
script:
- echo "Deleting old packages archive (if it exists) on the lake HDFS."
- ./api_maacdo.pl
-a SuppressionFichier
-i ${LAKE_PKG_TARGET}
-c ${REGISTRY}/ci_job.par ||
echo "Failed removing ${LAKE_PKG_TARGET}. Maybe this file doesn't exist."
- echo "Sending new python packages archive to ${LAKE_PKG_TARGET}."
- ./api_maacdo.pl
-a EnvoyerFichier
-l ${REGISTRY}/${PACKAGE}
-i ${LAKE_PKG_TARGET}
-c ${REGISTRY}/ci_job.par
send_entrypoint_to_lake:
stage: deploy
extends: .lake
needs:
- job: "prepare_maacdo_configuration"
- job: "pylint"
optional: true
- job: "send_package_to_lake"
script:
- echo "Deleting previous entry point (if it exists) on the lake HDFS."
- ./api_maacdo.pl
-a SuppressionFichier
-i ${LAKE_ENTRYPOINT}
-c ${REGISTRY}/ci_job.par ||
echo "Failed removing ${LAKE_ENTRYPOINT}. Maybe this file doesn't exist."
- SCRIPT="${CI_PROJECT_DIR}/src/sf_datalake/__main__.py"
- echo "Sending new python main script ${SCRIPT} to ${LAKE_ENTRYPOINT}."
- ./api_maacdo.pl
-a EnvoyerFichier
-l ${SCRIPT}
-i ${LAKE_ENTRYPOINT}
-c ${REGISTRY}/ci_job.par
rules:
- if: $DEPLOYMENT_TYPE != "null"
### Spark jobs handling
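# start_spark_task records the Livy id under ${REGISTRY}/idLivy; wait_for_task_finishing
# polls the task state every 60s and prints new driver logs; stop_spark_task can be
# triggered manually to kill a task that is still starting or running.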
start_spark_task:
stage: deploy
extends: .lake
needs:
- job: "send_entrypoint_to_lake"
- job: "send_package_to_lake"
script:
- echo "Launching spark job, getting livy ID."
- 'idLivy=$(./api_maacdo.pl -a DemarrageTacheSpark -c ${REGISTRY}/ci_job.par | grep ID | cut -d: -f2)'
- echo "Spark task has started. Livy id is [$idLivy]."
- echo ${idLivy} | xargs > ${REGISTRY}/idLivy
- echo "To get yarns logs even in success case, execute the following from the server running the gitlab runner process:"
- echo "--> ./api_maacdo.pl -a YarnLogsTacheSpark -j ${idLivy} -c ${REGISTRY}/ci_job.par"
- echo "To stop the spark task, execute the following from the server running the gitlab runner process:"
- echo "--> ./api_maacdo.pl -a ArretTacheSpark -j ${idLivy} -c ${REGISTRY}/ci_job.par"
rules:
- if: $DEPLOYMENT_TYPE != "null"
wait_for_task_finishing:
stage: deploy
extends: .lake
needs:
- job: "start_spark_task"
script:
- export ID_LIVY=$(cat ${REGISTRY}/idLivy)
- |
LAST=""
while true; do
TACHE_SPARK=$(./api_maacdo.pl -a EtatTacheSpark -j ${ID_LIVY} -c ${REGISTRY}/ci_job.par)
STATUS=$(echo ${TACHE_SPARK} | awk '{print $NF}' | tr -dc '[:alnum:]\r\n' | tr '[:lower:]' '[:upper:]')
echo "${STATUS}" > ${REGISTRY}/status_${ID_LIVY}
echo "Status of task ${ID_LIVY} is '${STATUS}'"
[[ ${STATUS} == 'DEAD' ]] && exit 111
[[ ${STATUS} == 'STARTING' || ${STATUS} == 'RUNNING' ]] || break
LOGS=$(./api_maacdo.pl -a LogsTacheSpark -j ${ID_LIVY} -c ${REGISTRY}/ci_job.par)
CURRENT=$(printf '%s' "${LOGS}" | md5sum)
if [[ ${LAST} == ${CURRENT} ]]
then
echo "No fresh logs, waiting for another 60s."
else
echo "New available logs:"
echo ${LOGS}
LAST=${CURRENT}
echo "Waiting for another 60s."
fi
sleep 60
done
rules:
- if: $DEPLOYMENT_TYPE != "null"
stop_spark_task:
stage: deploy
extends: .lake
needs:
- job: "start_spark_task"
script:
- export ID_LIVY=$(cat ${REGISTRY}/idLivy)
- export STATUS=$(cat ${REGISTRY}/status_${ID_LIVY})
- echo "Task ${ID_LIVY} has status ${STATUS}."
- |
if [[ ${STATUS} == 'STARTING' || ${STATUS} == 'RUNNING' ]]
then
./api_maacdo.pl -a ArretTacheSpark -j ${ID_LIVY} -c ${REGISTRY}/ci_job.par
else
echo "Nothing to do."
fi
rules:
- if: $DEPLOYMENT_TYPE != "null"
when: manual
allow_failure: true
### Post-processing
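# 'pages' builds the Sphinx documentation; the .post jobs fetch YARN logs on failure
# and retrieve prediction outputs from HDFS.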
pages:
stage: deploy
extends: .python
needs:
- job: "prepare_python_venv"
script:
- pip install sphinx sphinx-rtd-theme
- cd docs/
- sphinx-apidoc -f -o source/ ../src/
- make html
- mv build/html/ ../public/
when: manual
# artifacts:
# paths:
# - public
fetch_yarn_logs:
stage: .post
extends: .lake
needs:
- job: "wait_for_task_finishing"
script:
- export ID_LIVY=$(cat ${REGISTRY}/idLivy)
- echo "Fetching YARN logs after job failure."
- ./api_maacdo.pl -a YarnLogsTacheSpark -j ${ID_LIVY} -c ${REGISTRY}/ci_job.par
- find ${REGISTRY}/log/ -type f -name 'YarnLogsTacheSpark_*' -execdir cat '{}' + | grep -v 'OnOutOfMemoryError' | grep 'Error' -B 20
rules:
- if: $DEPLOYMENT_TYPE != "null"
when: on_failure
fetch_run_outputs_from_HDFS:
stage: .post
extends: .lake
needs:
- job: "wait_for_task_finishing"
script:
- export ID_LIVY=$(cat ${REGISTRY}/idLivy)
- echo "Parsing ${REGISTRY}/spark_config.json looking for '--prediction_path' argument."
- cat ${REGISTRY}/spark_config.json
    - > # take 'sparkTask.args', pair consecutive items into {"--param": "value"} objects, merge them with 'add', then select the "--prediction_path" value
OUTPUT_DIR=$(jq --raw-output '[.sparkTask.args as $m | range(0; $m | length; 2) | {($m[.]): $m[(. + 1)]} ] | add | .["--prediction_path"]' ${REGISTRY}/spark_config.json)
- echo "Found argument '--prediction_path'=${OUTPUT_DIR}"
- bash fetchFolder.sh ${LAKE_HOME}/${OUTPUT_DIR} ${REGISTRY}/results ${REGISTRY}/ci_job.par
- echo "Gathering fetched outputs into ${REGISTRY}/results.zip"
- zip -j ${REGISTRY}/results.zip ${REGISTRY}/results/*
- rm -r ${REGISTRY}/results/
- echo "Removing remote results in ${OUTPUT_DIR}"
- bash rmRecursiveHdfs.sh ${LAKE_HOME}/${OUTPUT_DIR} ${REGISTRY}/ci_job.par
rules:
- if: $DEPLOYMENT_TYPE != "null"
when: always
######################
# PIPELINE ENDS HERE #
######################
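# Manual debugging helpers: dump the environment and registry state seen by each runner.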
print-python-env:
stage: .pre
extends: .python
script:
- env | sort
- pwd
- whoami
- ls -al /packages
when: manual
print-shell-env:
stage: .pre
extends: .lake
script:
- env | sort
- whoami
- pwd
- echo "ID_LIVY = $(cat ${REGISTRY}/idLivy)"
- echo "ID_LIVY status = $(cat ${REGISTRY}/status_${ID_LIVY})"
- echo "Spark config:"
- cat ${REGISTRY}/spark_config.json
- echo "MaacDo version = $(cat ${REGISTRY}/.maacdo_version"
- echo "MaacDo configuration:"
- cat ${REGISTRY}/ci_job.par
when: manual