Skip to content

Commit

Permalink
Merge pull request #9 from indix/rsyslog_multi_file_support
Browse files Browse the repository at this point in the history
Template changes to support multiple files through rsyslog and fixed …
  • Loading branch information
ashwanthkumar authored Apr 3, 2017
2 parents 950840a + e6a2574 commit 3c01112
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 48 deletions.
28 changes: 13 additions & 15 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions mesos/client.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package mesos

import "errors"
import (
"errors"
"github.com/parnurzeal/gorequest"
)

type Mesos interface {
SlaveState(slaveHost string) (SlaveState, error)
}

type MesosClient struct {
type mesosClient struct {
Request *gorequest.SuperAgent
}

func NewMesosClient() Mesos {
client := new(mesosClient)
client.Request = gorequest.New()
return client
}

func combineErrors(errs []error) error {
Expand Down
8 changes: 3 additions & 5 deletions mesos/slaves.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package mesos
import (
"encoding/json"
"time"

"github.com/parnurzeal/gorequest"
)

type SlaveState struct {
Expand Down Expand Up @@ -36,9 +34,9 @@ type Executor struct {
Source string `json:"source"`
}

func (m *MesosClient) SlaveState(slaveHost string) (SlaveState, error) {
request := gorequest.New()
response, body, errs := request.

func (m *mesosClient) SlaveState(slaveHost string) (SlaveState, error) {
response, body, errs := m.Request.
Get(slaveHost).
Timeout(10 * time.Minute).
End()
Expand Down
9 changes: 5 additions & 4 deletions rsyslog-logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ const RsyslogTemplate = `
# Created via marathon-logger,
# PLEASE DON'T EDIT THIS FILE MANUALLY
# Name - {{ .App }}
# File - {{ .FileName }}
# File - {{ .FileNames }}
######################################
module(load="imfile")
{{range $fileName := .FileNames}}
input(type="imfile"
File="{{ .WorkDir }}/{{ .FileName }}"
Tag="{{ .CleanAppName }} {{.TaskID}}"
File="{{ $.WorkDir }}/{{$fileName}}"
Tag="{{$.CleanAppName}} {{$.TaskID}} {{$fileName}}"
Severity="info")
{{end}}
`

// Rsyslog backend implementation
Expand Down
51 changes: 48 additions & 3 deletions rsyslog-logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,68 @@ func TestRenderRsyslogTemplate(t *testing.T) {
Labels: label,
TaskID: "abcdefghij",
CWD: "/foo/bar",
FileName: "test_file_name.txt",
FileNames: []string{"test_file_name.txt"},
}

expected := `
######################################
# Created via marathon-logger,
# PLEASE DON'T EDIT THIS FILE MANUALLY
# Name - /test.aayush.http
# File - test_file_name.txt
# File - [test_file_name.txt]
######################################
module(load="imfile")
input(type="imfile"
File="/foo/bar/abcdefghij/test_file_name.txt"
Tag="test.aayush.http abcdefghij"
Tag="test.aayush.http abcdefghij test_file_name.txt"
Severity="info")
`
template, err := rsyslog.render(taskInfo)
assert.NoError(t, err)
assert.Equal(t, expected, template)
}

func TestRenderRsyslogTemplateForMultipleFiles(t *testing.T) {

hostname, err := os.Hostname()
var rsyslog = Rsyslog{
WorkDir: "/foo/bar",
}
label := map[string]string{
"logs.enabled": "true",
}
taskInfo := TaskInfo{
App: "/test.aayush.http",
Hostname: hostname,
Labels: label,
TaskID: "abcdefghij",
CWD: "/foo/bar",
FileNames: []string{"test_file_name1.txt","test_file_name2.txt"},
}

expected := `
######################################
# Created via marathon-logger,
# PLEASE DON'T EDIT THIS FILE MANUALLY
# Name - /test.aayush.http
# File - [test_file_name1.txt test_file_name2.txt]
######################################
module(load="imfile")
input(type="imfile"
File="/foo/bar/abcdefghij/test_file_name1.txt"
Tag="test.aayush.http abcdefghij test_file_name1.txt"
Severity="info")
input(type="imfile"
File="/foo/bar/abcdefghij/test_file_name2.txt"
Tag="test.aayush.http abcdefghij test_file_name2.txt"
Severity="info")
`
template, err := rsyslog.render(taskInfo)
assert.NoError(t, err)
Expand Down
36 changes: 17 additions & 19 deletions task-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
const LogFilesToMonitor = "logs.files"

type TaskInfo struct {
App string
Labels map[string]string
TaskID string
Hostname string
CWD string // Current working directory of the task in the slave
FileName string // Actual file name to that we need monitor for logs
WorkDir string // WorkDir location of marathon-logger where we setup Symlink
App string
Labels map[string]string
TaskID string
Hostname string
CWD string // Current working directory of the task in the slave
FileNames []string // Actual file name to that we need monitor for logs
WorkDir string // WorkDir location of marathon-logger where we setup Symlink
}

// CleanAppName cleans the app-name string for `/` characters
Expand Down Expand Up @@ -54,7 +54,7 @@ func (t *TaskManager) Start() {
t.AddLogs = make(chan TaskInfo)
t.RemoveLogs = make(chan string)
t.KnownTasks = make(map[string]time.Time)
t.Client = &mesos.MesosClient{}
t.Client = mesos.NewMesosClient()
go t.run()
fmt.Println("Task Manager Started.")
fmt.Printf("Task Manager - Task's MaxHeartBeatInterval is %v\n", t.MaxTasksHeartBeatInterval)
Expand Down Expand Up @@ -101,18 +101,16 @@ func (t *TaskManager) run() {
if executor != nil {
logFiles := strings.Split(maps.GetString(task.Labels, LogFilesToMonitor, "stdout"), ",")
t.KnownTasks[task.TaskID] = time.Now()
for _, file := range logFiles {
taskInfo := TaskInfo{
App: task.App,
Hostname: task.Hostname,
Labels: task.Labels,
TaskID: task.TaskID,
CWD: executor.Directory,
FileName: file,
}
// fmt.Printf("%v\n", taskInfo)
t.AddLogs <- taskInfo
taskInfo := TaskInfo{
App: task.App,
Hostname: task.Hostname,
Labels: task.Labels,
TaskID: task.TaskID,
CWD: executor.Directory,
FileNames: logFiles,
}
// fmt.Printf("%v\n", taskInfo)
t.AddLogs <- taskInfo
} else {
log.Printf("[WARN] Couldn't find the executor that spun up the task %s", task.TaskID)
}
Expand Down

0 comments on commit 3c01112

Please sign in to comment.