From 6446bf0036c9d9f5c451ca44b78ceb6a20dcfd13 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Sun, 25 Dec 2016 19:02:36 -0500 Subject: [PATCH 1/3] Fix race conditions and improve renderer --- count/Count.go | 8 ++++++-- group/Grouper.go | 13 ++++++++++--- render/Render.go | 30 ++++++++++++++++++++---------- util/Raw.go | 14 ++++++++++++-- 4 files changed, 48 insertions(+), 17 deletions(-) diff --git a/count/Count.go b/count/Count.go index b840267..cd41cc0 100644 --- a/count/Count.go +++ b/count/Count.go @@ -3,22 +3,24 @@ package count import ( "github.com/SumoLogic/sumoshell/group" "github.com/SumoLogic/sumoshell/util" + "sync" ) type count struct { ct *int base map[string]interface{} output func(map[string]interface{}) + mu *sync.Mutex } func makeCount() count { ct := 0 - return count{&ct, make(map[string]interface{}), util.NewJsonWriter().Write} + return count{&ct, make(map[string]interface{}), util.NewJsonWriter().Write, &sync.Mutex{}} } func aggregateCount(output grouper.Merger, key string, base map[string]interface{}) util.SumoAggOperator { ct := 0 - count := count{&ct, base, output.Write} + count := count{&ct, base, output.Write, &sync.Mutex{}} return count } @@ -34,6 +36,8 @@ func Build(args []string) (util.SumoAggOperator, error) { } func (ct count) Flush() { + ct.mu.Lock() + defer ct.mu.Unlock() ct.output(util.CreateStartRelation()) ct.output(util.CreateRelation(currentState(ct))) ct.output(util.CreateEndRelation()) diff --git a/group/Grouper.go b/group/Grouper.go index 1c032bc..f5face1 100644 --- a/group/Grouper.go +++ b/group/Grouper.go @@ -15,6 +15,7 @@ type Grouper struct { merger Merger by []string key string + mu *sync.Mutex } type builder func(Merger, string, map[string]interface{}) util.SumoAggOperator @@ -27,10 +28,14 @@ func NewAggregate( ctor := func(base map[string]interface{}) util.SumoAggOperator { return constructor(merger, key, base) } - return Grouper{ctor, make(map[string]util.SumoAggOperator), merger, by, key} + + mu := &sync.Mutex{} + return Grouper{ctor, make(map[string]util.SumoAggOperator), merger, by, key, mu} } func (g Grouper) Flush() { + g.mu.Lock() + defer g.mu.Unlock() for _, v := range g.operators { v.Flush() } @@ -38,6 +43,8 @@ func (g Grouper) Flush() { } func (g Grouper) Process(inp map[string]interface{}) { + g.mu.Lock() + defer g.mu.Unlock() var keys []string for _, key := range g.by { val, ok := inp[key] @@ -107,16 +114,16 @@ func (m Merger) Write(inp map[string]interface{}) { } func (m Merger) Flush() { - m.output.Write(util.CreateStartRelation()) m.mu.Lock() + m.output.Write(util.CreateStartRelationMeta("merger")) // Output keys sorted by index so the ui is consistent for i := 0; i < len(m.aggregate); i++ { m.output.Write(util.CreateRelation(m.aggregate[i])) } - m.mu.Unlock() m.output.Write(util.CreateEndRelation()) queryString := strings.Join(os.Args[0:], " ") m.output.Write(util.CreateMeta(map[string]interface{}{"_queryString": queryString})) + m.mu.Unlock() } func flush(m Merger, ticker *time.Ticker) { for { diff --git a/render/Render.go b/render/Render.go index 4df465c..c5ffa95 100644 --- a/render/Render.go +++ b/render/Render.go @@ -15,6 +15,7 @@ type Renderer struct { height int64 width int64 rowsPrinted *int64 + inRelation *bool } func main() { @@ -30,11 +31,12 @@ func main() { width, _ := strconv.ParseInt(wstr, 10, 64) rows := int64(0) + inRelation := false if len(os.Args) == 2 && os.Args[1] == "noraw" { - util.ConnectToStdIn(Renderer{false, &m, &cols, height, width, &rows}) + util.ConnectToStdIn(Renderer{false, &m, &cols, height, width, &rows, &inRelation}) } else { - util.ConnectToStdIn(Renderer{true, &m, &cols, height, width, &rows}) + util.ConnectToStdIn(Renderer{true, &m, &cols, height, width, &rows, &inRelation}) } } @@ -72,16 +74,29 @@ func (r Renderer) Process(inp map[string]interface{}) { fmt.Print("\n") } if util.IsStartRelation(inp) { - fmt.Println("======") + if (*r.inRelation) { + panic("Already in relation") + } + *r.inRelation = true + for i := int64(0); i < *r.rowsPrinted; i++ { + // Clear the row + fmt.Printf("\033[2K") + // Go up one row + fmt.Printf("\033[1A") + } + *r.rowsPrinted = 0 for _, col := range *r.cols { width := (*r.colWidths)[col] spaces := strings.Repeat(" ", width-len(col)) fmt.Printf("%v%s", col, spaces) } - *r.rowsPrinted += 2 + *r.rowsPrinted += 1 fmt.Printf("\n") } if util.IsRelation(inp) { + if (!*r.inRelation) { + panic("Can't get relation before StartRelation") + } colsWidth := render.Columns([]map[string]interface{}{inp}) colNames := render.ColumnNames(colsWidth) *r.cols = colNames @@ -97,11 +112,6 @@ func (r Renderer) Process(inp map[string]interface{}) { } if util.IsEndRelation(inp) { - if *r.rowsPrinted < r.height-1 { - for i := *r.rowsPrinted; i < r.height; i++ { - fmt.Printf("\n") - } - } - *r.rowsPrinted = 0 + *r.inRelation = false } } diff --git a/util/Raw.go b/util/Raw.go index 20d2a72..64810ec 100644 --- a/util/Raw.go +++ b/util/Raw.go @@ -8,7 +8,10 @@ import "encoding/json" import "os" import "bufio" import "log" -import "strconv" +import ( + "strconv" + "sync" +) type RawInputHandler struct { output io.Writer @@ -103,6 +106,10 @@ func CreateStartRelation() map[string]interface{} { return map[string]interface{}{Type: StartRelation} } +func CreateStartRelationMeta(origin string) map[string]interface{} { + return map[string]interface{}{Type: StartRelation, Meta: origin} +} + func CreateEndRelation() map[string]interface{} { return map[string]interface{}{Type: EndRelation} } @@ -164,10 +171,11 @@ func (handler *RawInputHandler) Flush() { type JsonWriter struct { writer io.Writer + mu *sync.Mutex } func NewJsonWriter() *JsonWriter { - return &JsonWriter{os.Stdout} + return &JsonWriter{os.Stdout, &sync.Mutex{}} } func (writer *JsonWriter) Write(inp map[string]interface{}) { @@ -176,8 +184,10 @@ func (writer *JsonWriter) Write(inp map[string]interface{}) { if err != nil { fmt.Printf("ERROR!", err) } else { + writer.mu.Lock() writer.writer.Write(jsonBytes) writer.writer.Write([]byte{'\n'}) + writer.mu.Unlock() } } From f16250c42d4b35252e215ec075bbd59282419b69 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Sun, 25 Dec 2016 21:48:02 -0500 Subject: [PATCH 2/3] Fix count, sum, avg bugs --- average/Average.go | 18 +++++++++++------- example/test2 | 10 ++++++++++ sum/Sum.go | 11 ++++++++--- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/average/Average.go b/average/Average.go index 9af4a0f..a1fffa9 100644 --- a/average/Average.go +++ b/average/Average.go @@ -5,6 +5,7 @@ import ( "github.com/SumoLogic/sumoshell/group" "github.com/SumoLogic/sumoshell/util" "strconv" + "sync" ) type average struct { @@ -13,22 +14,22 @@ type average struct { key string // DO NOT MODIFY BASE base map[string]interface{} - ready *bool + mu *sync.Mutex output func(map[string]interface{}) } func makeAverage(key string) average { samp := 0 v := 0.0 - ready := false - return average{&samp, &v, key, make(map[string]interface{}), &ready, util.NewJsonWriter().Write} + mu := sync.Mutex{} + return average{&samp, &v, key, make(map[string]interface{}), &mu, util.NewJsonWriter().Write} } func aggregateAverage(output grouper.Merger, key string, base map[string]interface{}) util.SumoAggOperator { samp := 0 v := 0.0 - ready := false - return average{&samp, &v, key, base, &ready, output.Write} + mu := sync.Mutex{} + return average{&samp, &v, key, base, &mu, output.Write} } func Build(args []string) (util.SumoAggOperator, error) { @@ -47,7 +48,9 @@ func Build(args []string) (util.SumoAggOperator, error) { } func (avg average) Flush() { - if *avg.ready { + avg.mu.Lock() + defer avg.mu.Unlock() + if *avg.samples > 0 { avg.output(util.CreateStartRelation()) avg.output(util.CreateRelation(currentState(avg))) avg.output(util.CreateEndRelation()) @@ -64,13 +67,14 @@ func currentState(a average) map[string]interface{} { } func (a average) Process(inp map[string]interface{}) { + a.mu.Lock() + defer a.mu.Unlock() v, keyInMap := inp[a.key] if keyInMap { f, keyIsNumber := strconv.ParseFloat(fmt.Sprint(v), 64) if keyIsNumber == nil { *a.samples += 1 *a.sum += f - *a.ready = true } } } diff --git a/example/test2 b/example/test2 index 7054f73..d6a2666 100644 --- a/example/test2 +++ b/example/test2 @@ -1,4 +1,14 @@ [a=1] +[a=1] [a=2] [a=3] [a=4] +[a=4] +[a=4] +[a=4] +[a=3] +[a=3] +[a=3] +[a=4] +[a=4] +[a=5] diff --git a/sum/Sum.go b/sum/Sum.go index 1dd1cb1..c0dc6fe 100644 --- a/sum/Sum.go +++ b/sum/Sum.go @@ -5,6 +5,7 @@ import ( "github.com/SumoLogic/sumoshell/group" "github.com/SumoLogic/sumoshell/util" "strconv" + "sync" ) type sum struct { @@ -13,17 +14,17 @@ type sum struct { // DO NOT MODIFY BASE base map[string]interface{} output func(map[string]interface{}) + mu *sync.Mutex } func makeSum(key string) sum { sumV := 0.0 - return sum{&sumV, key, make(map[string]interface{}), util.NewJsonWriter().Write} - + return sum{&sumV, key, make(map[string]interface{}), util.NewJsonWriter().Write, &sync.Mutex{}} } func aggregateSum(output grouper.Merger, key string, base map[string]interface{}) util.SumoAggOperator { sumV := 0.0 - return sum{&sumV, key, base, output.Write} + return sum{&sumV, key, base, output.Write, &sync.Mutex{}} } func Build(args []string) (util.SumoAggOperator, error) { @@ -42,6 +43,8 @@ func Build(args []string) (util.SumoAggOperator, error) { } func (sumOp sum) Flush() { + sumOp.mu.Lock() + defer sumOp.mu.Unlock() sumOp.output(util.CreateStartRelation()) sumOp.output(util.CreateRelation(currentState(sumOp))) sumOp.output(util.CreateEndRelation()) @@ -57,6 +60,8 @@ func currentState(s sum) map[string]interface{} { } func (s sum) Process(inp map[string]interface{}) { + s.mu.Lock() + defer s.mu.Unlock() v, keyInMap := inp[s.key] if keyInMap { f, keyIsNumber := strconv.ParseFloat(fmt.Sprint(v), 64) From a6a585233b3378673830493bd6e369b7c00e6198 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Mon, 26 Dec 2016 16:57:51 -0500 Subject: [PATCH 3/3] Add support for auto-sorting based on key fields --- average/Average.go | 5 +++-- count/Count.go | 7 +++++-- group/Grouper.go | 27 +++++++++++++++++++++------ sum/Sum.go | 6 ++++-- util/Raw.go | 29 +++++++++++++++++++++++++++++ 5 files changed, 62 insertions(+), 12 deletions(-) diff --git a/average/Average.go b/average/Average.go index a1fffa9..590d33a 100644 --- a/average/Average.go +++ b/average/Average.go @@ -8,6 +8,7 @@ import ( "sync" ) +const Avg = "_avg" type average struct { samples *int sum *float64 @@ -41,7 +42,7 @@ func Build(args []string) (util.SumoAggOperator, error) { key := args[0] //_ := args[1] keyFields := args[2:] - return grouper.NewAggregate(aggregateAverage, keyFields, key), nil + return grouper.NewAggregate(aggregateAverage, keyFields, key, Avg), nil } else { return nil, util.ParseError("Need a argument to average (`avg keyname`)") } @@ -62,7 +63,7 @@ func currentState(a average) map[string]interface{} { for key, val := range a.base { ret[key] = val } - ret["_avg"] = *a.sum / float64(*a.samples) + ret[Avg] = *a.sum / float64(*a.samples) return ret } diff --git a/count/Count.go b/count/Count.go index cd41cc0..e4dbd6e 100644 --- a/count/Count.go +++ b/count/Count.go @@ -6,6 +6,7 @@ import ( "sync" ) +const Count = "_count" type count struct { ct *int base map[string]interface{} @@ -31,7 +32,7 @@ func Build(args []string) (util.SumoAggOperator, error) { } else { keyFields := args // key is meaningless for count - return grouper.NewAggregate(aggregateCount, keyFields, ""), nil + return grouper.NewAggregate(aggregateCount, keyFields, "", Count), nil } } @@ -48,11 +49,13 @@ func currentState(ct count) map[string]interface{} { for key, val := range ct.base { ret[key] = val } - ret["_count"] = *ct.ct + ret[Count] = *ct.ct return ret } func (ct count) Process(inp map[string]interface{}) { + ct.mu.Lock() + defer ct.mu.Unlock() if util.IsPlus(inp) { *ct.ct += 1 } diff --git a/group/Grouper.go b/group/Grouper.go index f5face1..665c769 100644 --- a/group/Grouper.go +++ b/group/Grouper.go @@ -23,8 +23,9 @@ type builder func(Merger, string, map[string]interface{}) util.SumoAggOperator func NewAggregate( constructor builder, by []string, - key string) Grouper { - merger := NewMerger() + key string, + sortCol string) Grouper { + merger := NewMerger(sortCol) ctor := func(base map[string]interface{}) util.SumoAggOperator { return constructor(merger, key, base) } @@ -76,11 +77,12 @@ type Merger struct { aggregate map[int]map[string]interface{} output *util.JsonWriter mu *sync.Mutex + sortCol string } -func NewMerger() Merger { +func NewMerger(sortCol string) Merger { mu := &sync.Mutex{} - m := Merger{make(map[int]map[string]interface{}), util.NewJsonWriter(), mu} + m := Merger{make(map[int]map[string]interface{}), util.NewJsonWriter(), mu, sortCol} ticker := time.NewTicker(100 * time.Millisecond) go flush(m, ticker) return m @@ -113,12 +115,25 @@ func (m Merger) Write(inp map[string]interface{}) { m.Process(inp) } + func (m Merger) Flush() { m.mu.Lock() m.output.Write(util.CreateStartRelationMeta("merger")) // Output keys sorted by index so the ui is consistent - for i := 0; i < len(m.aggregate); i++ { - m.output.Write(util.CreateRelation(m.aggregate[i])) + if m.sortCol == "" { + for i := 0; i < len(m.aggregate); i++ { + m.output.Write(util.CreateRelation(m.aggregate[i])) + } + } else { + aggs := make([]map[string]interface{}, 0, len(m.aggregate)) + for i := 0; i < len(m.aggregate); i++ { + aggs = append(aggs, m.aggregate[i]) + } + util.SortByField(m.sortCol, aggs) + for i := 0; i < len(aggs); i++ { + m.output.Write(util.CreateRelation(aggs[i])) + + } } m.output.Write(util.CreateEndRelation()) queryString := strings.Join(os.Args[0:], " ") diff --git a/sum/Sum.go b/sum/Sum.go index c0dc6fe..1ad14ca 100644 --- a/sum/Sum.go +++ b/sum/Sum.go @@ -17,6 +17,8 @@ type sum struct { mu *sync.Mutex } +const Sum = "_sum" + func makeSum(key string) sum { sumV := 0.0 return sum{&sumV, key, make(map[string]interface{}), util.NewJsonWriter().Write, &sync.Mutex{}} @@ -36,7 +38,7 @@ func Build(args []string) (util.SumoAggOperator, error) { key := args[0] //_ := relevantArgs[1] keyFields := args[2:] - return grouper.NewAggregate(aggregateSum, keyFields, key), nil + return grouper.NewAggregate(aggregateSum, keyFields, key, Sum), nil } else { return nil, util.ParseError("Need a argument to average (`sum field`)") } @@ -55,7 +57,7 @@ func currentState(s sum) map[string]interface{} { for key, val := range s.base { ret[key] = val } - ret["_sum"] = *s.sum + ret[Sum] = *s.sum return ret } diff --git a/util/Raw.go b/util/Raw.go index 64810ec..abba581 100644 --- a/util/Raw.go +++ b/util/Raw.go @@ -11,6 +11,7 @@ import "log" import ( "strconv" "sync" + "sort" ) type RawInputHandler struct { @@ -194,3 +195,31 @@ func (writer *JsonWriter) Write(inp map[string]interface{}) { func CoerceNumber(v interface{}) (float64, error) { return strconv.ParseFloat(fmt.Sprint(v), 64) } + +type Datum []map[string]interface{} +type By func(p1, p2 *map[string]interface{}) bool +func (a datumSorter) Len() int { return len(a.data) } +func (a datumSorter) Swap(i, j int) { a.data[i], a.data[j] = a.data[j], a.data[i] } +// planetSorter joins a By function and a slice of Planets to be sorted. +type datumSorter struct { + data Datum + by func(p1, p2 map[string]interface{}) bool // Closure used in the Less method. +} + +func (a datumSorter) Less(i, j int) bool { + return a.by(a.data[i], a.data[j]) +} + +func SortByField(field string, data Datum) { + by := func(p1, p2 map[string]interface{}) bool { + v1, err1 := CoerceNumber(p1[field]) + v2, err2 := CoerceNumber(p2[field]) + + if err1 != nil || err2 != nil { + panic(err1) + } + + return v1 < v2 + } + sort.Sort(sort.Reverse(&datumSorter{data, by})) +}