Skip to content
This repository has been archived by the owner on Jul 25, 2018. It is now read-only.

Commit

Permalink
Merge pull request #14 from rcoh/master
Browse files Browse the repository at this point in the history
Fix bugs, improve rendering
  • Loading branch information
rcoh authored Dec 26, 2016
2 parents 60ec6ab + a6a5852 commit 03c4cf5
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 39 deletions.
23 changes: 14 additions & 9 deletions average/Average.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,32 @@ import (
"github.com/SumoLogic/sumoshell/group"
"github.com/SumoLogic/sumoshell/util"
"strconv"
"sync"
)

const Avg = "_avg"
type average struct {
samples *int
sum *float64
key string
// DO NOT MODIFY BASE
base map[string]interface{}
ready *bool
mu *sync.Mutex
output func(map[string]interface{})
}

func makeAverage(key string) average {
samp := 0
v := 0.0
ready := false
return average{&samp, &v, key, make(map[string]interface{}), &ready, util.NewJsonWriter().Write}
mu := sync.Mutex{}
return average{&samp, &v, key, make(map[string]interface{}), &mu, util.NewJsonWriter().Write}
}

func aggregateAverage(output grouper.Merger, key string, base map[string]interface{}) util.SumoAggOperator {
samp := 0
v := 0.0
ready := false
return average{&samp, &v, key, base, &ready, output.Write}
mu := sync.Mutex{}
return average{&samp, &v, key, base, &mu, output.Write}
}

func Build(args []string) (util.SumoAggOperator, error) {
Expand All @@ -40,14 +42,16 @@ func Build(args []string) (util.SumoAggOperator, error) {
key := args[0]
//_ := args[1]
keyFields := args[2:]
return grouper.NewAggregate(aggregateAverage, keyFields, key), nil
return grouper.NewAggregate(aggregateAverage, keyFields, key, Avg), nil
} else {
return nil, util.ParseError("Need a argument to average (`avg keyname`)")
}
}

