Skip to content

Commit

Permalink
feat: Add more events for starting and stopping of worker
Browse files Browse the repository at this point in the history
* When worker is started, then it sends event "started"
* It also sends event "stopped", when it is terminated and
  proces cannot handle message anymore
  • Loading branch information
jirihnidek committed Nov 13, 2023
1 parent 522e6ae commit 8b6314f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 12 deletions.
1 change: 1 addition & 0 deletions internal/work/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (d *Dispatcher) Connect() error {
go func() {
for s := range signals {
log.Tracef("received signal: %#v", s)

dest, err := d.senderName(dbus.Sender(s.Sender))
if err != nil {
log.Errorf("cannot find sender: %v", err)
Expand Down
39 changes: 27 additions & 12 deletions ipc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ var InterfaceDispatcher string
type DispatcherEvent uint

const (
// Emitted when the dispatcher receives the "disconnect" command.
// DispatcherEventReceivedDisconnect is emitted when the dispatcher receives
// the "disconnect" command.
DispatcherEventReceivedDisconnect DispatcherEvent = 1

// Emitted when the transport unexpected disconnects from the network.
// DispatcherEventUnexpectedDisconnect is emitted when the transport unexpected
// disconnects from the network.
DispatcherEventUnexpectedDisconnect DispatcherEvent = 2

// Emitted when the transport reconnects to the network.
// DispatcherEventConnectionRestored is emitted when the transport reconnects
// to the network.
DispatcherEventConnectionRestored DispatcherEvent = 3
)

Expand All @@ -29,28 +32,40 @@ type WorkerEventName uint

const (

// Emitted when the worker "accepts" a dispatched message and begins
// "working".
// WorkerEventNameBegin is emitted when the worker "accepts"
// a dispatched message and begins "working".
WorkerEventNameBegin WorkerEventName = 1

// Emitted when the worker finishes "working".
// WorkerEventNameEnd is emitted when the worker finishes "working".
WorkerEventNameEnd WorkerEventName = 2

// Emitted when the worker wishes to continue to announce it is
// working.
// WorkerEventNameWorking is emitted when the worker wishes
// to continue to announce it is working.
WorkerEventNameWorking WorkerEventName = 3

// WorkerEventNameStarted is emitted when finished starting
// process and it can
WorkerEventNameStarted WorkerEventName = 5

// WorkerEventNameStopped is emitted when worker is stopped,
// and it cannot process any message
WorkerEventNameStopped WorkerEventName = 6
)

func (e WorkerEventName) String() string {
switch e {
case 1:
case WorkerEventNameBegin:
return "BEGIN"
case 2:
case WorkerEventNameEnd:
return "END"
case 3:
case WorkerEventNameWorking:
return "WORKING"
case WorkerEventNameStarted:
return "STARTED"
case WorkerEventNameStopped:
return "STOPPED"
}
return ""
return "UNKNOWN"
}

type WorkerEvent struct {
Expand Down
22 changes: 22 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ func (w *Worker) Connect(quit <-chan os.Signal) error {
return fmt.Errorf("request name failed")
}

// Emit a started event
err = w.EmitEvent(
ipc.WorkerEventNameStarted,
"",
"",
map[string]string{},
)
if err != nil {
return fmt.Errorf("cannot emit event: %w", err)
}

signals := make(chan *dbus.Signal)
w.conn.Signal(signals)
go func() {
Expand All @@ -142,6 +153,17 @@ func (w *Worker) Connect(quit <-chan os.Signal) error {

<-quit

// Emit a stopped event
err = w.EmitEvent(
ipc.WorkerEventNameStopped,
"",
"",
map[string]string{},
)
if err != nil {
return fmt.Errorf("cannot emit event: %w", err)
}

return nil
}

Expand Down

0 comments on commit 8b6314f

Please sign in to comment.