Skip to content

Commit

Permalink
feat: add a message journal
Browse files Browse the repository at this point in the history
This feature adds a message journal that tracks dispatched messages
and worker events. This will be useful for diagnostic purposes.
The message journal can be filtered by the message id or the worker name
and worker messages can be truncated by a user specified length.

Signed-off-by: Jason Jerome <[email protected]>
  • Loading branch information
DuckBoss authored and jirihnidek committed Jan 2, 2024
1 parent eb18f3b commit 9ee405f
Show file tree
Hide file tree
Showing 14 changed files with 853 additions and 17 deletions.
84 changes: 84 additions & 0 deletions cmd/yggctl/actions.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"text/tabwriter"
"text/template"

"github.com/godbus/dbus/v5"
"github.com/google/uuid"
Expand Down Expand Up @@ -87,6 +89,88 @@ func generateControlMessageAction(ctx *cli.Context) error {
return nil
}

func messageJournalAction(ctx *cli.Context) error {
var conn *dbus.Conn
var err error

if os.Getenv("DBUS_SESSION_BUS_ADDRESS") != "" {
conn, err = dbus.ConnectSessionBus()
if err != nil {
return cli.Exit(fmt.Errorf("cannot connect to session bus: %w", err), 1)
}
} else {
conn, err = dbus.ConnectSystemBus()
if err != nil {
return cli.Exit(fmt.Errorf("cannot connect to system bus: %w", err), 1)
}
}

var journalEntries []map[string]string
args := []interface{}{
ctx.String("message-id"),
ctx.String("worker"),
ctx.String("since"),
ctx.String("until"),
ctx.Bool("persistent"),
}
obj := conn.Object("com.redhat.Yggdrasil1", "/com/redhat/Yggdrasil1")
if err := obj.Call("com.redhat.Yggdrasil1.MessageJournal", dbus.Flags(0), args...).Store(&journalEntries); err != nil {
return cli.Exit(fmt.Errorf("cannot list message journal entries: %v", err), 1)
}

switch ctx.String("format") {
case "json":
data, err := json.Marshal(journalEntries)
if err != nil {
return cli.Exit(fmt.Errorf("cannot marshal journal entries: %v", err), 1)
}
fmt.Println(string(data))
case "text":
journalTextTemplate := template.New("journalTextTemplate")
journalTextTemplate, err := journalTextTemplate.Parse(
"{{range .}}{{.message_id}} : {{.sent}} : {{.worker_name}} : " +
"{{if .response_to}}{{.response_to}}{{else}}...{{end}} : " +
"{{if .worker_event}}{{.worker_event}}{{else}}...{{end}} : " +
"{{if .worker_data}}{{.worker_data}}{{else}}...{{end}}\n{{end}}",
)
if err != nil {
return fmt.Errorf("cannot parse journal text template parameters: %w", err)
}
var compiledTextTemplate bytes.Buffer
textCompileErr := journalTextTemplate.Execute(&compiledTextTemplate, journalEntries)
if textCompileErr != nil {
return fmt.Errorf("cannot compile journal text template: %w", textCompileErr)
}
fmt.Println(compiledTextTemplate.String())
case "table":
writer := tabwriter.NewWriter(os.Stdout, 4, 4, 2, ' ', 0)
fmt.Fprint(
writer,
"MESSAGE #\tMESSAGE ID\tSENT\tWORKER NAME\tRESPONSE TO\tWORKER EVENT\tWORKER DATA\n",
)
for idx, entry := range journalEntries {
fmt.Fprintf(
writer,
"%d\t%s\t%s\t%s\t%s\t%v\t%s\n",
idx,
entry["message_id"],
entry["sent"],
entry["worker_name"],
entry["response_to"],
entry["worker_event"],
entry["worker_data"],
)
}
if err := writer.Flush(); err != nil {
return cli.Exit(fmt.Errorf("unable to flush tab writer: %v", err), 1)
}
default:
return cli.Exit(fmt.Errorf("unknown format type: %v", ctx.String("format")), 1)
}

return nil
}

