Skip to content

Commit

Permalink
feat: instant job update notifications using server-sent events
Browse files Browse the repository at this point in the history
This commit introduces a new feature that enables clients to receive
instant notifications for job status updates.

Details:

- A new endpoint `/jobs/{id}/status/subscribe` has been added, which
  implements instant notifications through Server-Sent Events (SSE).
- By subscribing to this endpoint, clients can efficiently keep track of
  job status changes without the need to repeatedly poll the server.
- A client reference implementation has been added to wfxctl.

Signed-off-by: Michael Adler <[email protected]>
  • Loading branch information
michaeladler committed Jun 26, 2023
1 parent 649698b commit 8f073d8
Show file tree
Hide file tree
Showing 48 changed files with 2,291 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Instant job update notifications using server-sent events (see #11)

### Fixed

- Send HTTP status code 404 when attempting to access the file server while it is disabled
Expand Down
35 changes: 35 additions & 0 deletions api/job_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"fmt"
"net/http"
"testing"
"time"

"github.com/siemens/wfx/generated/model"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/workflow/dau"
"github.com/steinfletcher/apitest"
Expand Down Expand Up @@ -118,3 +120,36 @@ func TestJobStatusUpdate(t *testing.T) {
Assert(jsonpath.Contains(`$.state`, "DOWNLOADING")).
End()
}

func TestJobStatusSubscribe(t *testing.T) {
db := newInMemoryDB(t)
north, south := createNorthAndSouth(t, db)
job := persistJob(t, db)
jobPath := fmt.Sprintf("/api/wfx/v1/jobs/%s/status/subscribe", job.ID)

handlers := []http.Handler{north, south}
for i, name := range allAPIs {
t.Run(name, func(t *testing.T) {
// add ourselves as subscriber as well
ch := status.AddSubscriber(context.Background(), job.ID)
go func() {
_, err := status.Update(context.Background(), db, job.ID, &model.JobStatus{State: "INSTALLING"}, model.EligibleEnumCLIENT)
require.NoError(t, err)
<-ch
time.Sleep(time.Millisecond * 10)
// now its safe to shutdown
status.ShutdownSubscribers()
}()

apitest.New().
Handler(handlers[i]).
Get(jobPath).
Expect(t).
Status(http.StatusOK).
Body(`data: {"definitionHash":"4f53cda18c2baa0c0354bb5f9a3ecbe5ed12ab4d8e11ba873c2f11161202b945","state":"INSTALLING"}
`).
End()
})
}
}
7 changes: 7 additions & 0 deletions api/northbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/internal/producer"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/persistence"
)
Expand Down Expand Up @@ -263,5 +264,11 @@ func NewNorthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return northbound.NewDeleteJobsIDTagsOK().WithPayload(tags)
})

serverAPI.NorthboundGetJobsIDStatusSubscribeHandler = northbound.GetJobsIDStatusSubscribeHandlerFunc(
func(params northbound.GetJobsIDStatusSubscribeParams) middleware.Responder {
eventChan := status.AddSubscriber(params.HTTPRequest.Context(), params.ID)
return producer.SSEResponder(params.ID, eventChan)
})

return serverAPI, nil
}
7 changes: 7 additions & 0 deletions api/southbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/internal/producer"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/persistence"
)
Expand Down Expand Up @@ -176,5 +177,11 @@ func NewSouthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return southbound.NewGetJobsIDTagsOK().WithPayload(tags)
})

serverAPI.SouthboundGetJobsIDStatusSubscribeHandler = southbound.GetJobsIDStatusSubscribeHandlerFunc(
func(params southbound.GetJobsIDStatusSubscribeParams) middleware.Responder {
eventChan := status.AddSubscriber(params.HTTPRequest.Context(), params.ID)
return producer.SSEResponder(params.ID, eventChan)
})

return serverAPI, nil
}
6 changes: 5 additions & 1 deletion cmd/wfx/cmd/root/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/cmd/wfx/metadata"
"github.com/siemens/wfx/internal/config"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/persistence"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -250,7 +251,10 @@ Examples of tasks are installation of firmware or other types of commands issued
}
}

// Create a context with a timeout to allow outstanding requests to complete
// shut down (disconnect) subscribers otherwise we cannot stop the web server due to open connections
status.ShutdownSubscribers()