func (avg average) Flush() {
if *avg.ready {
avg.mu.Lock()
defer avg.mu.Unlock()
if *avg.samples > 0 {
avg.output(util.CreateStartRelation())
avg.output(util.CreateRelation(currentState(avg)))
avg.output(util.CreateEndRelation())
Expand All @@ -59,18 +63,19 @@ func currentState(a average) map[string]interface{} {
for key, val := range a.base {
ret[key] = val
}
ret["_avg"] = *a.sum / float64(*a.samples)
ret[Avg] = *a.sum / float64(*a.samples)
return ret
}

func (a average) Process(inp map[string]interface{}) {
a.mu.Lock()
defer a.mu.Unlock()
v, keyInMap := inp[a.key]
if keyInMap {
f, keyIsNumber := strconv.ParseFloat(fmt.Sprint(v), 64)
if keyIsNumber == nil {
*a.samples += 1
*a.sum += f
*a.ready = true
}
}
}
15 changes: 11 additions & 4 deletions count/Count.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@ package count
import (
"github.com/SumoLogic/sumoshell/group"
"github.com/SumoLogic/sumoshell/util"
"sync"
)

const Count = "_count"
type count struct {
ct *int
base map[string]interface{}
output func(map[string]interface{})
mu *sync.Mutex
}

func makeCount() count {
ct := 0
return count{&ct, make(map[string]interface{}), util.NewJsonWriter().Write}
return count{&ct, make(map[string]interface{}), util.NewJsonWriter().Write, &sync.Mutex{}}
}

func aggregateCount(output grouper.Merger, key string, base map[string]interface{}) util.SumoAggOperator {
ct := 0
count := count{&ct, base, output.Write}
count := count{&ct, base, output.Write, &sync.Mutex{}}
return count
}

Expand All @@ -29,11 +32,13 @@ func Build(args []string) (util.SumoAggOperator, error) {
} else {
keyFields := args
// key is meaningless for count
return grouper.NewAggregate(aggregateCount, keyFields, ""), nil
return grouper.NewAggregate(aggregateCount, keyFields, "", Count), nil
}
}

func (ct count) Flush() {
ct.mu.Lock()
defer ct.mu.Unlock()
ct.output(util.CreateStartRelation())
ct.output(util.CreateRelation(currentState(ct)))
ct.output(util.CreateEndRelation())
Expand All @@ -44,11 +49,13 @@ func currentState(ct count) map[string]interface{} {
for key, val := range ct.base {
ret[key] = val
}
ret["_count"] = *ct.ct
ret[Count] = *ct.ct
return ret
}

func (ct count) Process(inp map[string]interface{}) {
ct.mu.Lock()
defer ct.mu.Unlock()
if util.IsPlus(inp) {
*ct.ct += 1
}
Expand Down
10 changes: 10 additions & 0 deletions example/test2
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
[a=1]
[a=1]
[a=2]
[a=3]
[a=4]
[a=4]
[a=4]
[a=4]
[a=3]
[a=3]
[a=3]
[a=4]
[a=4]
[a=5]
40 changes: 31 additions & 9 deletions group/Grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,37 @@ type Grouper struct {
merger Merger
by []string
key string
mu *sync.Mutex
}

type builder func(Merger, string, map[string]interface{}) util.SumoAggOperator

func NewAggregate(
constructor builder,
by []string,
key string) Grouper {
merger := NewMerger()
key string,
sortCol string) Grouper {
merger := NewMerger(sortCol)
ctor := func(base map[string]interface{}) util.SumoAggOperator {
return constructor(merger, key, base)
}
return Grouper{ctor, make(map[string]util.SumoAggOperator), merger, by, key}

mu := &sync.Mutex{}
return Grouper{ctor, make(map[string]util.SumoAggOperator), merger, by, key, mu}
}

func (g Grouper) Flush() {
g.mu.Lock()
defer g.mu.Unlock()
for _, v := range g.operators {
v.Flush()
}
g.merger.Flush()
}

func (g Grouper) Process(inp map[string]interface{}) {
g.mu.Lock()
defer g.mu.Unlock()
var keys []string
for _, key := range g.by {
val, ok := inp[key]
Expand Down Expand Up @@ -69,11 +77,12 @@ type Merger struct {
aggregate map[int]map[string]interface{}
output *util.JsonWriter
mu *sync.Mutex
sortCol string
}

func NewMerger() Merger {
func NewMerger(sortCol string) Merger {
mu := &sync.Mutex{}
m := Merger{make(map[int]map[string]interface{}), util.NewJsonWriter(), mu}
m := Merger{make(map[int]map[string]interface{}), util.NewJsonWriter(), mu, sortCol}
ticker := time.NewTicker(100 * time.Millisecond)
go flush(m, ticker)
return m
Expand Down Expand Up @@ -106,17 +115,30 @@ func (m Merger) Write(inp map[string]interface{}) {
m.Process(inp)
}


func (m Merger) Flush() {
m.output.Write(util.CreateStartRelation())
m.mu.Lock()
m.output.Write(util.CreateStartRelationMeta("merger"))
// Output keys sorted by index so the ui is consistent
for i := 0; i < len(m.aggregate); i++ {
m.output.Write(util.CreateRelation(m.aggregate[i]))
if m.sortCol == "" {
for i := 0; i < len(m.aggregate); i++ {
m.output.Write(util.CreateRelation(m.aggregate[i]))
}
} else {
aggs := make([]map[string]interface{}, 0, len(m.aggregate))
for i := 0; i < len(m.aggregate); i++ {
aggs = append(aggs, m.aggregate[i])
}
util.SortByField(m.sortCol, aggs)
for i := 0; i < len(aggs); i++ {
m.output.Write(util.CreateRelation(aggs[i]))

}
}
m.mu.Unlock()
m.output.Write(util.CreateEndRelation())
queryString := strings.Join(os.Args[0:], " ")
m.output.Write(util.CreateMeta(map[string]interface{}{"_queryString": queryString}))
m.mu.Unlock()
}
func flush(m Merger, ticker *time.Ticker) {
for {
Expand Down
30 changes: 20 additions & 10 deletions render/Render.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Renderer struct {
height int64
width int64
rowsPrinted *int64
inRelation *bool
}

func main() {
Expand All @@ -30,11 +31,12 @@ func main() {
width, _ := strconv.ParseInt(wstr, 10, 64)

rows := int64(0)
inRelation := false

if len(os.Args) == 2 && os.Args[1] == "noraw" {
util.ConnectToStdIn(Renderer{false, &m, &cols, height, width, &rows})
util.ConnectToStdIn(Renderer{false, &m, &cols, height, width, &rows, &inRelation})
} else {
util.ConnectToStdIn(Renderer{true, &m, &cols, height, width, &rows})
util.ConnectToStdIn(Renderer{true, &m, &cols, height, width, &rows, &inRelation})
}
}

Expand Down Expand Up @@ -72,16 +74,29 @@ func (r Renderer) Process(inp map[string]interface{}) {
fmt.Print("\n")
}
if util.IsStartRelation(inp) {
fmt.Println("======")
if (*r.inRelation) {
panic("Already in relation")
}
*r.inRelation = true
for i := int64(0); i < *r.rowsPrinted; i++ {
// Clear the row
fmt.Printf("\033[2K")
// Go up one row
fmt.Printf("\033[1A")
}
*r.rowsPrinted = 0
for _, col := range *r.cols {
width := (*r.colWidths)[col]
spaces := strings.Repeat(" ", width-len(col))
fmt.Printf("%v%s", col, spaces)
}
*r.rowsPrinted += 2
*r.rowsPrinted += 1
fmt.Printf("\n")
}
if util.IsRelation(inp) {
if (!*r.inRelation) {
panic("Can't get relation before StartRelation")
}
colsWidth := render.Columns([]map[string]interface{}{inp})
colNames := render.ColumnNames(colsWidth)
*r.cols = colNames
Expand All @@ -97,11 +112,6 @@ func (r Renderer) Process(inp map[string]interface{}) {
}

if util.IsEndRelation(inp) {
if *r.rowsPrinted < r.height-1 {
for i := *r.rowsPrinted; i < r.height; i++ {
fmt.Printf("\n")
}
}
*r.rowsPrinted = 0
*r.inRelation = false
}
}
17 changes: 12 additions & 5 deletions sum/Sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/SumoLogic/sumoshell/group"
"github.com/SumoLogic/sumoshell/util"
"strconv"
"sync"
)

type sum struct {
Expand All @@ -13,17 +14,19 @@ type sum struct {
// DO NOT MODIFY BASE
base map[string]interface{}
output func(map[string]interface{})
mu *sync.Mutex
}

const Sum = "_sum"

func makeSum(key string) sum {
sumV := 0.0
return sum{&sumV, key, make(map[string]interface{}), util.NewJsonWriter().Write}

return sum{&sumV, key, make(map[string]interface{}), util.NewJsonWriter().Write, &sync.Mutex{}}
}

func aggregateSum(output grouper.Merger, key string, base map[string]interface{}) util.SumoAggOperator {
sumV := 0.0
return sum{&sumV, key, base, output.Write}
return sum{&sumV, key, base, output.Write, &sync.Mutex{}}
}

func Build(args []string) (util.SumoAggOperator, error) {
Expand All @@ -35,13 +38,15 @@ func Build(args []string) (util.SumoAggOperator, error) {
key := args[0]
//_ := relevantArgs[1]
keyFields := args[2:]
return grouper.NewAggregate(aggregateSum, keyFields, key), nil
return grouper.NewAggregate(aggregateSum, keyFields, key, Sum), nil
} else {
return nil, util.ParseError("Need a argument to average (`sum field`)")
}
}

func (sumOp sum) Flush() {
sumOp.mu.Lock()
defer sumOp.mu.Unlock()
sumOp.output(util.CreateStartRelation())
sumOp.output(util.CreateRelation(currentState(sumOp)))
sumOp.output(util.CreateEndRelation())
Expand All @@ -52,11 +57,13 @@ func currentState(s sum) map[string]interface{} {
for key, val := range s.base {
ret[key] = val
}
ret["_sum"] = *s.sum
ret[Sum] = *s.sum
return ret
}

func (s sum) Process(inp map[string]interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
v, keyInMap := inp[s.key]
if keyInMap {
f, keyIsNumber := strconv.ParseFloat(fmt.Sprint(v), 64)
Expand Down
Loading

0 comments on commit 03c4cf5

Please sign in to comment.