func workersAction(c *cli.Context) error {
conn, err := connectBus()
if err != nil {
Expand Down
46 changes: 46 additions & 0 deletions cmd/yggctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,52 @@ from stdin.`,
},
Action: dispatchAction,
},
{
Name: "message-journal",
Usage: "Show events emitted by workers",
UsageText: "yggctl message-journal",
Description: "The message-journal command retrieves a list of events emitted by workers",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "persistent",
Aliases: []string{"p"},
Usage: "Include events emitted by workers from persistent storage",
Required: false,
},
&cli.StringFlag{
Name: "worker",
Aliases: []string{"w"},
Usage: "Include only events emitted by `WORKER`",
Required: false,
},
&cli.StringFlag{
Name: "message-id",
Aliases: []string{"m"},
Usage: "Include only events emitted for message ID `STRING`",
Required: false,
},
&cli.StringFlag{
Name: "since",
Aliases: []string{"s"},
Usage: "Include only events emitted after `TIMESTAMP` (YYYY-MM-DD HH:MM:SS)",
Required: false,
},
&cli.StringFlag{
Name: "until",
Aliases: []string{"u"},
Usage: "Include only events emitted before `TIMESTAMP` (YYYY-MM-DD HH:MM:SS)",
Required: false,
},
&cli.StringFlag{
Name: "format",
Aliases: []string{"f"},
Usage: "Print output in `FORMAT` (json, table or text)",
Value: "table",
Required: false,
},
},
Action: messageJournalAction,
},
{
Name: "listen",
Usage: "Listen to worker event output",
Expand Down
27 changes: 27 additions & 0 deletions cmd/yggd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
internaldbus "github.com/redhatinsights/yggdrasil/dbus"
"github.com/redhatinsights/yggdrasil/internal/config"
"github.com/redhatinsights/yggdrasil/internal/constants"
"github.com/redhatinsights/yggdrasil/internal/messagejournal"
"github.com/redhatinsights/yggdrasil/internal/tags"
"github.com/redhatinsights/yggdrasil/internal/transport"
"github.com/redhatinsights/yggdrasil/internal/work"
Expand Down Expand Up @@ -200,6 +201,32 @@ func (c *Client) ListWorkers() (map[string]map[string]string, *dbus.Error) {
return c.dispatcher.FlattenDispatchers(), nil
}

// MessageJournal implements the com.redhat.Yggdrasil1.MessageJournal method.
func (c *Client) MessageJournal(
messageID string,
worker string,
since string,
until string,
persistent bool,
) ([]map[string]string, *dbus.Error) {
filter := messagejournal.Filter{
Persistent: persistent,
MessageID: messageID,
Worker: worker,
Since: since,
Until: until,
}

if c.dispatcher.MessageJournal == nil {
return nil, dbus.MakeFailedError(fmt.Errorf("message journal is not enabled"))
}
journal, err := c.dispatcher.MessageJournal.GetEntries(filter)
if err != nil {
return nil, dbus.MakeFailedError(err)
}
return journal, nil
}

// Dispatch implements the com.redhat.Yggdrasil1.Dispatch method.
func (c *Client) Dispatch(
directive string,
Expand Down
38 changes: 38 additions & 0 deletions cmd/yggd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/redhatinsights/yggdrasil/internal/config"
"github.com/redhatinsights/yggdrasil/internal/constants"
"github.com/redhatinsights/yggdrasil/internal/http"
"github.com/redhatinsights/yggdrasil/internal/messagejournal"
"github.com/redhatinsights/yggdrasil/internal/transport"
"github.com/redhatinsights/yggdrasil/internal/work"

Expand Down Expand Up @@ -68,6 +69,7 @@ func setupDefaultConfig(c *cli.Context) {
MQTTReconnectDelay: c.Duration(config.FlagNameMQTTReconnectDelay),
MQTTConnectTimeout: c.Duration(config.FlagNameMQTTConnectTimeout),
MQTTPublishTimeout: c.Duration(config.FlagNameMQTTPublishTimeout),
MessageJournal: c.String(config.FlagNameMessageJournal),
}
}

Expand Down Expand Up @@ -171,6 +173,29 @@ func setupClient(
return client, transporter, nil
}

// setupMessageJournal tries to set up a message journal database to track
// worker emitted events at the provided path.
func setupMessageJournal(client *Client) error {
messageJournalPath := config.DefaultConfig.MessageJournal
if messageJournalPath != "" {
journalFilePath := filepath.Clean(messageJournalPath)
journal, err := messagejournal.Open(journalFilePath)
if err != nil {
return cli.Exit(
fmt.Errorf(
"cannot initialize message journal database at '%v': %w",
journalFilePath,
err,
),
1,
)
}
client.dispatcher.MessageJournal = journal
log.Debugf("initialized message journal at '%v'", journalFilePath)
}
return nil
}

// setupTLS tries to set up new TLS config and HTTP client
func setupTLS() (*http.Client, *tls.Config, error) {
tlsConfig, err := config.DefaultConfig.CreateTLSConfig()
Expand Down Expand Up @@ -351,6 +376,15 @@ func mainAction(c *cli.Context) error {
return cli.Exit(fmt.Errorf("cannot setup client: %w", err), 1)
}

// Create a message journal if a journal path is provided
// or if it is enabled in the config.
// This message journal contains a persistent database that
// tracks events emitted by workers across yggd sessions.
err = setupMessageJournal(client)
if err != nil {
return err
}

// Create watcher for certificate changes
TlSEvents, err := config.DefaultConfig.WatcherUpdate()
if err != nil {
Expand Down Expand Up @@ -529,6 +563,10 @@ func main() {
Value: 30 * time.Second,
Hidden: true,
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: config.FlagNameMessageJournal,
Usage: "Record worker events and messages in the database `FILE`",
}),
}

app.EnableBashCompletion = true
Expand Down
31 changes: 31 additions & 0 deletions dbus/com.redhat.Yggdrasil1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,37 @@
<arg type="a{sa{ss}}" name="workers" direction="out" />
</method>

<!--
MessageJournal:
@message_id: Filter journal entries to only contain entries with this message id value.
@worker: Filter journal entries to only contain entries with this worker name.
@since: Filter journal entries to only contain entries from this date time value.
@until: Filter journal entries to only contain entries up to this date time value.
@persistent: Filter journal entries to only contain entries from the persistent collection of
journal entries spanning multiple sessions. By default, journal entries are collected from
the active session.
@messages: Array of dictionary objects matching the input filter parameters.
Each element in the array is a dictionary with key/value pairs as follows:
"message_id": <string value>,
"sent": <string value>,
"worker_name": <string value>,
"response_to": <string value>,
"worker_event": <string value>,
"worker_message": <string value>,
Returns the set of worker messages dispatched to workers and
events emitted by workers.
-->
<method name="MessageJournal">
<arg type="s" name="message_id" direction="in" />
<arg type="s" name="worker" direction="in" />
<arg type="s" name="since" direction="in" />
<arg type="s" name="until" direction="in" />
<arg type="b" name="persistent" direction="in" />
<arg type="aa{ss}" name="messages" direction="out" />
</method>

<!--
WorkerEvent:
@worker: Name of the worker emitting the event.
Expand Down
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/godbus/dbus/v5 v5.1.0
github.com/golang-migrate/migrate/v4 v4.16.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.5.0
github.com/mattn/go-sqlite3 v1.14.16
github.com/pelletier/go-toml v1.9.5
github.com/rjeczalik/notify v0.9.3
github.com/urfave/cli/v2 v2.27.1
Expand All @@ -19,10 +21,13 @@ require (
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.13.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
22 changes: 18 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,30 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang-migrate/migrate/v4 v4.16.2 h1:8coYbMKUyInrFk1lfGfRovTLAW7PhWp8qQDT2iKfuoA=
github.com/golang-migrate/migrate/v4 v4.16.2/go.mod h1:pfcJX4nPHaVdc5nmdCikFBWtm+UBpiZjRNNsyBbp0/o=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -30,16 +41,19 @@ github.com/rjeczalik/notify v0.9.3/go.mod h1:gF3zSOrafR9DQEWSE8TjfI9NkooDxbyT4Ug
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/urfave/cli/v2 v2.27.1 h1:8xSQ6szndafKVRmfyeUMxkNUJQMjL1F2zmsZ+qHpfho=
github.com/urfave/cli/v2 v2.27.1/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
Expand Down
Loading

0 comments on commit 9ee405f

Please sign in to comment.