Skip to content

Commit

Permalink
changed queue channel to array with mutex + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
SchawnnDev committed Nov 21, 2023
1 parent 0833412 commit e5f6935
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 157 deletions.
5 changes: 2 additions & 3 deletions internals/export/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
)

func TestEquals(t *testing.T) {
p1 := CSVParameters{FileName: "bla"}
p2 := CSVParameters{FileName: "bla2"}
p1 := CSVParameters{}
p2 := CSVParameters{}
expression.AssertEqual(t, p1.Equals(p2), false)
expression.AssertEqual(t, p1.Equals(p1), true)

Expand All @@ -19,7 +19,6 @@ func TestEquals(t *testing.T) {
Separator: ';',
Limit: 10,
ChunkSize: 100,
FileName: "bla",
}
expression.AssertEqual(t, params3.Equals(p2), false)
expression.AssertEqual(t, params3.Equals(params3), true)
Expand Down
61 changes: 39 additions & 22 deletions internals/export/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
)

type ExportWorker struct {
Id int
Mutex sync.Mutex
Available bool // do not touch this variable inside of worker it is used thread-safely by wrapper
Success chan<- int
Cancel chan bool // channel to cancel the worker
//
Mutex sync.Mutex
Id int
Success chan<- int
Cancel chan bool // channel to cancel the worker
BasePath string // base path where the file will be saved
// critical fields
Available bool
QueueItem WrapperItem
BasePath string // base path where the file will be saved
}

func NewExportWorker(id int, basePath string, success chan<- int) *ExportWorker {
Expand All @@ -33,21 +33,39 @@ func NewExportWorker(id int, basePath string, success chan<- int) *ExportWorker
}
}

// SetError sets the error and the status of the worker
func (e *ExportWorker) SetError(error error) {
e.Mutex.Lock()
defer e.Mutex.Unlock()
e.QueueItem.Status = StatusError
e.QueueItem.Error = error
}

// SetStatus sets the status of the worker
func (e *ExportWorker) SetStatus(status int) {
e.Mutex.Lock()
defer e.Mutex.Unlock()
e.QueueItem.Status = status
}

// SetAvailable sets the worker availability to true and clears the queueItem
func (e *ExportWorker) SetAvailable() {
// SwapAvailable swaps the availability of the worker
func (e *ExportWorker) SwapAvailable(available bool) (old bool) {
e.Mutex.Lock()
defer e.Mutex.Unlock()
old = e.Available
e.Available = available
return old
}

// IsAvailable returns the availability of the worker
func (e *ExportWorker) IsAvailable() bool {
e.Mutex.Lock()
defer e.Mutex.Unlock()
return e.Available
}

// Finalise sets the worker availability to true and clears the queueItem
func (e *ExportWorker) Finalise() {
e.Mutex.Lock()

// set status to error if error occurred
Expand All @@ -67,8 +85,8 @@ func (e *ExportWorker) SetAvailable() {

// Start starts the export task
// It handles one queueItem at a time and when finished it stops the goroutine
func (e *ExportWorker) Start(item WrapperItem) {
defer e.SetAvailable()
func (e *ExportWorker) Start(item WrapperItem, ctx context.Context) {
defer e.Finalise()
e.Mutex.Lock()
e.QueueItem = item
e.Mutex.Unlock()
Expand Down Expand Up @@ -98,7 +116,7 @@ func (e *ExportWorker) Start(item WrapperItem) {
var writerErr error

// local context handling
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Increment the WaitGroup counter
Expand All @@ -117,7 +135,7 @@ func (e *ExportWorker) Start(item WrapperItem) {
defer close(streamedExport.Data)

for _, f := range item.FactIDs {
_ = f // TODO:
_ = f // TODO: facts
writerErr = streamedExport.StreamedExportFactHitsFull(ctx, engine.Fact{}, item.Params.Limit)
if writerErr != nil {
break // break here when error occurs?
Expand All @@ -128,23 +146,21 @@ func (e *ExportWorker) Start(item WrapperItem) {
// Chunk handler
first := true
labels := item.Params.ColumnsLabel
loop := true

for loop {
loop:
for {
select {
case hits, ok := <-streamedExport.Data:
if !ok { // channel closed
loop = false
break
break loop
}

err := WriteConvertHitsToCSV(csvWriter, hits, item.Params.Columns, labels, item.Params.FormatColumnsData, item.Params.Separator)
err = WriteConvertHitsToCSV(csvWriter, hits, item.Params.Columns, labels, item.Params.FormatColumnsData, item.Params.Separator)

if err != nil {
zap.L().Error("WriteConvertHitsToCSV error during export", zap.Error(err))
cancel()
loop = false
break
break loop
}

// Flush data
Expand All @@ -154,10 +170,11 @@ func (e *ExportWorker) Start(item WrapperItem) {
first = false
labels = []string{}
}

case <-ctx.Done():
break loop
case <-e.Cancel:
cancel()
loop = false
break loop
}
}

Expand Down
32 changes: 30 additions & 2 deletions internals/export/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,36 @@ import (
)

func TestNewExportWorker(t *testing.T) {
worker := NewExportWorker("/tmp")
worker := NewExportWorker(0, "/tmp", make(chan<- int))
expression.AssertEqual(t, worker.BasePath, "/tmp")
expression.AssertEqual(t, worker.Available, true)
expression.AssertEqual(t, worker.QueueItemId, "")
expression.AssertEqual(t, worker.Id, 0)
}

func TestExportWorker_SetError(t *testing.T) {
worker := NewExportWorker(0, "/tmp", make(chan<- int))
worker.SetError(nil)
expression.AssertEqual(t, worker.QueueItem.Status, StatusError)
expression.AssertEqual(t, worker.QueueItem.Error, nil)
}

func TestExportWorker_SetStatus(t *testing.T) {
worker := NewExportWorker(0, "/tmp", make(chan<- int))
worker.SetStatus(StatusPending)
expression.AssertEqual(t, worker.QueueItem.Status, StatusPending)
}

func TestExportWorker_SwapAvailable(t *testing.T) {
worker := NewExportWorker(0, "/tmp", make(chan<- int))
expression.AssertEqual(t, worker.SwapAvailable(false), true)
expression.AssertEqual(t, worker.Available, false)
expression.AssertEqual(t, worker.SwapAvailable(true), false)
expression.AssertEqual(t, worker.Available, true)
}

func TestExportWorker_IsAvailable(t *testing.T) {
worker := NewExportWorker(0, "/tmp", make(chan<- int))
expression.AssertEqual(t, worker.IsAvailable(), true)
worker.SwapAvailable(false)
expression.AssertEqual(t, worker.IsAvailable(), false)
}
Loading

0 comments on commit e5f6935

Please sign in to comment.