Skip to content

Commit

Permalink
client: add connState channel
Browse files Browse the repository at this point in the history
Add support for an optional channel to receive client state changes.

Fixes #741
  • Loading branch information
magiconair committed Feb 9, 2025
1 parent 4476e95 commit 883309c
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 11 deletions.
33 changes: 22 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ type Client struct {
// atomicState of the client
atomicState atomic.Value // ConnState

// stateCh is an optional channel for connection state changes. May be nil.
stateCh chan<- ConnState

// list of cached atomicNamespaces on the server
atomicNamespaces atomic.Value // []string

Expand Down Expand Up @@ -197,10 +200,12 @@ func NewClient(endpoint string, opts ...Option) (*Client, error) {
pendingAcks: make([]*ua.SubscriptionAcknowledgement, 0),
pausech: make(chan struct{}, 2),
resumech: make(chan struct{}, 2),
stateCh: cfg.stateCh,
}
c.pauseSubscriptions(context.Background())
c.setPublishTimeout(uasc.MaxTimeout)
c.setState(Closed)
// cannot use setState here since it would trigger the stateCh
c.atomicState.Store(Closed)
c.setSecureChannel(nil)
c.setSession(nil)
c.setNamespaces([]string{})
Expand Down Expand Up @@ -230,7 +235,7 @@ func (c *Client) Connect(ctx context.Context) error {
return errors.Errorf("already connected")
}

c.setState(Connecting)
c.setState(ctx, Connecting)
if err := c.Dial(ctx); err != nil {
stats.RecordError(err)

Expand All @@ -251,7 +256,7 @@ func (c *Client) Connect(ctx context.Context) error {

return err
}
c.setState(Connected)
c.setState(ctx, Connected)

mctx, mcancel := context.WithCancel(context.Background())
c.mcancel = mcancel
Expand Down Expand Up @@ -282,7 +287,7 @@ func (c *Client) monitor(ctx context.Context) {
defer dlog.Printf("done")

defer c.mcancel()
defer c.setState(Closed)
defer c.setState(ctx, Closed)

action := none
for {
Expand All @@ -306,7 +311,7 @@ func (c *Client) monitor(ctx context.Context) {
}

// tell the handler the connection is disconnected
c.setState(Disconnected)
c.setState(ctx, Disconnected)
dlog.Print("disconnected")

if !c.cfg.sechan.AutoReconnect {
Expand Down Expand Up @@ -387,7 +392,7 @@ func (c *Client) monitor(ctx context.Context) {
c.setSecureChannel(nil)
}

c.setState(Reconnecting)
c.setState(ctx, Reconnecting)

dlog.Printf("trying to recreate secure channel")
for {
Expand All @@ -412,7 +417,7 @@ func (c *Client) monitor(ctx context.Context) {
// This only works if the session is still open on the server
// otherwise recreate it

c.setState(Reconnecting)
c.setState(ctx, Reconnecting)

s := c.Session()
if s == nil {
Expand Down Expand Up @@ -443,7 +448,7 @@ func (c *Client) monitor(ctx context.Context) {
case recreateSession:
dlog.Printf("action: recreateSession")

c.setState(Reconnecting)
c.setState(ctx, Reconnecting)
// create a new session to replace the previous one

// clear any previous session as we know the server has closed it
Expand Down Expand Up @@ -548,7 +553,7 @@ func (c *Client) monitor(ctx context.Context) {
activeSubs++
}

c.setState(Connected)
c.setState(ctx, Connected)
action = none

case abortReconnect:
Expand Down Expand Up @@ -618,7 +623,7 @@ func (c *Client) Close(ctx context.Context) error {
// try to close the session but ignore any error
// so that we close the underlying channel and connection.
c.CloseSession(ctx)
c.setState(Closed)
c.setState(ctx, Closed)

if c.mcancel != nil {
c.mcancel()
Expand Down Expand Up @@ -650,8 +655,14 @@ func (c *Client) State() ConnState {
return c.atomicState.Load().(ConnState)
}

func (c *Client) setState(s ConnState) {
func (c *Client) setState(ctx context.Context, s ConnState) {
c.atomicState.Store(s)
if c.stateCh != nil {
select {
case <-ctx.Done():
case c.stateCh <- s:
}
}
n := new(expvar.Int)
n.Set(int64(s))
stats.Client().Set("State", n)
Expand Down
62 changes: 62 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,65 @@ func TestCloneBrowseRequest(t *testing.T) {
})
}
}

func TestClient_SetState(t *testing.T) {
tests := []struct {
name string
state ConnState
withChan bool
ctxCancel bool
}{
{
name: "set state without channel",
state: Connected,
withChan: false,
},
{
name: "set state with channel",
state: Connecting,
withChan: true,
},
{
name: "set state with cancelled context",
state: Disconnected,
withChan: true,
ctxCancel: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var opts []Option
var stateCh chan ConnState
if tt.withChan {
stateCh = make(chan ConnState, 1)
opts = append(opts, StateChangedCh(stateCh))
}

c, err := NewClient("opc.tcp://example.com:4840", opts...)
require.NoError(t, err)

if tt.ctxCancel {
cancel()
}

c.setState(ctx, tt.state)

// Verify state was set correctly
require.Equal(t, tt.state, c.State())

// Verify channel received state if channel exists
if tt.withChan && !tt.ctxCancel {
select {
case state := <-stateCh:
require.Equal(t, tt.state, state)
default:
t.Fatal("expected state on channel but got none")
}
}
})
}
}
12 changes: 12 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Config struct {
dialer *uacp.Dialer
sechan *uasc.Config
session *uasc.SessionConfig
stateCh chan<- ConnState
}

// NewDialer creates a uacp.Dialer from the config options
Expand Down Expand Up @@ -560,6 +561,17 @@ func SendBufferSize(n uint32) Option {
}
}

// StateChangedCh sets the channel for receiving client connection state changes.
//
// The caller must either consume the channel immediately or provide a buffer
// to prevent blocking state changes in the client.
func StateChangedCh(ch chan<- ConnState) Option {
return func(cfg *Config) error {
cfg.stateCh = ch
return nil
}
}

func initDialer(cfg *Config) {
if cfg.dialer == nil {
cfg.dialer = &uacp.Dialer{}
Expand Down
9 changes: 9 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func TestOptions(t *testing.T) {
require.NoError(t, err, "WriteFile(keyPEMFile) failed")
defer os.Remove(keyPEMFile)

connStateCh := make(chan ConnState)

tests := []struct {
name string
opt Option
Expand Down Expand Up @@ -306,6 +308,13 @@ func TestOptions(t *testing.T) {
cfg: &Config{},
err: notFoundError("certificate", "x"),
},
{
name: `ConnStateCh`,
opt: StateChangedCh(connStateCh),
cfg: &Config{
stateCh: connStateCh,
},
},
{
name: `Lifetime(10ms)`,
opt: Lifetime(10 * time.Millisecond),
Expand Down

0 comments on commit 883309c

Please sign in to comment.