Skip to content
This repository has been archived by the owner on Jul 25, 2018. It is now read-only.

Commit

Permalink
Fix EOL parsing. Closes #7
Browse files Browse the repository at this point in the history
  • Loading branch information
Russell Cohen committed Oct 26, 2015
2 parents b438b94 + c5c17dd commit c87d5f9
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 39 deletions.
28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
`Sumoshell` is collection of utilities to improve analyzing log files written in Go. `grep` can't tell that some log lines span multiple individual lines. Parsing out fields is cumbersome. Aggregating is basically impossible, and there is no good way to view the results. In Sumoshell, each individual command acts as a phase in a pipeline to get the answer you want. Sumoshell brings a lot of the functionality of [Sumo Logic](www.sumologic.com) to the command line.

Commands should start with
`sumo` which will transform logs into the json format `sumoshell` uses. Commands should end with `render` `render-basic` or `graph` which render the output to the terminal. Each operator is a stand-alone binary allowing them to be easily composed.
`sumo search [filter]` which will transform logs into the json format `sumoshell` uses. Commands should end with `render` `render-basic` or `graph` which render the output to the terminal. Each operator is a stand-alone binary allowing them to be easily composed.

## Installation
Currently [OSX and Linux binaries are provided for sumoshell](https://github.com/SumoLogic/sumoshell/releases). Simply extract the archive and place the binaries on your path.
Expand All @@ -20,17 +20,17 @@ go install ./...
## Usage
Like [SumoLogic](https://www.sumologic.com), sumoshell enables you pass log data through a series of transformations to get your final result. Pipelines start with a source (`tail`, `cat`, etc.) followed by the `sumo` operator. An example pipeline might be:

```tail -f logfile | sumo "ERROR" | ss parse "thread=*]" | ss count thread | render-basic```
```tail -f logfile | sumo search "ERROR" | sumo parse "thread=*]" | sumo count thread | render-basic```

This would produce a count of log messages matching error by thead. In the basic renderer, the output would look like:
This would produce a count of log messages matching `ERROR` by thead. In the basic renderer, the output would look like:
```
_Id _count thread
0 4 C
1 4 A
2 1 B
```
### The `sumo` operator
The sumo operator performs 3 steps:
### The `sumo search` operator
`sumo search` takes an optional filter parameter to allow for basic searching. The sumo operator performs 3 steps:

1. Break a text file into logical log messages. This merges things like stack traces into a single message for easy searching.
2. Allow basic searching.
Expand All @@ -49,14 +49,14 @@ After using the `sumo` operator, the output will be in JSON. To re-render the ou

sumoshell supports a basic parse operator similar to the `parse` operator in `SumoLogic`. Queries take the form:
```
... | ss parse "[pattern=*] pattern2:'*' morePatterns=(*)" as pattern, pattern2, more | ...
... | sumo parse "[pattern=*] pattern2:'*' morePatterns=(*)" as pattern, pattern2, more | ...
```

### Filtering Data

sumoshell supports a filter operator similar to the `where` operator in `SumoLogic`. Queries take the form:
```
... | ss parse "[host=*]" as host | ss filter host = server1
... | sumo parse "[host=*]" as host | sumo filter host = server1
```

This will drop any log lines that don't have `server1` as the host.
Expand All @@ -67,22 +67,22 @@ sumoshell currently supports 3 aggregate operators:

1. `count`. Example queries:
```
... | ss count # number of rows
... | ss count key # number of rows per key
... | ss count key value # number of rows per the cartesian product of (key, value)
... | sumo count # number of rows
... | sumo count key # number of rows per key
... | sumo count key value # number of rows per the cartesian product of (key, value)
```

2. `sumosum` Example queries:
```
... | ss sum k # sum of all k's
... | ss sum v by k # sum of all v's by k
... | sumo sum k # sum of all k's
... | sumo sum v by k # sum of all v's by k
```

3. `average` Example queries:
```
... | ss average k # average of all k's
... | ss average v by k # average of all v's by k
... | sumo average k # average of all k's
... | sumo average v by k # average of all v's by k
```
8 changes: 7 additions & 1 deletion parse/Parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ func findNumExtractions(parseExpression string) int {
}

func regexFromPat(pat string) *regexp.Regexp {
regex := ".*?" + strings.Replace(regexp.QuoteMeta(pat), "\\*", "(.*?)", -1) + ".*"
regex := ".*?" + strings.Replace(regexp.QuoteMeta(pat), "\\*", "(.*?)", -1)
if strings.HasSuffix(pat, "*") {
regex += "$"

} else {
regex += ".*$"
}
return regexp.MustCompile(regex)
}

Expand Down
39 changes: 39 additions & 0 deletions search/Main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package search

import "os"
import "bufio"
import "io"
import "github.com/SumoLogic/sumoshell/util"
import "strings"

func BuildAndConnect(args []string) {
if len(args[1:]) > 0 {
read(args[1])
} else {
read("")
}
}

func read(filterString string) {
r, w := io.Pipe()
handler := util.NewRawInputHandler(w)
go util.ConnectToReader(SumoFilter{filterString, util.NewJsonWriter()}, r)
bio := bufio.NewReader(os.Stdin)
var line, hasMoreInLine, err = bio.ReadLine()
for err != io.EOF || hasMoreInLine {
handler.Process(line)
line, hasMoreInLine, err = bio.ReadLine()
}
handler.Flush()
}

type SumoFilter struct {
param string
output *util.JsonWriter
}

func (filt SumoFilter) Process(inp map[string]interface{}) {
if strings.Contains(util.ExtractRaw(inp), filt.param) {
filt.output.Write(inp)
}
}
Binary file removed ss/ss
Binary file not shown.
108 changes: 84 additions & 24 deletions sumo/Main.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,99 @@
package main

import "os"
import "bufio"
import "io"
import "github.com/SumoLogic/sumoshell/util"
import "strings"
import (
"fmt"
"github.com/SumoLogic/sumoshell/average"
"github.com/SumoLogic/sumoshell/count"
"github.com/SumoLogic/sumoshell/filter"
"github.com/SumoLogic/sumoshell/parse"
"github.com/SumoLogic/sumoshell/search"
"github.com/SumoLogic/sumoshell/sum"
"github.com/SumoLogic/sumoshell/util"
"os"
"time"
)

type Builder func([]string) (util.SumoOperator, error)
type AggBuilder func([]string) (util.SumoAggOperator, error)

var operators = map[string]Builder{
"parse": parse.Build,
"filter": filter.Build,
}

var aggOperators = map[string]AggBuilder{
"count": count.Build,
"average": average.Build,
"sum": sum.Build,
}

func main() {
if len(os.Args[1:]) > 0 {
read(os.Args[1])

args := os.Args
if len(args) == 1 {
fmt.Println("Arguments expected")
} else {
selectingArg := args[1]
actualArgs := os.Args[1:]
nonAggWorked := connectNonAggOperator(selectingArg, actualArgs)
if nonAggWorked {
return
}

aggWorked := connectAggOperator(selectingArg, actualArgs)
if aggWorked {
return
}

if selectingArg == "search" {
search.BuildAndConnect(actualArgs)
return
}
fmt.Println("Operator " + selectingArg + " unrecognized")
}
}

func connectAggOperator(selector string, args []string) bool {
aggBuilder, aggOk := aggOperators[selector]
if !aggOk {
return false
}

aggOperator, err := aggBuilder(args)
if err != nil {
fmt.Println(err)
} else {
read("")
ticker := time.NewTicker(100 * time.Millisecond)
go flush(aggOperator, ticker)
util.ConnectToStdIn(aggOperator)
// Flush when the stream completes to ensure all data is accounted for
aggOperator.Flush()
}
return true
}

func read(filterString string) {
r, w := io.Pipe()
handler := util.NewRawInputHandler(w)
go util.ConnectToReader(SumoFilter{filterString, util.NewJsonWriter()}, r)
bio := bufio.NewReader(os.Stdin)
var line, hasMoreInLine, err = bio.ReadLine()
for err != io.EOF || hasMoreInLine {
handler.Process(line)
line, hasMoreInLine, err = bio.ReadLine()
func connectNonAggOperator(selector string, args []string) bool {
builder, ok := operators[selector]
if ok {
operator, err := builder(args)
handleErrorOrWire(operator, err)
}
handler.Flush()
return ok
}

type SumoFilter struct {
param string
output *util.JsonWriter
func handleErrorOrWire(operator util.SumoOperator, err error) {
if err != nil {
fmt.Println(err)
} else {
util.ConnectToStdIn(operator)
}
}

func (filt SumoFilter) Process(inp map[string]interface{}) {
if strings.Contains(util.ExtractRaw(inp), filt.param) {
filt.output.Write(inp)
func flush(aggOp util.SumoAggOperator, ticker *time.Ticker) {
for {
select {
case <-ticker.C:
aggOp.Flush()
}
}
}

0 comments on commit c87d5f9

Please sign in to comment.