Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add more events for starting and stopping of worker #187

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/work/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (d *Dispatcher) Connect() error {
go func() {
for s := range signals {
log.Tracef("received signal: %#v", s)

dest, err := d.senderName(dbus.Sender(s.Sender))
if err != nil {
log.Errorf("cannot find sender: %v", err)
Expand Down
10 changes: 10 additions & 0 deletions ipc/com.redhat.Yggdrasil1.Worker1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@
3 = WORKING
Emitted when the worker wishes to continue to announce it is
working.

4 = STARTED
Emitted when the worker is started, and it is ready
to handle received messages. The message_id and response_id are
empty.

5 = STOPPED
Emitted when the worker is stopped, and it is not able
to handle received messages anymore. The message_id and
response_id are also empty.
-->
<signal name="Event">
<arg type="u" name="name" />
Expand Down
40 changes: 28 additions & 12 deletions ipc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ipc

import (
_ "embed"
"fmt"
)

//go:embed com.redhat.Yggdrasil1.Dispatcher1.xml
Expand All @@ -12,13 +13,16 @@ var InterfaceDispatcher string
type DispatcherEvent uint

const (
// Emitted when the dispatcher receives the "disconnect" command.
// DispatcherEventReceivedDisconnect is emitted when the dispatcher receives
// the "disconnect" command.
DispatcherEventReceivedDisconnect DispatcherEvent = 1

// Emitted when the transport unexpected disconnects from the network.
// DispatcherEventUnexpectedDisconnect is emitted when the transport unexpected
// disconnects from the network.
DispatcherEventUnexpectedDisconnect DispatcherEvent = 2

// Emitted when the transport reconnects to the network.
// DispatcherEventConnectionRestored is emitted when the transport reconnects
// to the network.
DispatcherEventConnectionRestored DispatcherEvent = 3
)

Expand All @@ -29,28 +33,40 @@ type WorkerEventName uint

const (

// Emitted when the worker "accepts" a dispatched message and begins
// "working".
// WorkerEventNameBegin is emitted when the worker "accepts"
// a dispatched message and begins "working".
WorkerEventNameBegin WorkerEventName = 1

// Emitted when the worker finishes "working".
// WorkerEventNameEnd is emitted when the worker finishes "working".
WorkerEventNameEnd WorkerEventName = 2

// Emitted when the worker wishes to continue to announce it is
// working.
// WorkerEventNameWorking is emitted when the worker wishes
// to continue to announce it is working.
WorkerEventNameWorking WorkerEventName = 3

// WorkerEventNameStarted is emitted when worker finished starting
// process, and it can start process received messages.
WorkerEventNameStarted WorkerEventName = 4

// WorkerEventNameStopped is emitted when worker is stopped,
// and it cannot process any message.
WorkerEventNameStopped WorkerEventName = 5
)

func (e WorkerEventName) String() string {
switch e {
case 1:
case WorkerEventNameBegin:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow. We were not doing this before? Good find and refactor!

return "BEGIN"
case 2:
case WorkerEventNameEnd:
return "END"
case 3:
case WorkerEventNameWorking:
return "WORKING"
case WorkerEventNameStarted:
return "STARTED"
case WorkerEventNameStopped:
return "STOPPED"
}
return ""
return fmt.Sprintf("UNKNOWN (value: %d)", e)
}

type WorkerEvent struct {
Expand Down
22 changes: 22 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ func (w *Worker) Connect(quit <-chan os.Signal) error {
return fmt.Errorf("request name failed")
}

// Emit a started event
err = w.EmitEvent(
ipc.WorkerEventNameStarted,
"",
"",
map[string]string{},
)
if err != nil {
return fmt.Errorf("cannot emit event: %w", err)
}

signals := make(chan *dbus.Signal)
w.conn.Signal(signals)
go func() {
Expand All @@ -142,6 +153,17 @@ func (w *Worker) Connect(quit <-chan os.Signal) error {

<-quit

// Emit a stopped event
err = w.EmitEvent(
ipc.WorkerEventNameStopped,
"",
"",
map[string]string{},
)
if err != nil {
return fmt.Errorf("cannot emit event: %w", err)
}

return nil
}

Expand Down