Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async export : streamed and zipped csv exports in filesystem #148

Merged
merged 39 commits into from
Dec 27, 2023
Merged
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6eb7a2d
WIP: exports are done async with workers
SchawnnDev Nov 7, 2023
06b94ed
improved export worker
SchawnnDev Nov 8, 2023
f333236
added tests for export package
SchawnnDev Nov 8, 2023
e6fb33a
Merge branch 'master' of github.com:myrteametrics/myrtea-engine-api i…
SchawnnDev Nov 8, 2023
fea23da
delete forgotten zip_test
SchawnnDev Nov 8, 2023
b4591f6
Added export routes
SchawnnDev Nov 9, 2023
0833412
lot of changes, replaced mutexes by channels
SchawnnDev Nov 20, 2023
e5f6935
changed queue channel to array with mutex + tests
SchawnnDev Nov 21, 2023
0af54fe
fixed tests
SchawnnDev Nov 21, 2023
6605888
added endpoints + wip facts not factids in export structs
SchawnnDev Nov 21, 2023
051daa5
Export endpoints + cancel tests
SchawnnDev Nov 22, 2023
85eb196
added missing test for getuserexport and fixed pipeline
SchawnnDev Nov 22, 2023
866595a
added missing test for getuserexport and fixed pipeline
SchawnnDev Nov 22, 2023
19537ef
its better to define Factids
SchawnnDev Nov 22, 2023
50ecd99
Fixed variablesconfig responding nil when no data was found
SchawnnDev Nov 23, 2023
ea541ad
Lot of changes for export: wrapped export request
SchawnnDev Nov 29, 2023
fb19f06
Merge branch 'master' of github.com:myrteametrics/myrtea-engine-api i…
SchawnnDev Nov 29, 2023
f66ce9e
Merge branch 'async-export' of https://github.com/myrteametrics/myrte…
SchawnnDev Nov 30, 2023
f21e01f
changed export method to post
SchawnnDev Nov 30, 2023
f638da6
wip: reworking notification system
SchawnnDev Nov 30, 2023
0ec576e
changed notification way of work
SchawnnDev Dec 4, 2023
b80260b
Changed notifications & user.login as main unique id for users
SchawnnDev Dec 5, 2023
d8950d5
improved notification system
SchawnnDev Dec 5, 2023
9eaabbb
notification system improvements
SchawnnDev Dec 6, 2023
c4558c9
fixed returning handler returning null
SchawnnDev Dec 6, 2023
200b489
changed error to string since its not marshalling
SchawnnDev Dec 7, 2023
7254974
Some fixes & file moves
SchawnnDev Dec 8, 2023
23600b1
a lot of fixes
SchawnnDev Dec 20, 2023
5fbcc3b
some changes & added title field
SchawnnDev Dec 21, 2023
ac42209
added new status export canceling
SchawnnDev Dec 21, 2023
f81ceaf
some fixes
SchawnnDev Dec 22, 2023
3c1835c
added download export
SchawnnDev Dec 26, 2023
bdda359
changed swagger doc
SchawnnDev Dec 26, 2023
ff59412
fixed merge conflict
SchawnnDev Dec 26, 2023
3e88509
fixed test
SchawnnDev Dec 27, 2023
046c224
fixed another tests
SchawnnDev Dec 27, 2023
ed0800c
fixed tests
SchawnnDev Dec 27, 2023
da2bb97
Update go.yml rm branch condition
SchawnnDev Dec 27, 2023
3d77752
added some tests
SchawnnDev Dec 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
notification system improvements
  • Loading branch information
SchawnnDev committed Dec 6, 2023
commit 9eaabbbcd848dc61ee955772a1a8f37785fd449d
2 changes: 1 addition & 1 deletion config/engine-api.toml
Original file line number Diff line number Diff line change
@@ -234,4 +234,4 @@ AUTHENTICATION_OIDC_FRONT_END_URL = "http://127.0.0.1:4200"
AUTHENTICATION_OIDC_ENCRYPTION_KEY = "thisis24characterslongs."

