Skip to content

Commit

Permalink
feat: introduce real-time job update notifications via SSE
Browse files Browse the repository at this point in the history
This commit introduces a new feature that enables clients to receive
real-time notifications for job status updates.

Details:

- A new endpoint `/jobs/{id}/status/subscribe` has been added, which
  implements real-time notifications through Server-Sent Events (SSE).
- By subscribing to this endpoint, clients can efficiently keep track of
  job status changes without the need to repeatedly poll the server.
- A client reference implementation has been added to wfxctl.

Signed-off-by: Michael Adler <[email protected]>
  • Loading branch information
michaeladler committed Jun 23, 2023
1 parent 649698b commit 46296f8
Show file tree
Hide file tree
Showing 42 changed files with 2,415 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Real-time job update notifications via SSE (see #11)

### Fixed

- Send HTTP status code 404 when attempting to access the file server while it is disabled
Expand Down
33 changes: 33 additions & 0 deletions api/job_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"fmt"
"net/http"
"testing"
"time"

"github.com/siemens/wfx/generated/model"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/workflow/dau"
"github.com/steinfletcher/apitest"
Expand Down Expand Up @@ -118,3 +120,34 @@ func TestJobStatusUpdate(t *testing.T) {
Assert(jsonpath.Contains(`$.state`, "DOWNLOADING")).
End()
}

func TestJobStatusSubscribe(t *testing.T) {
db := newInMemoryDB(t)
north, south := createNorthAndSouth(t, db)
job := persistJob(t, db)
jobPath := fmt.Sprintf("/api/wfx/v1/jobs/%s/status/subscribe", job.ID)

handlers := []http.Handler{north, south}
for i, name := range allAPIs {
t.Run(name, func(t *testing.T) {
// add ourselves as subscriber as well
ch := status.AddSubscriber(context.Background(), job.ID)
go func() {
_, err := status.Update(context.Background(), db, job.ID, &model.JobStatus{State: "INSTALLING"}, model.EligibleEnumCLIENT)
require.NoError(t, err)
<-ch
time.Sleep(time.Millisecond * 10)
// now its safe to shutdown
status.ShutdownSubscribers()
}()

apitest.New().
Handler(handlers[i]).
Get(jobPath).
Expect(t).
Status(http.StatusOK).
Assert(jsonpath.Equal(`$.data`, `{"definitionHash":"4f53cda18c2baa0c0354bb5f9a3ecbe5ed12ab4d8e11ba873c2f11161202b945","state":"INSTALLING"}`)).
End()
})
}
}
7 changes: 7 additions & 0 deletions api/northbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/internal/producer"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/persistence"
)
Expand Down Expand Up @@ -263,5 +264,11 @@ func NewNorthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return northbound.NewDeleteJobsIDTagsOK().WithPayload(tags)
})

serverAPI.NorthboundGetJobsIDStatusSubscribeHandler = northbound.GetJobsIDStatusSubscribeHandlerFunc(
func(params northbound.GetJobsIDStatusSubscribeParams) middleware.Responder {
eventChan := status.AddSubscriber(params.HTTPRequest.Context(), params.ID)
return producer.SSEResponder(params.ID, eventChan)
})

return serverAPI, nil
}
7 changes: 7 additions & 0 deletions api/southbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/internal/producer"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/persistence"
)
Expand Down Expand Up @@ -176,5 +177,11 @@ func NewSouthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return southbound.NewGetJobsIDTagsOK().WithPayload(tags)
})

serverAPI.SouthboundGetJobsIDStatusSubscribeHandler = southbound.GetJobsIDStatusSubscribeHandlerFunc(
func(params southbound.GetJobsIDStatusSubscribeParams) middleware.Responder {
eventChan := status.AddSubscriber(params.HTTPRequest.Context(), params.ID)
return producer.SSEResponder(params.ID, eventChan)
})

return serverAPI, nil
}
4 changes: 4 additions & 0 deletions cmd/wfx/cmd/root/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/cmd/wfx/metadata"
"github.com/siemens/wfx/internal/config"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/persistence"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -250,6 +251,9 @@ Examples of tasks are installation of firmware or other types of commands issued
}
}

// Shut down status subscribers (server-sent events)
status.ShutdownSubscribers()

