From ab94b7ef6ccdd112330927007b8333862cc4b6e7 Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 00:41:39 +0800 Subject: [PATCH 01/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 57 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index 09168372..3a3b692e 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -121,6 +121,31 @@ iter: return ctx, fmt.Errorf("no more active nodes left") } +func (c *ConnectionPool) StickyContextExcludeNode(ctx context.Context) (context.Context, error) { + nodeID, _ := ctx.Value(_StickyCtxKey).(uint32) + if nodeID == 0 { + return ctx, fmt.Errorf("no node for exclude") + } + usedNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) + usedNodes = append(usedNodes, nodeID) + + c.nodesMx.RLock() + defer c.nodesMx.RUnlock() + +iter: + for _, node := range c.activeNodes { + for _, usedNode := range usedNodes { + if usedNode == node.id { + continue iter + } + } + + return context.WithValue(ctx, _StickyCtxUsedNodesKey, usedNodes), nil + } + + return ctx, fmt.Errorf("no more active nodes left") +} + func (c *ConnectionPool) StickyContextWithNodeID(ctx context.Context, nodeId uint32) context.Context { return context.WithValue(ctx, _StickyCtxKey, nodeId) } @@ -190,6 +215,11 @@ func (c *ConnectionPool) QueryADNL(ctx context.Context, request tl.Serializable, if err != nil { return err } + } else if nodeIDs, ok := ctx.Value(_StickyCtxUsedNodesKey).([]uint32); ok && len(nodeIDs) > 0 { + node, err = c.queryExcludeSticky(nodeIDs, req) + if err != nil { + return err + } } else { node, err = c.queryWithSmartBalancer(req) if err != nil { @@ -221,23 +251,46 @@ func (c *ConnectionPool) QueryADNL(ctx context.Context, request tl.Serializable, func (c *ConnectionPool) querySticky(id uint32, req *ADNLRequest) (*connection, error) { c.nodesMx.RLock() + defer c.nodesMx.RUnlock() + for _, node := range c.activeNodes { if node.id == id { atomic.AddInt64(&node.weight, -1) _, err := node.queryAdnl(req.QueryID, req.Data) if err == nil { - c.nodesMx.RUnlock() return node, nil } break } } - c.nodesMx.RUnlock() // fallback if bounded node is not available return c.queryWithSmartBalancer(req) } +func (c *ConnectionPool) queryExcludeSticky(ids []uint32, req *ADNLRequest) (*connection, error) { + c.nodesMx.RLock() + defer c.nodesMx.RUnlock() + +iter: + for _, node := range c.activeNodes { + for _, id := range ids { + if node.id == id { + continue iter + } + } + + atomic.AddInt64(&node.weight, -1) + _, err := node.queryAdnl(req.QueryID, req.Data) + if err == nil { + return node, nil + } + } + + // fallback if another nodes are not available + return c.queryWithSmartBalancer(req) +} + func (c *ConnectionPool) queryWithSmartBalancer(req *ADNLRequest) (*connection, error) { var reqNode *connection From 3e90ea5a8118a3f17dedc05505623452853cfa31 Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 00:52:05 +0800 Subject: [PATCH 02/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 39 ++++++++++----------------------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index 3a3b692e..dd75ecba 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -210,18 +210,14 @@ func (c *ConnectionPool) QueryADNL(ctx context.Context, request tl.Serializable, tm := time.Now() var node *connection + nodeIDs, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) if nodeID, ok := ctx.Value(_StickyCtxKey).(uint32); ok && nodeID > 0 { node, err = c.querySticky(nodeID, req) if err != nil { return err } - } else if nodeIDs, ok := ctx.Value(_StickyCtxUsedNodesKey).([]uint32); ok && len(nodeIDs) > 0 { - node, err = c.queryExcludeSticky(nodeIDs, req) - if err != nil { - return err - } } else { - node, err = c.queryWithSmartBalancer(req) + node, err = c.queryWithSmartBalancer(nodeIDs, req) if err != nil { return err } @@ -251,51 +247,36 @@ func (c *ConnectionPool) QueryADNL(ctx context.Context, request tl.Serializable, func (c *ConnectionPool) querySticky(id uint32, req *ADNLRequest) (*connection, error) { c.nodesMx.RLock() - defer c.nodesMx.RUnlock() for _, node := range c.activeNodes { if node.id == id { atomic.AddInt64(&node.weight, -1) _, err := node.queryAdnl(req.QueryID, req.Data) if err == nil { + c.nodesMx.RUnlock() return node, nil } break } } + c.nodesMx.RUnlock() // fallback if bounded node is not available - return c.queryWithSmartBalancer(req) + return c.queryWithSmartBalancer(nil, req) } -func (c *ConnectionPool) queryExcludeSticky(ids []uint32, req *ADNLRequest) (*connection, error) { +func (c *ConnectionPool) queryWithSmartBalancer(excludeNodes []uint32, req *ADNLRequest) (*connection, error) { + var reqNode *connection + c.nodesMx.RLock() - defer c.nodesMx.RUnlock() iter: for _, node := range c.activeNodes { - for _, id := range ids { - if node.id == id { + for _, excludeNode := range excludeNodes { + if node.id == excludeNode { continue iter } } - - atomic.AddInt64(&node.weight, -1) - _, err := node.queryAdnl(req.QueryID, req.Data) - if err == nil { - return node, nil - } - } - - // fallback if another nodes are not available - return c.queryWithSmartBalancer(req) -} - -func (c *ConnectionPool) queryWithSmartBalancer(req *ADNLRequest) (*connection, error) { - var reqNode *connection - - c.nodesMx.RLock() - for _, node := range c.activeNodes { if reqNode == nil { reqNode = node continue From 2ef977707049951028df196969a3456b6e9b6a9d Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 00:53:22 +0800 Subject: [PATCH 03/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index dd75ecba..8acb6a3e 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -210,14 +210,14 @@ func (c *ConnectionPool) QueryADNL(ctx context.Context, request tl.Serializable, tm := time.Now() var node *connection - nodeIDs, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) + excludeNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) if nodeID, ok := ctx.Value(_StickyCtxKey).(uint32); ok && nodeID > 0 { node, err = c.querySticky(nodeID, req) if err != nil { return err } } else { - node, err = c.queryWithSmartBalancer(nodeIDs, req) + node, err = c.queryWithSmartBalancer(excludeNodes, req) if err != nil { return err } From a72ac30a7925680aa6d830458af60ea288ae111d Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 00:54:04 +0800 Subject: [PATCH 04/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index 8acb6a3e..1324d80f 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -124,7 +124,7 @@ iter: func (c *ConnectionPool) StickyContextExcludeNode(ctx context.Context) (context.Context, error) { nodeID, _ := ctx.Value(_StickyCtxKey).(uint32) if nodeID == 0 { - return ctx, fmt.Errorf("no node for exclude") + return ctx, fmt.Errorf("no node to exclude") } usedNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) usedNodes = append(usedNodes, nodeID) From 7cbaf33a6369f985a2dca3395cc46c2546ccebfd Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 01:06:27 +0800 Subject: [PATCH 05/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index 1324d80f..a36a7450 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -270,6 +270,10 @@ func (c *ConnectionPool) queryWithSmartBalancer(excludeNodes []uint32, req *ADNL c.nodesMx.RLock() + if len(c.activeNodes) == 0 { + return nil, ErrNoActiveConnections + } + iter: for _, node := range c.activeNodes { for _, excludeNode := range excludeNodes { @@ -290,7 +294,10 @@ iter: c.nodesMx.RUnlock() if reqNode == nil { - return nil, ErrNoActiveConnections + if len(excludeNodes) == 0 { + return nil, ErrNoActiveConnections + } + return c.queryWithSmartBalancer(nil, req) } atomic.AddInt64(&reqNode.weight, -1) From f276edfb81c0fea85c89daf83dc85a60ab8546cf Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 01:07:40 +0800 Subject: [PATCH 06/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index a36a7450..a112dd46 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -247,7 +247,6 @@ func (c *ConnectionPool) QueryADNL(ctx context.Context, request tl.Serializable, func (c *ConnectionPool) querySticky(id uint32, req *ADNLRequest) (*connection, error) { c.nodesMx.RLock() - for _, node := range c.activeNodes { if node.id == id { atomic.AddInt64(&node.weight, -1) @@ -271,6 +270,7 @@ func (c *ConnectionPool) queryWithSmartBalancer(excludeNodes []uint32, req *ADNL c.nodesMx.RLock() if len(c.activeNodes) == 0 { + c.nodesMx.RUnlock() return nil, ErrNoActiveConnections } From 1addafd0e87626b79e724d11cce8170192bbf035 Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 01:54:20 +0800 Subject: [PATCH 07/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index a112dd46..ba0f1ac5 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -140,7 +140,7 @@ iter: } } - return context.WithValue(ctx, _StickyCtxUsedNodesKey, usedNodes), nil + return c.StickyContextWithNodeID(ctx, node.id), nil } return ctx, fmt.Errorf("no more active nodes left") From b347c186f6a871a378e18547175afb700240f171 Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 02:06:04 +0800 Subject: [PATCH 08/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index ba0f1ac5..47e97606 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -217,7 +217,7 @@ func (c *ConnectionPool) QueryADNL(ctx context.Context, request tl.Serializable, return err } } else { - node, err = c.queryWithSmartBalancer(excludeNodes, req) + node, err = c.queryWithSmartBalancer(req, excludeNodes...) if err != nil { return err } @@ -261,10 +261,10 @@ func (c *ConnectionPool) querySticky(id uint32, req *ADNLRequest) (*connection, c.nodesMx.RUnlock() // fallback if bounded node is not available - return c.queryWithSmartBalancer(nil, req) + return c.queryWithSmartBalancer(req) } -func (c *ConnectionPool) queryWithSmartBalancer(excludeNodes []uint32, req *ADNLRequest) (*connection, error) { +func (c *ConnectionPool) queryWithSmartBalancer(req *ADNLRequest, excludeNodes ...uint32) (*connection, error) { var reqNode *connection c.nodesMx.RLock() @@ -294,10 +294,10 @@ iter: c.nodesMx.RUnlock() if reqNode == nil { - if len(excludeNodes) == 0 { - return nil, ErrNoActiveConnections + if len(excludeNodes) > 0 { + return c.queryWithSmartBalancer(req) } - return c.queryWithSmartBalancer(nil, req) + return nil, ErrNoActiveConnections } atomic.AddInt64(&reqNode.weight, -1) From 9a53390c8fc02d1ab9d47786c855a75bad1346e1 Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 02:11:20 +0800 Subject: [PATCH 09/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index 47e97606..f4830e41 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -140,7 +140,7 @@ iter: } } - return c.StickyContextWithNodeID(ctx, node.id), nil + return context.WithValue(ctx, _StickyCtxUsedNodesKey, usedNodes), nil } return ctx, fmt.Errorf("no more active nodes left") From 2a4fe0a463f223f3eea2e27ce85e625937b68bc1 Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 02:16:32 +0800 Subject: [PATCH 10/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index f4830e41..75ee3015 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -140,7 +140,7 @@ iter: } } - return context.WithValue(ctx, _StickyCtxUsedNodesKey, usedNodes), nil + return context.WithValue(context.WithValue(ctx, _StickyCtxKey, 0), _StickyCtxUsedNodesKey, usedNodes), nil } return ctx, fmt.Errorf("no more active nodes left") From bbcf5f7befcd45dd5183bb5386568650a19f769b Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 02:43:26 +0800 Subject: [PATCH 11/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- liteclient/pool.go | 43 ++++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/liteclient/pool.go b/liteclient/pool.go index 75ee3015..761e8475 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -81,7 +81,7 @@ func NewConnectionPoolWithAuth(key ed25519.PrivateKey) *ConnectionPool { // // In case if sticky node goes down, default balancer will be used as fallback func (c *ConnectionPool) StickyContext(ctx context.Context) context.Context { - if ctx.Value(_StickyCtxKey) != nil { + if c.StickyNodeID(ctx) != 0 { return ctx } @@ -94,11 +94,11 @@ func (c *ConnectionPool) StickyContext(ctx context.Context) context.Context { } c.nodesMx.RUnlock() - return context.WithValue(ctx, _StickyCtxKey, id) + return stickyContextWithNodeID(ctx, id) } func (c *ConnectionPool) StickyContextNextNode(ctx context.Context) (context.Context, error) { - nodeID, _ := ctx.Value(_StickyCtxKey).(uint32) + nodeID := c.StickyNodeID(ctx) usedNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) if nodeID > 0 { usedNodes = append(usedNodes, nodeID) @@ -115,38 +115,51 @@ iter: } } - return context.WithValue(context.WithValue(ctx, _StickyCtxKey, node.id), _StickyCtxUsedNodesKey, usedNodes), nil + return context.WithValue(stickyContextWithNodeID(ctx, node.id), _StickyCtxUsedNodesKey, usedNodes), nil } return ctx, fmt.Errorf("no more active nodes left") } func (c *ConnectionPool) StickyContextExcludeNode(ctx context.Context) (context.Context, error) { - nodeID, _ := ctx.Value(_StickyCtxKey).(uint32) + nodeID := c.StickyNodeID(ctx) if nodeID == 0 { return ctx, fmt.Errorf("no node to exclude") } + usedNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) usedNodes = append(usedNodes, nodeID) c.nodesMx.RLock() defer c.nodesMx.RUnlock() -iter: - for _, node := range c.activeNodes { - for _, usedNode := range usedNodes { - if usedNode == node.id { - continue iter - } - } - - return context.WithValue(context.WithValue(ctx, _StickyCtxKey, 0), _StickyCtxUsedNodesKey, usedNodes), nil + if len(c.activeNodes) < len(usedNodes) { + return context.WithValue(stickyContextWithNodeID(ctx, 0), _StickyCtxUsedNodesKey, usedNodes), nil } return ctx, fmt.Errorf("no more active nodes left") } func (c *ConnectionPool) StickyContextWithNodeID(ctx context.Context, nodeId uint32) context.Context { + usedNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) + if len(usedNodes) == 0 { + return context.WithValue(ctx, _StickyCtxKey, nodeId) + } + + nodes := make([]uint32, 0, len(usedNodes)) + for _, node := range usedNodes { + if node != nodeId { + nodes = append(nodes, node) + } + } + if len(nodes) == len(usedNodes) { + return stickyContextWithNodeID(ctx, nodeId) + } + + return context.WithValue(stickyContextWithNodeID(ctx, nodeId), _StickyCtxUsedNodesKey, usedNodes) +} + +func stickyContextWithNodeID(ctx context.Context, nodeId uint32) context.Context { return context.WithValue(ctx, _StickyCtxKey, nodeId) } @@ -211,7 +224,7 @@ func (c *ConnectionPool) QueryADNL(ctx context.Context, request tl.Serializable, var node *connection excludeNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) - if nodeID, ok := ctx.Value(_StickyCtxKey).(uint32); ok && nodeID > 0 { + if nodeID := c.StickyNodeID(ctx); nodeID > 0 { node, err = c.querySticky(nodeID, req) if err != nil { return err From ad95abeb5eb311ae05ec9bd06b5d21a3ceeec2d0 Mon Sep 17 00:00:00 2001 From: "s.klimov" Date: Sun, 26 May 2024 02:46:18 +0800 Subject: [PATCH 12/12] add StickyContextExcludeNode Signed-off-by: s.klimov --- ton/api.go | 1 + ton/retrier.go | 6 +++++- ton/timeouter.go | 4 ++++ ton/waiter.go | 4 ++++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/ton/api.go b/ton/api.go index e86c74af..d592f9ff 100644 --- a/ton/api.go +++ b/ton/api.go @@ -34,6 +34,7 @@ type LiteClient interface { QueryLiteserver(ctx context.Context, payload tl.Serializable, result tl.Serializable) error StickyContext(ctx context.Context) context.Context StickyContextNextNode(ctx context.Context) (context.Context, error) + StickyContextExcludeNode(ctx context.Context) (context.Context, error) StickyNodeID(ctx context.Context) uint32 } diff --git a/ton/retrier.go b/ton/retrier.go index 94edba32..e91d5666 100644 --- a/ton/retrier.go +++ b/ton/retrier.go @@ -25,7 +25,7 @@ func (w *retryClient) QueryLiteserver(ctx context.Context, payload tl.Serializab tries++ if err != nil { - if !errors.Is(err, liteclient.ErrADNLReqTimeout) && !errors.Is(err, context.DeadlineExceeded){ + if !errors.Is(err, liteclient.ErrADNLReqTimeout) && !errors.Is(err, context.DeadlineExceeded) { return err } @@ -69,6 +69,10 @@ func (w *retryClient) StickyNodeID(ctx context.Context) uint32 { return w.original.StickyNodeID(ctx) } +func (w *retryClient) StickyContextExcludeNode(ctx context.Context) (context.Context, error) { + return w.original.StickyContextExcludeNode(ctx) +} + func (w *retryClient) StickyContextNextNode(ctx context.Context) (context.Context, error) { return w.original.StickyContextNextNode(ctx) } diff --git a/ton/timeouter.go b/ton/timeouter.go index cd617437..2c08e795 100644 --- a/ton/timeouter.go +++ b/ton/timeouter.go @@ -27,6 +27,10 @@ func (c *timeoutClient) StickyNodeID(ctx context.Context) uint32 { return c.original.StickyNodeID(ctx) } +func (w *timeoutClient) StickyContextExcludeNode(ctx context.Context) (context.Context, error) { + return w.original.StickyContextExcludeNode(ctx) +} + func (c *timeoutClient) StickyContextNextNode(ctx context.Context) (context.Context, error) { return c.original.StickyContextNextNode(ctx) } diff --git a/ton/waiter.go b/ton/waiter.go index ae36d584..c008747f 100644 --- a/ton/waiter.go +++ b/ton/waiter.go @@ -49,3 +49,7 @@ func (w *waiterClient) StickyNodeID(ctx context.Context) uint32 { func (w *waiterClient) StickyContextNextNode(ctx context.Context) (context.Context, error) { return w.original.StickyContextNextNode(ctx) } + +func (w *waiterClient) StickyContextExcludeNode(ctx context.Context) (context.Context, error) { + return w.original.StickyContextExcludeNode(ctx) +}