# NOTIFICATION_LIFETIME: The lifetime of a notification in the database.
NOTIFICATION_LIFETIME = "1w"
NOTIFICATION_LIFETIME = "168h" # 168h = 7 days, available units are "ns", "us" (or "µs"), "ms", "s", "m", "h"
2 changes: 1 addition & 1 deletion internals/app/services.go
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ func initCoordinator() {

instanceName := viper.GetString("INSTANCE_NAME")
if err = coordinator.InitInstance(instanceName, models); err != nil {
zap.L().Fatal("Intialisation of coordinator master", zap.Error(err))
zap.L().Fatal("Initialization of coordinator master", zap.Error(err))
}
if viper.GetBool("ENABLE_CRONS_ON_START") {
for _, li := range coordinator.GetInstance().LogicalIndices {
5 changes: 5 additions & 0 deletions internals/handlers/notification_handlers.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package handlers

import (
"github.com/myrteametrics/myrtea-engine-api/v5/internals/export"
"net/http"
"strconv"
"time"

"github.com/go-chi/chi/v5"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/dbutils"
@@ -67,6 +69,9 @@ func GetNotifications(w http.ResponseWriter, r *http.Request) {
return
}

notifications = append(notifications, notification.NewMockNotification(1, "level", "title", "subTitle", "description", time.Now(), []int64{1}, map[string]interface{}{"issueId": 1}))
notifications = append(notifications, notification.NewExportNotification(2, export.WrapperItem{Id: "test"}, 1))
notifications = append(notifications, notification.NewBaseNotification(3, false))
render.JSON(w, r, notifications)
}

35 changes: 15 additions & 20 deletions internals/handlers/notifier_handlers.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package handlers

import (
"github.com/google/uuid"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/export"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/notifier/notification"
"net/http"
"time"

"github.com/myrteametrics/myrtea-engine-api/v5/internals/handlers/render"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/models"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/notifier"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/security/users"
"go.uber.org/zap"
"net/http"
)

// NotificationsWSRegister godoc
@@ -46,20 +41,20 @@ func NotificationsWSRegister(w http.ResponseWriter, r *http.Request) {
zap.L().Error("Add new WS Client to manager", zap.Error(err))
return
}
go func(client *notifier.WebsocketClient) { // temporary for tests
zap.L().Info("starting notifier")
ticker := time.NewTicker(1 * time.Second)
after := time.After(30 * time.Second)
for {
select {
case <-ticker.C:
notifier.C().SendToUsers(notification.ExportNotification{Status: export.StatusPending, Export: export.WrapperItem{Id: uuid.New().String(), FileName: "test.bla"}}, []users.UserWithPermissions{user})
zap.L().Info("send notification")
case <-after:
return
}
}
}(client)
//go func(client *notifier.WebsocketClient) { // temporary for tests
// zap.L().Info("starting notifier")
// ticker := time.NewTicker(1 * time.Second)
// after := time.After(30 * time.Second)
// for {
// select {
// case <-ticker.C:
// notifier.C().SendToUsers(notification.ExportNotification{Status: export.StatusPending, Export: export.WrapperItem{Id: uuid.New().String(), FileName: "test.bla"}}, []users.UserWithPermissions{user})
// zap.L().Info("send notification")
// case <-after:
// return
// }
// }
//}(client)
go client.Write()

// go client.Read() // Disabled until proper usage
12 changes: 3 additions & 9 deletions internals/handlers/processor_handlers.go
Original file line number Diff line number Diff line change
@@ -3,15 +3,13 @@ package handlers
import (
"encoding/json"
"errors"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/ingester"
"net/http"
"time"

"github.com/myrteametrics/myrtea-engine-api/v5/internals/handlers/render"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/ingester"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/processor"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/scheduler"
"github.com/myrteametrics/myrtea-sdk/v4/models"
"go.uber.org/zap"
"net/http"
)

// ProcessorHandler is a basic struct allowing to set up a single aggregateIngester instance for all handlers
@@ -21,12 +19,8 @@ type ProcessorHandler struct {

// NewProcessorHandler returns a pointer to an ProcessorHandler instance
func NewProcessorHandler() *ProcessorHandler {
var aggregateIngester = ingester.NewAggregateIngester()
go aggregateIngester.Run() // Start ingester
time.Sleep(10 * time.Millisecond) // goroutine warm-up

return &ProcessorHandler{
aggregateIngester: aggregateIngester,
aggregateIngester: ingester.NewAggregateIngester(),
}
}

28 changes: 18 additions & 10 deletions internals/ingester/aggregate.go
Original file line number Diff line number Diff line change
@@ -13,8 +13,9 @@ import (

// AggregateIngester is a component which process scheduler.ExternalAggregate
type AggregateIngester struct {
Data chan []scheduler.ExternalAggregate
data chan []scheduler.ExternalAggregate
metricQueueGauge *stdprometheus.Gauge
running bool
}

var (
@@ -39,16 +40,17 @@ func _newRegisteredGauge() *stdprometheus.Gauge {
// NewAggregateIngester returns a pointer to a new AggregateIngester instance
func NewAggregateIngester() *AggregateIngester {
return &AggregateIngester{
Data: make(chan []scheduler.ExternalAggregate, viper.GetInt("AGGREGATEINGESTER_QUEUE_BUFFER_SIZE")),
data: make(chan []scheduler.ExternalAggregate, viper.GetInt("AGGREGATEINGESTER_QUEUE_BUFFER_SIZE")),
metricQueueGauge: _aggregateIngesterGauge,
running: false,
}
}

// Run is the main routine of a TypeIngester instance
func (ingester *AggregateIngester) Run() {
func (ai *AggregateIngester) Run() {
zap.L().Info("Starting AggregateIngester")

for ir := range ingester.Data {
for ir := range ai.data {
zap.L().Debug("Received ExternalAggregate", zap.Int("ExternalAggregate items count", len(ir)))

err := HandleAggregates(ir)
@@ -57,25 +59,31 @@ func (ingester *AggregateIngester) Run() {
}

// Update queue gauge
(*ingester.metricQueueGauge).Set(float64(len(ingester.Data)))
(*ai.metricQueueGauge).Set(float64(len(ai.data)))
}

}

// Ingest process an array of scheduler.ExternalAggregate
func (ingester *AggregateIngester) Ingest(aggregates []scheduler.ExternalAggregate) error {
dataLen := len(ingester.Data)
func (ai *AggregateIngester) Ingest(aggregates []scheduler.ExternalAggregate) error {
dataLen := len(ai.data)

// Start ingester if not running
if !ai.running {
go ai.Run()
ai.running = true
}

zap.L().Debug("Ingesting data", zap.Any("aggregates", aggregates))

// Check for channel overloading
if dataLen+1 >= cap(ingester.Data) {
if dataLen+1 >= cap(ai.data) {
zap.L().Debug("Buffered channel would be overloaded with incoming bulkIngestRequest")
(*ingester.metricQueueGauge).Set(float64(dataLen))
(*ai.metricQueueGauge).Set(float64(dataLen))
return errors.New("channel overload")
}

ingester.Data <- aggregates
ai.data <- aggregates

return nil
}
6 changes: 5 additions & 1 deletion internals/notifier/notification/handler.go
Original file line number Diff line number Diff line change
@@ -45,6 +45,8 @@ func NewHandler(notificationLifetime time.Duration) *Handler {
// useless to start cleaner if lifetime is less than 0
if notificationLifetime > 0 {
go handler.startCleaner(context.Background())
} else {
zap.L().Info("Notification cleaner will not be started", zap.Duration("notificationLifetime", notificationLifetime))
}

return handler
@@ -68,7 +70,9 @@ func (h *Handler) RegisterNotificationTypes() {

// startCleaner start a ticker to clean expired notifications in database every 24 hours
func (h *Handler) startCleaner(context context.Context) {
ticker := time.NewTicker(time.Hour * 24)
cleanRate := time.Hour * 24
zap.L().Info("Starting notification cleaner", zap.Duration("cleanRate", cleanRate), zap.Duration("notificationLifetime", h.notificationLifetime))
ticker := time.NewTicker(cleanRate)
defer ticker.Stop()
for {
select {
9 changes: 9 additions & 0 deletions internals/notifier/notification/notification.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,15 @@ type BaseNotification struct {
Type string `json:"type"`
}

// NewBaseNotification returns a new instance of a BaseNotification
func NewBaseNotification(id int64, isRead bool) BaseNotification {
return BaseNotification{
Id: id,
IsRead: isRead,
Type: "BaseNotification",
}
}

// NewInstance returns a new instance of a BaseNotification
func (n BaseNotification) NewInstance(id int64, data []byte, isRead bool) (Notification, error) {
var notification BaseNotification
1 change: 1 addition & 0 deletions internals/notifier/notification/notification_mock.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ type MockNotification struct {
Title string `json:"title"`
SubTitle string `json:"subtitle"`
Description string `json:"description"`
Target string `json:"target"`
Context map[string]interface{} `json:"context,omitempty"`
}