// Create a context with a timeout to allow outstanding requests to complete
var timeout time.Duration
k.Read(func(k *koanf.Koanf) {
Expand Down
2 changes: 2 additions & 0 deletions cmd/wfxctl/cmd/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/getstatus"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/gettags"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/query"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/subscribestatus"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatedefinition"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatestatus"
"github.com/spf13/cobra"
Expand All @@ -43,4 +44,5 @@ func init() {
Command.AddCommand(addtags.Command)
Command.AddCommand(deltags.Command)
Command.AddCommand(gettags.Command)
Command.AddCommand(subscribestatus.Command)
}
97 changes: 97 additions & 0 deletions cmd/wfxctl/cmd/job/subscribestatus/subscribe_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package subscribestatus

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"bufio"
"fmt"
"net/http"

"github.com/bcicen/jstream"
"github.com/go-openapi/runtime"
"github.com/go-openapi/runtime/client"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/siemens/wfx/cmd/wfxctl/errutil"
"github.com/siemens/wfx/cmd/wfxctl/flags"
generatedClient "github.com/siemens/wfx/generated/client"
"github.com/siemens/wfx/generated/client/jobs"
)

const (
idFlag = "id"
)

func init() {
f := Command.Flags()
f.String(idFlag, "", "job id")
}

type SSETransport struct {
baseCmd *flags.BaseCmd
}

func (t SSETransport) Submit(op *runtime.ClientOperation) (interface{}, error) {
cfg := t.baseCmd.CreateTransportConfig()
r := client.New(cfg.Host, generatedClient.DefaultBasePath, cfg.Schemes)
req := errutil.Must(r.CreateHttpRequest(op))

// this also works with unix-domain sockets
client := errutil.Must(t.baseCmd.CreateHTTPClient())
// no timeout for this connection, it can be open for a long time
client.Timeout = 0
resp := errutil.Must(client.Do(req))
if !isSuccess(resp.StatusCode) {
return nil, fmt.Errorf("got http status code %d", resp.StatusCode)
}

defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)

decoder := jstream.NewDecoder(reader, 0)
for mv := range decoder.Stream() {
if event, ok := mv.Value.(map[string]any); ok {
fmt.Println(event["data"])
}
}

return jobs.NewGetJobsIDStatusSubscribeOK(), nil
}

func isSuccess(statusCode int) bool {
return statusCode >= http.StatusOK && statusCode < 300
}

var Command = &cobra.Command{
Use: "subscribe-status",
Short: "Subscribe to job update events",
Example: `
wfxctl job subscribe-status --id=1
`,
TraverseChildren: true,
Run: func(cmd *cobra.Command, args []string) {
baseCmd := flags.NewBaseCmd()
httpClient := errutil.Must(baseCmd.CreateHTTPClient())
params := jobs.NewGetJobsIDStatusSubscribeParams().
WithHTTPClient(httpClient).
WithID(flags.Koanf.String(idFlag))

if params.ID == "" {
log.Fatal().Msg("Job ID missing")
}

client := baseCmd.CreateClient()
client.SetTransport(SSETransport{baseCmd: &baseCmd})
_, err := client.Jobs.GetJobsIDStatusSubscribe(params)
if err != nil {
log.Fatal().Err(err).Msg("Failed to subscribe to job status")
}
},
}
52 changes: 52 additions & 0 deletions cmd/wfxctl/cmd/job/subscribestatus/subscribe_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package subscribestatus

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"

"github.com/siemens/wfx/cmd/wfxctl/errutil"
"github.com/siemens/wfx/cmd/wfxctl/flags"
"github.com/siemens/wfx/generated/model"
"github.com/stretchr/testify/assert"
)

func TestSubscribeJobStatus(t *testing.T) {
const expectedPath = "/api/wfx/v1/jobs/1/status/subscribe"
var actualPath string

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
actualPath = r.URL.Path

event := model.SseEvent{Data: "hello world"}
b := errutil.Must(json.Marshal(event))

w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(b)
}))
defer ts.Close()

u, _ := url.Parse(ts.URL)
_ = flags.Koanf.Set(flags.ClientHostFlag, u.Hostname())
port, _ := strconv.Atoi(u.Port())
_ = flags.Koanf.Set(flags.ClientPortFlag, port)

_ = flags.Koanf.Set(idFlag, "1")

err := Command.Execute()
assert.NoError(t, err)

assert.Equal(t, expectedPath, actualPath)
}
8 changes: 5 additions & 3 deletions cmd/wfxctl/flags/basecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func (b *BaseCmd) CreateHTTPClient() (*http.Client, error) {
}

func (b *BaseCmd) CreateClient() *client.WorkflowExecutor {
return client.NewHTTPClientWithConfig(strfmt.Default, b.CreateTransportConfig())
}

func (b *BaseCmd) CreateTransportConfig() *client.TransportConfig {
var host string
var schemes []string
if b.EnableTLS {
Expand All @@ -134,11 +138,9 @@ func (b *BaseCmd) CreateClient() *client.WorkflowExecutor {
schemes = []string{"http"}
host = fmt.Sprintf("%s:%d", b.Host, b.Port)
}

cfg := client.DefaultTransportConfig().
return client.DefaultTransportConfig().
WithHost(host).
WithSchemes(schemes)
return client.NewHTTPClientWithConfig(strfmt.Default, cfg)
}

func (b *BaseCmd) CreateMgmtClient() *client.WorkflowExecutor {
Expand Down
Loading

0 comments on commit 46296f8

Please sign in to comment.