This repository has been archived by the owner on Jul 25, 2018. It is now read-only.

Commit

Merge pull request #19 from rcoh/master
Fix more race conditions, cleanup code
rcoh authored Jan 13, 2017
2 parents 03c4cf5 + 9de2b36 commit 5da93f0
Showing 9 changed files with 52 additions and 48 deletions.
9 changes: 4 additions & 5 deletions README.md
@@ -4,7 +4,7 @@
`Sumoshell` is a collection of utilities, written in Go, for analyzing log files. `grep` can't tell that some log lines span multiple individual lines. Parsing out fields is cumbersome. Aggregating is basically impossible, and there is no good way to view the results. In Sumoshell, each individual command acts as a phase in a pipeline to get the answer you want. Sumoshell brings a lot of the functionality of [Sumo Logic](https://www.sumologic.com) to the command line.

Commands should start with
`sumo search [filter]` which will transform logs into the json format `sumoshell` uses. Commands should end with `render` `render-basic` or `graph` which render the output to the terminal. Each operator is a stand-alone binary allowing them to be easily composed.
`sumo search [filter]` which will transform logs into the json format `sumoshell` uses. Commands should end with `render` or `graph` which render the output to the terminal. Each operator is a stand-alone binary allowing them to be easily composed.

## Installation
[OSX and Linux binaries are provided for sumoshell](https://github.com/SumoLogic/sumoshell/releases). Simply extract the archive and place the binaries on your path.
@@ -20,7 +20,7 @@ go install ./...
## Usage
Like [SumoLogic](https://www.sumologic.com), sumoshell enables you to pass log data through a series of transformations to get your final result. Pipelines start with a source (`tail`, `cat`, etc.) followed by the `sumo` operator. An example pipeline might be:

```tail -f logfile | sumo search "ERROR" | sumo parse "thread=*]" | sumo count thread | render-basic```
```tail -f logfile | sumo search "ERROR" | sumo parse "thread=*]" | sumo count thread | render```

This would produce a count of log messages matching `ERROR` by thread. In the basic renderer, the output would look like:
```
@@ -40,9 +40,8 @@ _Id _count thread

After using the `sumo` operator, the output will be in JSON. To re-render the output in a human-readable form, `|` the results of your query into one of the `render` operators.

1. `render-basic`: Capable of rendering aggregate and non-aggregate data. Mimics curses style CLIs by calculating the terminal height and printing new lines to the end to keep your text aligned. Add `nowraw` to drop the raw data when an aggregate isn't present.
2. `render`: Curses based renderer for rendering tabular data.
3. `graph`: Curses based renderer for rendering tabular data as a bar chart.
1. `render`: Capable of rendering aggregate and non-aggregate data. Add `nowraw` to drop the raw data when an aggregate isn't present. Aggregates are updated in place using terminal escape sequences.
2. `graph`: Curses-based renderer that draws tabular data as a bar chart.


### Parsing Data
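The composability described in the README comes from every operator being a stand-alone binary that consumes newline-delimited JSON on stdin and emits it on stdout. A minimal sketch of such a stage is below; it is illustrative only, does not use sumoshell's actual `util` helpers, and the `_raw` field name is an assumption about the intermediate format.

```go
// operator_sketch.go: a hypothetical stand-alone pipeline stage.
// It reads one JSON object per line from stdin, keeps only objects
// whose assumed "_raw" field contains a substring, and re-emits them.
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"strings"
)

func main() {
	needle := ""
	if len(os.Args) > 1 {
		needle = os.Args[1]
	}
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		var msg map[string]interface{}
		if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
			continue // skip lines that are not valid JSON
		}
		raw, _ := msg["_raw"].(string)
		if strings.Contains(raw, needle) {
			if out, err := json.Marshal(msg); err == nil {
				fmt.Println(string(out))
			}
		}
	}
}
```

Because it speaks the same line-oriented JSON on both ends, a stage like this could sit anywhere between `sumo` and a renderer in the pipelines shown above.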
3 changes: 2 additions & 1 deletion average/Average.go
@@ -9,6 +9,7 @@ import (
)

const Avg = "_avg"

type average struct {
samples *int
sum *float64
@@ -51,7 +52,7 @@ func Build(args []string) (util.SumoAggOperator, error) {
func (avg average) Flush() {
avg.mu.Lock()
defer avg.mu.Unlock()
if *avg.samples > 0 {
if *avg.samples > 0 {
avg.output(util.CreateStartRelation())
avg.output(util.CreateRelation(currentState(avg)))
avg.output(util.CreateEndRelation())
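The change to `Flush` above is only whitespace, but it sits on top of the pattern the race fixes depend on: `Process` and `Flush` share one mutex, so a timer-driven flush never observes a half-updated aggregate. A minimal sketch of that discipline, using a made-up counter rather than sumoshell's `util` types:

```go
// A hypothetical thread-safe aggregate in the style of average/count:
// value receiver, pointer fields, one mutex guarding all shared state.
package main

import (
	"fmt"
	"sync"
)

type counter struct {
	mu *sync.Mutex
	n  *int
}

func newCounter() counter {
	return counter{mu: &sync.Mutex{}, n: new(int)}
}

// Process is called once per input record.
func (c counter) Process() {
	c.mu.Lock()
	defer c.mu.Unlock()
	*c.n++
}

// Flush is called periodically and once at end-of-stream; holding the
// same lock means it never sees a partially applied update.
func (c counter) Flush() {
	c.mu.Lock()
	defer c.mu.Unlock()
	fmt.Println("count:", *c.n)
}

func main() {
	c := newCounter()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Process()
		}()
	}
	wg.Wait()
	c.Flush() // prints count: 100
}
```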
1 change: 1 addition & 0 deletions count/Count.go
@@ -7,6 +7,7 @@ import (
)

const Count = "_count"

type count struct {
ct *int
base map[string]interface{}
20 changes: 2 additions & 18 deletions group/Grouper.go
@@ -6,7 +6,6 @@ import (
"os"
"strings"
"sync"
"time"
)

type Grouper struct {
@@ -15,7 +14,7 @@ type Grouper struct {
merger Merger
by []string
key string
mu *sync.Mutex
mu *sync.Mutex
}

type builder func(Merger, string, map[string]interface{}) util.SumoAggOperator
@@ -83,8 +82,6 @@ type Merger struct {
func NewMerger(sortCol string) Merger {
mu := &sync.Mutex{}
m := Merger{make(map[int]map[string]interface{}), util.NewJsonWriter(), mu, sortCol}
ticker := time.NewTicker(100 * time.Millisecond)
go flush(m, ticker)
return m
}

@@ -99,10 +96,6 @@ func ExtractId(inp map[string]interface{}) int {
}
}

func WithId(id int) map[string]interface{} {
return map[string]interface{}{Id: id}
}

func (m Merger) Process(inp map[string]interface{}) {
m.mu.Lock()
defer m.mu.Unlock()
@@ -115,9 +108,9 @@ func (m Merger) Write(inp map[string]interface{}) {
m.Process(inp)
}


func (m Merger) Flush() {
m.mu.Lock()
defer m.mu.Unlock()
m.output.Write(util.CreateStartRelationMeta("merger"))
// Output keys sorted by index so the ui is consistent
if m.sortCol == "" {
@@ -138,13 +131,4 @@ func (m Merger) Flush() {
m.output.Write(util.CreateEndRelation())
queryString := strings.Join(os.Args[0:], " ")
m.output.Write(util.CreateMeta(map[string]interface{}{"_queryString": queryString}))
m.mu.Unlock()
}
func flush(m Merger, ticker *time.Ticker) {
for {
select {
case <-ticker.C:
m.Flush()
}
}
}
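Two things change in `Merger`: the constructor no longer starts its own ticker goroutine (periodic flushing is now driven by the caller in `sumo/Main.go`, further down in this commit), and `Flush` now releases its lock with `defer` instead of a manual `Unlock` at the end of the method. The sketch below is a hypothetical reduction of why the deferred form is safer; it is not sumoshell's code.

```go
package main

import "sync"

type flusher struct {
	mu   *sync.Mutex
	rows []string
}

// flushManual unlocks only on the normal path. If an early return or a
// panic in the loop body is ever introduced, the mutex stays held and
// every later caller blocks forever.
func (f *flusher) flushManual() {
	f.mu.Lock()
	if len(f.rows) == 0 {
		return // BUG: returns with the mutex still held
	}
	for range f.rows {
		// write the row somewhere
	}
	f.mu.Unlock()
}

// flushDeferred releases the lock on every path, including panics,
// which is the shape Merger.Flush now uses.
func (f *flusher) flushDeferred() {
	f.mu.Lock()
	defer f.mu.Unlock()
	for range f.rows {
		// write the row somewhere
	}
}

func main() {
	f := &flusher{mu: &sync.Mutex{}}
	f.flushDeferred()
}
```

The removed ticker goroutine also had no way to be stopped, which is exactly the problem the `done` channel added in `sumo/Main.go` addresses.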
32 changes: 23 additions & 9 deletions render/Render.go
@@ -15,7 +15,7 @@ type Renderer struct {
height int64
width int64
rowsPrinted *int64
inRelation *bool
inRelation *bool
}

func main() {
@@ -74,7 +74,7 @@ func (r Renderer) Process(inp map[string]interface{}) {
fmt.Print("\n")
}
if util.IsStartRelation(inp) {
if (*r.inRelation) {
if *r.inRelation {
panic("Already in relation")
}
*r.inRelation = true
@@ -85,22 +85,25 @@ func (r Renderer) Process(inp map[string]interface{}) {
fmt.Printf("\033[1A")
}
*r.rowsPrinted = 0
for _, col := range *r.cols {
width := (*r.colWidths)[col]
spaces := strings.Repeat(" ", width-len(col))
fmt.Printf("%v%s", col, spaces)
if len(*r.cols) > 0 {
r.printHeader()
}
*r.rowsPrinted += 1
fmt.Printf("\n")
}
if util.IsRelation(inp) {
if (!*r.inRelation) {
// If we haven't printed the header yet
if *r.rowsPrinted >= 20 {
return
}
if !*r.inRelation {
panic("Can't get relation before StartRelation")
}
colsWidth := render.Columns([]map[string]interface{}{inp})
colNames := render.ColumnNames(colsWidth)
*r.cols = colNames
*r.colWidths = colsWidth
if *r.rowsPrinted == 0 {
r.printHeader()
}
for _, col := range colNames {
v, _ := inp[col]
vStr := fmt.Sprint(v)
@@ -115,3 +118,14 @@ func (r Renderer) Process(inp map[string]interface{}) {
*r.inRelation = false
}
}

func (r Renderer) printHeader() {
for _, col := range *r.cols {
width := (*r.colWidths)[col]
spaces := strings.Repeat(" ", width-len(col))
fmt.Printf("%v%s", col, spaces)
}
*r.rowsPrinted += 1
fmt.Printf("\n")

}
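The renderer redraws in place by moving the cursor up one line per row printed on the previous pass (the `\033[1A` escape) and then reprinting, and it now caps output at 20 rows and defers the header until column widths are known. A stand-alone sketch of the cursor-movement technique, assuming an ANSI-capable terminal:

```go
// Hypothetical in-place table refresh using ANSI escape codes.
// "\033[1A" moves the cursor up one line; reprinting over the old
// lines makes the output appear to update in place, which is how
// Render.go keeps aggregates on screen while counts change.
package main

import (
	"fmt"
	"time"
)

func main() {
	threads := []string{"alpha", "beta", "gamma"}
	printed := 0
	for tick := 1; tick <= 5; tick++ {
		// Move the cursor back to the first line printed last pass.
		for i := 0; i < printed; i++ {
			fmt.Print("\033[1A")
		}
		printed = 0
		fmt.Printf("%-10s %s\n", "thread", "_count")
		printed++
		for i, t := range threads {
			fmt.Printf("%-10s %d\n", t, (i+1)*tick)
			printed++
		}
		time.Sleep(200 * time.Millisecond)
	}
}
```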
4 changes: 3 additions & 1 deletion search/Main.go
@@ -4,7 +4,9 @@ import "os"
import "bufio"
import "io"
import "github.com/SumoLogic/sumoshell/util"
import "strings"
import (
"strings"
)

func BuildAndConnect(args []string) {
if len(args[1:]) > 0 {
2 changes: 1 addition & 1 deletion sum/Sum.go
@@ -14,7 +14,7 @@ type sum struct {
// DO NOT MODIFY BASE
base map[string]interface{}
output func(map[string]interface{})
mu *sync.Mutex
mu *sync.Mutex
}

const Sum = "_sum"
11 changes: 8 additions & 3 deletions sumo/Main.go
@@ -12,6 +12,7 @@ import (
"github.com/SumoLogic/sumoshell/util"
"os"
"time"
// "sync"
)

type Builder func([]string) (util.SumoOperator, error)
@@ -30,7 +31,6 @@ var aggOperators = map[string]AggBuilder{
}

func main() {

args := os.Args
if len(args) == 1 {
fmt.Println("Arguments expected")
@@ -66,8 +66,11 @@ func connectAggOperator(selector string, args []string) bool {
fmt.Println(err)
} else {
ticker := time.NewTicker(100 * time.Millisecond)
go flush(aggOperator, ticker)
done := make(chan bool)
go flush(aggOperator, ticker, done)
util.ConnectToStdIn(aggOperator)
ticker.Stop()
done <- true
// Flush when the stream completes to ensure all data is accounted for
aggOperator.Flush()
}
@@ -91,11 +94,13 @@ func handleErrorOrWire(operator util.SumoOperator, err error) {
}
}

func flush(aggOp util.SumoAggOperator, ticker *time.Ticker) {
func flush(aggOp util.SumoAggOperator, ticker *time.Ticker, done chan bool) {
for {
select {
case <-ticker.C:
aggOp.Flush()
case <-done:
return
}
}
}
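This is the heart of the race fix: the periodic flush goroutine is now stopped (via `ticker.Stop()` and a send on `done`) before the final end-of-stream `Flush()`, so a timer-driven flush can no longer interleave with it. A self-contained reduction of the pattern, with a stand-in aggregate instead of sumoshell's operators:

```go
// Hypothetical reduction of the ticker plus done-channel shutdown used
// in connectAggOperator: the background goroutine flushes on every tick
// and exits when it receives on done; only then does the caller run the
// final flush, so no two flushes can race.
package main

import (
	"fmt"
	"sync"
	"time"
)

type agg struct {
	mu *sync.Mutex
	n  *int
}

func (a agg) Flush() {
	a.mu.Lock()
	defer a.mu.Unlock()
	fmt.Println("flush:", *a.n)
}

func flush(a agg, ticker *time.Ticker, done chan bool) {
	for {
		select {
		case <-ticker.C:
			a.Flush()
		case <-done:
			return
		}
	}
}

func main() {
	a := agg{mu: &sync.Mutex{}, n: new(int)}
	ticker := time.NewTicker(100 * time.Millisecond)
	done := make(chan bool)
	go flush(a, ticker, done)

	// Stand-in for util.ConnectToStdIn: consume an input stream.
	for i := 0; i < 5; i++ {
		a.mu.Lock()
		*a.n++
		a.mu.Unlock()
		time.Sleep(60 * time.Millisecond)
	}

	ticker.Stop()
	done <- true // blocks until the goroutine takes the done case and stops flushing
	a.Flush()    // final flush; no tick-driven flush can run after this point
}
```

The unbuffered `done` channel doubles as synchronization: the send cannot complete while the goroutine is still inside a tick-driven flush, so the end-of-stream flush really is the last one the renderer sees.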
18 changes: 8 additions & 10 deletions util/Raw.go
@@ -9,9 +9,9 @@ import "os"
import "bufio"
import "log"
import (
"sort"
"strconv"
"sync"
"sort"
)

type RawInputHandler struct {
@@ -151,17 +151,12 @@ func NewRawInputHandler(inp io.Writer) *RawInputHandler {
return &RawInputHandler{inp, []rune{}}
}

func NewRawInputHandlerStdout() *RawInputHandler {
return NewRawInputHandler(os.Stdout)
}

func (handler *RawInputHandler) Flush() {
m := make(map[string]interface{})
m[Raw] = string(handler.buff)
m[Type] = Plus
handler.buff = []rune{}
b, err := json.Marshal(m)
//fmt.Printf(b)
if err != nil {
fmt.Printf("ERROR!", err)
} else {
@@ -172,7 +167,7 @@ func (handler *RawInputHandler) Flush() {

type JsonWriter struct {
writer io.Writer
mu *sync.Mutex
mu *sync.Mutex
}

func NewJsonWriter() *JsonWriter {
@@ -198,12 +193,14 @@ func CoerceNumber(v interface{}) (float64, error) {

type Datum []map[string]interface{}
type By func(p1, p2 *map[string]interface{}) bool
func (a datumSorter) Len() int { return len(a.data) }
func (a datumSorter) Swap(i, j int) { a.data[i], a.data[j] = a.data[j], a.data[i] }

func (a datumSorter) Len() int { return len(a.data) }
func (a datumSorter) Swap(i, j int) { a.data[i], a.data[j] = a.data[j], a.data[i] }

// datumSorter joins a comparison function and the slice of data to be sorted.
type datumSorter struct {
data Datum
by func(p1, p2 map[string]interface{}) bool // Closure used in the Less method.
by func(p1, p2 map[string]interface{}) bool // Closure used in the Less method.
}

func (a datumSorter) Less(i, j int) bool {
@@ -216,6 +213,7 @@ func SortByField(field string, data Datum) {
v2, err2 := CoerceNumber(p2[field])

if err1 != nil || err2 != nil {
fmt.Print(data)
panic(err1)
}

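The cleanup in `util/Raw.go` keeps the standard "sort by closure" shape: a small struct pairs the slice with a comparison function and implements `sort.Interface`, and `SortByField` builds that closure from a field name. A generic sketch of the same pattern, using a made-up `record` type rather than sumoshell's `Datum`:

```go
// Hypothetical illustration of the sort-by-closure pattern used by
// datumSorter: the comparison is a closure stored next to the slice,
// and sort.Sort drives it through the sort.Interface methods.
package main

import (
	"fmt"
	"sort"
)

type record map[string]interface{}

type recordSorter struct {
	data []record
	less func(a, b record) bool
}

func (s recordSorter) Len() int           { return len(s.data) }
func (s recordSorter) Swap(i, j int)      { s.data[i], s.data[j] = s.data[j], s.data[i] }
func (s recordSorter) Less(i, j int) bool { return s.less(s.data[i], s.data[j]) }

// sortByNumericField sorts records by a numeric field, assuming the
// field holds a float64 (JSON numbers decode to float64 in Go).
func sortByNumericField(field string, data []record) {
	sort.Sort(recordSorter{
		data: data,
		less: func(a, b record) bool {
			av, _ := a[field].(float64)
			bv, _ := b[field].(float64)
			return av < bv
		},
	})
}

func main() {
	data := []record{
		{"_count": 3.0, "thread": "c"},
		{"_count": 1.0, "thread": "a"},
		{"_count": 2.0, "thread": "b"},
	}
	sortByNumericField("_count", data)
	fmt.Println(data) // sorted ascending by _count
}
```

On newer Go versions the same thing can be written with `sort.Slice`; the explicit sorter struct was the idiomatic form when this code was written.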
