From cadb80cf7915567fafc68a8a2579e7d2d2e7c6c5 Mon Sep 17 00:00:00 2001 From: sbengo Date: Fri, 22 Oct 2021 06:43:09 +0200 Subject: [PATCH] Added UI Output section This PR tries to add the UI Output component to view runtime stats on output and some fixes on backend to allow control of output status UI detail: - Renamed runtime to runtime device - Created new runtime output section --- pkg/agent/agent.go | 10 +- pkg/agent/output/influx.go | 86 ++-- pkg/webui/apirt-output.go | 8 +- src/common/elapsedseconds.pipe.ts | 10 +- .../components/table/ng-table.component.ts | 8 + .../ng-table/components/table/ng-table.html | 15 +- src/home/home.html | 7 +- src/home/home.ts | 7 +- src/main.module.ts | 6 +- .../runtime_device.component.ts} | 18 +- .../runtime_device.data.ts} | 4 +- .../runtime_device.service.ts} | 2 +- .../runtime_deviceeditor.css} | 0 .../runtime_deviceview.html} | 2 +- .../runtime_output.component.ts | 372 ++++++++++++++++++ src/runtime_output/runtime_output.data.ts | 67 ++++ src/runtime_output/runtime_output.service.ts | 100 +++++ src/runtime_output/runtime_outputeditor.css | 76 ++++ src/runtime_output/runtime_outputview.html | 36 ++ 19 files changed, 771 insertions(+), 63 deletions(-) rename src/{runtime/runtime.component.ts => runtime_device/runtime_device.component.ts} (96%) rename src/{runtime/runtime.data.ts => runtime_device/runtime_device.data.ts} (98%) rename src/{runtime/runtime.service.ts => runtime_device/runtime_device.service.ts} (99%) rename src/{runtime/runtimeeditor.css => runtime_device/runtime_deviceeditor.css} (100%) rename src/{runtime/runtimeview.html => runtime_device/runtime_deviceview.html} (99%) create mode 100644 src/runtime_output/runtime_output.component.ts create mode 100644 src/runtime_output/runtime_output.data.ts create mode 100644 src/runtime_output/runtime_output.service.ts create mode 100644 src/runtime_output/runtime_outputeditor.css create mode 100644 src/runtime_output/runtime_outputview.html diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 24d22621..49dfd15e 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -195,14 +195,14 @@ func GetOutputJSONInfo(id string) ([]byte, error) { } // GetDeviceStats returns a map with the basic info of each device. -func GetOutputStats() map[string]*output.InfluxStats { - outstats := make(map[string]*output.InfluxStats) +func GetOutputStats() map[string]*output.SinkDB { + out := make(map[string]*output.SinkDB) mutex.RLock() + defer mutex.RUnlock() for k, v := range outputdb { - outstats[k] = v.GetBasicStats() + out[k] = v.GetSinkDBBasicStats() } - mutex.RUnlock() - return outstats + return out } // GetDeviceStats returns a map with the basic info of each device. diff --git a/pkg/agent/output/influx.go b/pkg/agent/output/influx.go index 34fec19c..8458751c 100644 --- a/pkg/agent/output/influx.go +++ b/pkg/agent/output/influx.go @@ -37,13 +37,14 @@ type SinkDB struct { smutex sync.Mutex statsData sync.RWMutex - dummy bool - iChan chan *client.BatchPoints - client client.Client - OutInfo string - PingTime time.Duration - Node *bus.Node `json:"-"` - Active bool + dummy bool + iChan chan *client.BatchPoints + client client.Client + OutInfo string + PingTime time.Duration + Node *bus.Node `json:"-"` + Active bool + EnqueueOnWriteError bool } // DummyDB a BD struct needed if no database configured @@ -102,6 +103,13 @@ func (db *SinkDB) GetBasicStats() *InfluxStats { return db.Stats } +// GetBasicStats get basic info for this device +func (db *SinkDB) GetSinkDBBasicStats() *SinkDB { + db.statsData.RLock() + defer db.statsData.RUnlock() + return db +} + // GetBasicStats get basic info for this device func (db *SinkDB) getBasicStats() *InfluxStats { stat := db.stats.ThSafeCopy() @@ -110,6 +118,8 @@ func (db *SinkDB) getBasicStats() *InfluxStats { // GetResetStats return outdb stats and reset its counters func (db *SinkDB) GetResetStats() *InfluxStats { + db.statsData.RLock() + defer db.statsData.RUnlock() if db.dummy == true { log.Debug("Reseting Influxstats for DUMMY DB ") return &InfluxStats{} @@ -365,29 +375,45 @@ func (db *SinkDB) sendBatchPoint(data *client.BatchPoints, enqueueonerror bool) nf += len(fields) } } - // keep trying until we get it (don't drop the data) - startSend := time.Now() - err := db.client.Write(*data) - elapsedSend := time.Since(startSend) - - bufferPercent = (float32(len(db.iChan)) * 100.0) / float32(db.cfg.BufferSize) - log.Infof("Buffer OUTPUT [%s] : %.2f%%", db.cfg.ID, bufferPercent) - if err != nil { - db.stats.WriteErrUpdate(elapsedSend, bufferPercent) - log.Errorf("ERROR on Write batchPoint in DB %s (%d points) | elapsed : %s | Error: %s ", db.cfg.ID, np, elapsedSend.String(), err) - // If the queue is not full we will resend after a while + // Check if the output is active + if !db.Active { + // Enqueue on error is active, try to store data on buffer if enqueueonerror { - log.Debug("queing data again...") if len(db.iChan) < db.cfg.BufferSize { db.iChan <- data + bufferPercent = (float32(len(db.iChan)) * 100.0) / float32(db.cfg.BufferSize) + log.Infof("Buffer OUTPUT [%s] : %.2f%%", db.cfg.ID, bufferPercent) + db.stats.WriteErrUpdate(0, bufferPercent) time.Sleep(time.Duration(db.cfg.TimeWriteRetry) * time.Second) } + } else { + // Drop data if enqueue on error is false and output is not active + log.Infof("Skipped point on output %s, output is not active and enqueue on error is false", db.cfg.ID) } } else { - log.Debugf("OK on Write batchPoint in DB %s (%d points) | elapsed : %s ", db.cfg.ID, np, elapsedSend.String()) - db.stats.WriteOkUpdate(int64(np), int64(nf), elapsedSend, bufferPercent) - } + // keep trying until we get it (don't drop the data) + startSend := time.Now() + err := db.client.Write(*data) + elapsedSend := time.Since(startSend) + bufferPercent = (float32(len(db.iChan)) * 100.0) / float32(db.cfg.BufferSize) + log.Infof("Buffer OUTPUT [%s] : %.2f%%", db.cfg.ID, bufferPercent) + if err != nil { + db.stats.WriteErrUpdate(elapsedSend, bufferPercent) + log.Errorf("ERROR on Write batchPoint in DB %s (%d points) | elapsed : %s | Error: %s ", db.cfg.ID, np, elapsedSend.String(), err) + // If the queue is not full we will resend after a while + if enqueueonerror { + log.Debug("queing data again...") + if len(db.iChan) < db.cfg.BufferSize { + db.iChan <- data + time.Sleep(time.Duration(db.cfg.TimeWriteRetry) * time.Second) + } + } + } else { + log.Debugf("OK on Write batchPoint in DB %s (%d points) | elapsed : %s ", db.cfg.ID, np, elapsedSend.String()) + db.stats.WriteOkUpdate(int64(np), int64(nf), elapsedSend, bufferPercent) + } + } } func (db *SinkDB) resetBuffer(length int) { @@ -410,14 +436,14 @@ func (db *SinkDB) startSenderGo(r int, wg *sync.WaitGroup) { defer wg.Done() time.Sleep(5) - EnqueueOnWriteError := db.cfg.EnqueueOnWriteError + db.EnqueueOnWriteError = db.cfg.EnqueueOnWriteError log.Infof("beginning Influx Sender thread: [%s]", db.cfg.ID) for { select { case data := <-db.iChan: if !db.Active { - log.Warn("skip send: Output not active") + db.sendBatchPoint(data, db.EnqueueOnWriteError) continue } if data == nil { @@ -429,18 +455,26 @@ func (db *SinkDB) startSenderGo(r int, wg *sync.WaitGroup) { continue } - db.sendBatchPoint(data, EnqueueOnWriteError) + db.sendBatchPoint(data, db.EnqueueOnWriteError) case val := <-db.Node.Read: log.Infof("Received Message...%s: %+v", val.Type, val.Data) switch val.Type { case "resetbuffer": + db.statsData.Lock() db.resetBuffer(db.cfg.BufferSize) + db.statsData.Unlock() case "flushbuffer": + db.statsData.Lock() db.flushBuffer() + db.statsData.Unlock() case "setactive": + db.statsData.Lock() db.Active = val.Data.(bool) + db.statsData.Unlock() case "enqueue_policy_change": - EnqueueOnWriteError = val.Data.(bool) + db.statsData.Lock() + db.EnqueueOnWriteError = val.Data.(bool) + db.statsData.Unlock() case "exit": log.Infof("invoked Force Exit") //need to flush all data diff --git a/pkg/webui/apirt-output.go b/pkg/webui/apirt-output.go index cfefec45..03425150 100644 --- a/pkg/webui/apirt-output.go +++ b/pkg/webui/apirt-output.go @@ -29,13 +29,12 @@ func NewAPIRtOutput(m *macaron.Macaron) error { func RTOutputBufferAction(ctx *Context) { id := ctx.Params(":id") action := ctx.Params(":action") - log.Infof("activating runtime on device %s", id) + log.Infof("apply action: %s on output %s runtime", action, id) out, err := agent.GetOutput(id) if err != nil { ctx.JSON(404, err.Error()) return } - log.Infof("activating runtime on device %s", id) out.Action(action) ctx.JSON(200, "OK") } @@ -49,14 +48,13 @@ func RTGetOutputInfo(ctx *Context) { ctx.JSON(404, err.Error()) return } - log.Infof("get runtime data from id %s", id) ctx.RawAsJSON(200, json) //get only one device info } else { - devstats := agent.GetOutputStats() - ctx.JSON(200, &devstats) + outputs := agent.GetOutputStats() + ctx.JSON(200, &outputs) } return } diff --git a/src/common/elapsedseconds.pipe.ts b/src/common/elapsedseconds.pipe.ts index 3985f441..fc29dca2 100644 --- a/src/common/elapsedseconds.pipe.ts +++ b/src/common/elapsedseconds.pipe.ts @@ -3,11 +3,11 @@ import {Pipe, PipeTransform} from '@angular/core'; @Pipe({ name: 'elapsedseconds' }) export class ElapsedSecondsPipe implements PipeTransform { transform(value: any, ...args: string[]): string { - if (typeof args === 'undefined' || args.length !== 1) { + if (typeof args === 'undefined' || args.length < 1) { throw new Error('ElapsedSecondsPipe: missing required decimals'); } - return this.toSeconds(value, args[0] ,0); + return this.toSeconds(value, args[0], args[1], 0); } //from kbn.js in grafana project toFixed(value , decimals ) { @@ -44,9 +44,13 @@ export class ElapsedSecondsPipe implements PipeTransform { } } - toSeconds(size , decimals, scaledDecimals) { + toSeconds(size , decimals, units = "seconds", scaledDecimals) { if (size === null) { return ""; } + if (units == "ns") { + size = size * 1.e-9 + } + // Less than 1 µs, devide in ns if (Math.abs(size) < 0.000001) { return this.toFixedScaled(size * 1.e9, decimals, scaledDecimals - decimals, -9, " ns"); diff --git a/src/common/ng-table/components/table/ng-table.component.ts b/src/common/ng-table/components/table/ng-table.component.ts index 51e79770..36b8d727 100755 --- a/src/common/ng-table/components/table/ng-table.component.ts +++ b/src/common/ng-table/components/table/ng-table.component.ts @@ -31,6 +31,7 @@ export class NgTableComponent { @Input() public showCustom: boolean = false; @Input() public showConnection: boolean = false; @Input() public showStatus: boolean = false; + @Input() public showOutput: boolean = false; @Input() public editMode: boolean = false; @Input() public exportType: string; @Input() public extraActions: Array; @@ -112,6 +113,13 @@ export class NgTableComponent { let test = new ElapsedSecondsPipe().transform(html,'3'); html = test.toString(); } + if (transform === "elapsednanoseconds") { + let test = new ElapsedSecondsPipe().transform(html,'3','ns'); + html = test.toString(); + } + if (transform === "percent") { + html = html.toString() + " %" + } if (typeof html === 'object') { var test: any = '