# MAINFLOW.yml
---
# Kestra flow definition: automated ML pipeline with drift detection.
id: ml-Model-Automation
namespace: ml.pipeline
description: "Automated ML Pipeline with Drift Detection"

inputs:
# Forwarded to data_validation.py as --drift_threshold.
- id: drift_threshold
  type: FLOAT
  defaults: 0.1
  description: "Threshold for data drift detection"
# Discord webhook URL consumed by the notify_retraining task.
# Secrets are read with the secret('NAME') function; the previous
# "secret:NAME" form is not a valid Pebble expression.
- id: DISCORD_WEBHOOK
  type: STRING
  defaults: "{{ secret('DISCORD_WEBHOOK') }}"
tasks:
# Wraps the validation step in a WorkingDirectory task.
- id: data_processing
  type: io.kestra.plugin.core.flow.WorkingDirectory
  tasks:
  # Runs data_validation.py with the configured drift threshold.
  - id: data_validation
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: ghcr.io/kestra-io/pydata:latest
    # Namespace files made available to the script: the script itself and
    # the current reference dataset.
    namespaceFiles:
      enabled: true
      include:
      - data/reference_data.csv
      - data_validation.py
    beforeCommands:
    - pip install -r requirements.txt
    inputFiles:
      # argparse is deliberately not listed: it ships with the Python
      # standard library, and the PyPI package is only an old backport.
      requirements.txt: |
        evidently
        pandas
        numpy
        scikit-learn
        pymongo
        requests
    # Files captured as task outputs; the drift report and the reference
    # dataset are referenced by the conditional_update task.
    outputFiles:
    - "drift_report.json"
    - "validation_outputs.json"
    - "data/reference_data.csv"
    commands:
    - python data_validation.py --drift_threshold {{ inputs.drift_threshold }}
# Retraining branch: runs only when the drift report written by
# data_validation has a truthy "needs_retraining" field.
#
# This must be an If task, not WaitFor: WaitFor is a loop that executes its
# child tasks and repeats them until the condition becomes true, so it would
# upload, retrain and notify even when no retraining is needed.
- id: conditional_update
  type: io.kestra.plugin.core.flow.If
  # read() loads the report from internal storage and fromJson() parses it,
  # so the field can be accessed directly. A missing field evaluates as false.
  condition: "{{ fromJson(read(outputs.data_validation.outputFiles['drift_report.json'])).needs_retraining }}"
  then:
  # Publish the reference dataset produced by the validation run back to
  # the flow's namespace files.
  - id: update_reference_data
    type: io.kestra.plugin.core.namespace.UploadFiles
    namespace: "{{ flow.namespace }}"
    filesMap:
      "data/reference_data.csv": "{{ outputs.data_validation.outputFiles['data/reference_data.csv'] }}"
  # Run the training subflow and wait for it; transmitFailed makes a failed
  # training run fail this flow as well.
  - id: train_model_flow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: ml.pipeline
    flowId: model-training
    inputs:
      reference_data: "{{ outputs.data_validation.outputFiles['data/reference_data.csv'] }}"
    wait: true
    transmitFailed: true
  # DiscordIncomingWebhook posts the payload as written. DiscordExecution
  # renders its own execution-status template and replaces any custom
  # payload, so the bot name is set inside the JSON instead.
  - id: notify_retraining
    type: io.kestra.plugin.notifications.discord.DiscordIncomingWebhook
    url: "{{ inputs.DISCORD_WEBHOOK }}"
    payload: |
      {
        "username": "ML Automation Bot",
        "embeds": [{
          "title": "🚀 Retraining & Redeploying Triggered!",
          "description": "The model was retrained and redeployed due to detected drift.",
          "fields": [
            {
              "name": "🔍 Drift Detected",
              "value": "The pipeline detected drift exceeding the threshold of {{ inputs.drift_threshold }}.",
              "inline": true
            }
          ],
          "color": 15105570,
          "footer": {
            "text": "ML Pipeline Notification"
          }
        }]
      }
# Schedule trigger: the cron expression fires at minute 0 of hour 0, every day.
triggers:
- id: schedule
  type: io.kestra.plugin.core.trigger.Schedule
  cron: '0 0 * * *'