// create a context with a timeout to allow outstanding requests to complete
var timeout time.Duration
k.Read(func(k *koanf.Koanf) {
timeout = k.Duration(gracefulTimeoutFlag)
Expand Down
2 changes: 2 additions & 0 deletions cmd/wfxctl/cmd/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/getstatus"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/gettags"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/query"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/subscribestatus"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatedefinition"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatestatus"
"github.com/spf13/cobra"
Expand All @@ -43,4 +44,5 @@ func init() {
Command.AddCommand(addtags.Command)
Command.AddCommand(deltags.Command)
Command.AddCommand(gettags.Command)
Command.AddCommand(subscribestatus.Command)
}
92 changes: 92 additions & 0 deletions cmd/wfxctl/cmd/job/subscribestatus/subscribe_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package subscribestatus

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"os"
"time"

"github.com/Southclaws/fault"
"github.com/go-openapi/runtime"
"github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/tmaxmax/go-sse"

"github.com/siemens/wfx/cmd/wfxctl/errutil"
"github.com/siemens/wfx/cmd/wfxctl/flags"
generatedClient "github.com/siemens/wfx/generated/client"
"github.com/siemens/wfx/generated/client/jobs"
)

const (
idFlag = "id"
)

func init() {
f := Command.Flags()
f.String(idFlag, "", "job id")
}

type SSETransport struct {
baseCmd *flags.BaseCmd
}

// Submit implements the runtime.ClientTransport interface.
func (t SSETransport) Submit(op *runtime.ClientOperation) (interface{}, error) {
cfg := t.baseCmd.CreateTransportConfig()
rt := client.New(cfg.Host, generatedClient.DefaultBasePath, cfg.Schemes)
req := errutil.Must(rt.CreateHttpRequest(op))

httpClient := errutil.Must(t.baseCmd.CreateHTTPClient())
httpClient.Timeout = 0

client := sse.Client{
HTTPClient: httpClient,
DefaultReconnectionTime: time.Second * 5,
ResponseValidator: sse.DefaultValidator,
}

conn := client.NewConnection(req)
unsubscribe := conn.SubscribeMessages(func(event sse.Event) {
_, _ = os.Stdout.Write(event.Data)
os.Stdout.Write([]byte("\n"))
})
defer unsubscribe()

err := conn.Connect()
if err != nil {
return nil, fault.Wrap(err)
}

return jobs.NewGetJobsIDStatusSubscribeOK(), nil
}

var Command = &cobra.Command{
Use: "subscribe-status",
Short: "Subscribe to job update events",
Example: `
wfxctl job subscribe-status --id=1
`,
TraverseChildren: true,
Run: func(cmd *cobra.Command, args []string) {
params := jobs.NewGetJobsIDStatusSubscribeParams().
WithID(flags.Koanf.String(idFlag))
if params.ID == "" {
log.Fatal().Msg("Job ID missing")
}

baseCmd := flags.NewBaseCmd()
executor := generatedClient.New(SSETransport{baseCmd: &baseCmd}, strfmt.Default)
if _, err := executor.Jobs.GetJobsIDStatusSubscribe(params); err != nil {
log.Fatal().Err(err).Msg("Failed to subscribe to job status")
}
},
}
48 changes: 48 additions & 0 deletions cmd/wfxctl/cmd/job/subscribestatus/subscribe_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package subscribestatus

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"

"github.com/siemens/wfx/cmd/wfxctl/flags"
"github.com/stretchr/testify/assert"
)

func TestSubscribeJobStatus(t *testing.T) {
const expectedPath = "/api/wfx/v1/jobs/1/status/subscribe"
var actualPath string

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
actualPath = r.URL.Path

w.Header().Add("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`data: "hello world"
`))
}))
defer ts.Close()

u, _ := url.Parse(ts.URL)
_ = flags.Koanf.Set(flags.ClientHostFlag, u.Hostname())
port, _ := strconv.Atoi(u.Port())
_ = flags.Koanf.Set(flags.ClientPortFlag, port)

_ = flags.Koanf.Set(idFlag, "1")

err := Command.Execute()
assert.NoError(t, err)

assert.Equal(t, expectedPath, actualPath)
}
9 changes: 6 additions & 3 deletions cmd/wfxctl/flags/basecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (b *BaseCmd) CreateHTTPClient() (*http.Client, error) {
if err != nil {
return nil, fault.Wrap(err)
}
log.Info().Msg("Using unix-domain socket transport")
return &http.Client{
Transport: &http.Transport{
Dial: func(_, _ string) (net.Conn, error) {
Expand Down Expand Up @@ -125,6 +126,10 @@ func (b *BaseCmd) CreateHTTPClient() (*http.Client, error) {
}

func (b *BaseCmd) CreateClient() *client.WorkflowExecutor {
return client.NewHTTPClientWithConfig(strfmt.Default, b.CreateTransportConfig())
}

func (b *BaseCmd) CreateTransportConfig() *client.TransportConfig {
var host string
var schemes []string
if b.EnableTLS {
Expand All @@ -134,11 +139,9 @@ func (b *BaseCmd) CreateClient() *client.WorkflowExecutor {
schemes = []string{"http"}
host = fmt.Sprintf("%s:%d", b.Host, b.Port)
}

cfg := client.DefaultTransportConfig().
return client.DefaultTransportConfig().
WithHost(host).
WithSchemes(schemes)
return client.NewHTTPClientWithConfig(strfmt.Default, cfg)
}

func (b *BaseCmd) CreateMgmtClient() *client.WorkflowExecutor {
Expand Down
Loading

0 comments on commit 8f073d8

Please sign in to comment.