Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#683 keychain sdk configure multiple nodes endpoints #810

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7b6d2d2
Added support for multiple node urls
backsapc Sep 9, 2024
8e48821
Changed config
backsapc Sep 9, 2024
eefa6f1
Fixed test
backsapc Sep 9, 2024
cd821e6
Updated config for wardenkms
backsapc Oct 24, 2024
50fffb0
Merge branch 'main' into feature/683-keychain-sdk-configure-multiple-…
backsapc Oct 24, 2024
dc209fd
Fixed locking
backsapc Oct 24, 2024
f53b2ea
Merge branch 'feature/683-keychain-sdk-configure-multiple-nodes-endpo…
backsapc Oct 24, 2024
90d8ca5
Typo
backsapc Oct 24, 2024
ab22126
Simplified a bit
backsapc Oct 24, 2024
9b4c3a7
Code review
backsapc Oct 24, 2024
b1fe1de
Code review
backsapc Oct 24, 2024
fcd8167
Changed status check
backsapc Oct 24, 2024
31bbb3a
Fixed
backsapc Oct 25, 2024
3980feb
Omit err check
backsapc Oct 25, 2024
5ba8904
Merge branch 'main' into feature/683-keychain-sdk-configure-multiple-…
backsapc Oct 29, 2024
a23df86
Merge branch 'main' into feature/683-keychain-sdk-configure-multiple-…
backsapc Oct 30, 2024
bf12a7d
Merge branch 'main' into feature/683-keychain-sdk-configure-multiple-…
backsapc Nov 2, 2024
6b4ea38
Merge branch 'main' into feature/683-keychain-sdk-configure-multiple-…
backsapc Nov 11, 2024
b5923b8
uward revert
backsapc Nov 20, 2024
d4c3937
Fixed wardenkms startup
backsapc Nov 20, 2024
57bd353
Update keychain-sdk/tx_client_pool.go
backsapc Nov 22, 2024
6a545f3
Update keychain-sdk/example_keychain_test.go
backsapc Nov 22, 2024
3b7404f
Merge branch 'main' into feature/683-keychain-sdk-configure-multiple-…
backsapc Nov 27, 2024
3642a15
Code review comments
backsapc Nov 27, 2024
ce27189
Merge branch 'main' into feature/683-keychain-sdk-configure-multiple-…
backsapc Nov 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cmd/wardenkms/health_check_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package main

type HealthCheckResponse struct {
// Online is the number of nodes that are online
Online uint `json:"online"`

// Total is the total number of nodes
Total uint `json:"total"`

// Threshold is the consensus threshold
Threshold uint8 `json:"threshold"`

// Nodes is a node statuses collection
Nodes []NodeStatus `json:"nodes"`
}

type NodeStatus struct {
// Address is the address of the node
Address string `json:"address"`

// Status is the status of the node
Status string `json:"status"`
}
72 changes: 55 additions & 17 deletions cmd/wardenkms/wardenkms.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"log/slog"
Expand All @@ -20,11 +21,10 @@ import (
)

type Config struct {
ChainID string `env:"CHAIN_ID, default=warden_1337-1"`
GRPCURL string `env:"GRPC_URL, default=localhost:9090"`
GRPCInsecure bool `env:"GRPC_INSECURE, default=true"`
Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"`
KeychainId uint64 `env:"KEYCHAIN_ID, default=1"`
ChainID string `env:"CHAIN_ID, default=warden_1337-1"`
GRPCURLs string `env:"GRPC_URLS, default=[{\"GRPCUrl\":\"localhost:9090\",\"GRPCInsecure\":true}]"`
Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"`
KeychainId uint64 `env:"KEYCHAIN_ID, default=1"`

KeyringMnemonic string `env:"KEYRING_MNEMONIC, required"`
KeyringPassword string `env:"KEYRING_PASSWORD, required"`
Expand All @@ -38,6 +38,8 @@ type Config struct {
HttpAddr string `env:"HTTP_ADDR, default=:8080"`

LogLevel slog.Level `env:"LOG_LEVEL, default=debug"`

ConsensusNodeThreshold uint8 `env:"CONSENSUS_NODE_THRESHOLD, default=1"`
}

func main() {
Expand All @@ -56,18 +58,24 @@ func main() {
return
}

var grpcConfigs []keychain.GRPCNodeConfig
if err := json.Unmarshal([]byte(cfg.GRPCURLs), &grpcConfigs); err != nil {
logger.Error("failed to initialize grpc configs", "error", err)
return
}

app := keychain.NewApp(keychain.Config{
Logger: logger,
ChainID: cfg.ChainID,
GRPCURL: cfg.GRPCURL,
GRPCInsecure: cfg.GRPCInsecure,
Mnemonic: cfg.Mnemonic,
KeychainID: cfg.KeychainId,
GasLimit: cfg.GasLimit,
BatchInterval: cfg.BatchInterval,
BatchSize: cfg.BatchSize,
TxTimeout: cfg.TxTimeout,
TxFees: sdk.NewCoins(sdk.NewCoin("award", math.NewInt(cfg.TxFee))),
Logger: logger,
ChainID: cfg.ChainID,
Mnemonic: cfg.Mnemonic,
KeychainID: cfg.KeychainId,
GasLimit: cfg.GasLimit,
BatchInterval: cfg.BatchInterval,
BatchSize: cfg.BatchSize,
TxTimeout: cfg.TxTimeout,
TxFees: sdk.NewCoins(sdk.NewCoin("award", math.NewInt(cfg.TxFee))),
Nodes: grpcConfigs,
ConsensusNodeThreshold: cfg.ConsensusNodeThreshold,
})

app.SetKeyRequestHandler(func(w keychain.KeyResponseWriter, req *keychain.KeyRequest) {
Expand Down Expand Up @@ -120,11 +128,41 @@ func main() {
if cfg.HttpAddr != "" {
logger.Info("starting HTTP server", "addr", cfg.HttpAddr)
http.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) {
if app.ConnectionState() == connectivity.Ready {
connectionStates := app.ConnectionState()

readyConnectionsCount := uint(0)
nodes := make([]NodeStatus, 0, len(connectionStates))

for url, state := range connectionStates {
if state == connectivity.Ready {
readyConnectionsCount += 1
}

nodes = append(nodes, NodeStatus{
Address: url,
Status: state.String(),
})
}

bytes, err := json.Marshal(HealthCheckResponse{
Online: readyConnectionsCount,
Total: uint(len(connectionStates)),
Nodes: nodes,
Threshold: cfg.ConsensusNodeThreshold,
})

if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

if readyConnectionsCount >= uint(cfg.ConsensusNodeThreshold) {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}

_, _ = w.Write(bytes)
})
go func() { _ = http.ListenAndServe(cfg.HttpAddr, nil) }()
}
Expand Down
24 changes: 16 additions & 8 deletions keychain-sdk/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@ type Config struct {
// ChainID is the chain ID of the chain to connect to.
ChainID string

// GRPCURL is the URL of the gRPC server to connect to.
// e.g. "localhost:9090"
GRPCURL string

// GRPCInsecure determines whether to allow an insecure connection to the
// gRPC server.
GRPCInsecure bool

// KeychainID is the ID of the keychain this instance will fetch requests
// for.
KeychainID uint64
Expand Down Expand Up @@ -57,4 +49,20 @@ type Config struct {
// If the transaction isn't included in a block, it will be considered as
// failed (but the blockchain might still include in a block later).
TxTimeout time.Duration

// ConsensusNodeThreshold represents the number of nodes required to execute a pending key/sign request.
ConsensusNodeThreshold uint8

// Nodes is the list of URLs of the gRPC server to connect to.
Nodes []GRPCNodeConfig
}

type GRPCNodeConfig struct {
// Insecure determines whether to allow an insecure connection to the
// gRPC server.
Insecure bool

// Host is the URL of the gRPC server to connect to.
// e.g. "localhost:9090"
Host string
}
12 changes: 6 additions & 6 deletions keychain-sdk/example_keychain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ func Main() {
Logger: logger, // not required, but recommended

// setup the connection to the Warden Protocol node
ChainID: "warden",
GRPCURL: "localhost:9090",
GRPCInsecure: true,
ChainID: "warden_1337-1",

// setup the account used to write txs
KeychainID: 1,
Mnemonic: "virus boat radio apple pilot ask vault exhaust again state doll stereo slide exhibit scissors miss attack boat budget egg bird mask more trick",

// setup throughput for batching responses
GasLimit: 400000,
BatchInterval: 8 * time.Second,
BatchSize: 10,
GasLimit: 400000,
BatchInterval: 8 * time.Second,
BatchSize: 10,
Nodes: []keychain.GRPCNodeConfig{{Host: "localhost:9090", Insecure: false}},
ConsensusNodeThreshold: 1,
})

app.SetKeyRequestHandler(func(w keychain.KeyResponseWriter, req *keychain.KeyRequest) {
Expand Down
59 changes: 48 additions & 11 deletions keychain-sdk/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,72 @@
package tracker

import (
"fmt"
"sync"
)

type Action int

const (
ActionSkip Action = iota
ActionProcess
)

type stringSet map[string]struct{}

// add safely adds a string to the set
func (s stringSet) add(str string) bool {
if _, exists := s[str]; exists {
return false
}
s[str] = struct{}{}
return true
}

type T struct {
rw sync.RWMutex
ingested map[uint64]struct{}
threshold uint8
rw sync.RWMutex
ingested map[uint64]stringSet
}

func New() *T {
func New(threshold uint8) *T {
return &T{
ingested: make(map[uint64]struct{}),
threshold: threshold,
ingested: make(map[uint64]stringSet),
}
}

func (t *T) IsNew(id uint64) bool {
t.rw.RLock()
defer t.rw.RUnlock()
_, ok := t.ingested[id]
return !ok
func (t *T) ingestTracker(id uint64) stringSet {
value, ok := t.ingested[id]
if !ok {
t.ingested[id] = make(stringSet)

return t.ingested[id]
}

return value
}

func (t *T) Ingested(id uint64) {
func (t *T) Ingest(id uint64, ingesterId string) (Action, error) {
t.rw.Lock()
defer t.rw.Unlock()
t.ingested[id] = struct{}{}

value := t.ingestTracker(id)

if !value.add(ingesterId) {
return ActionSkip, fmt.Errorf("already ingested")
}

if uint64(len(value)) == uint64(t.threshold) {
return ActionProcess, nil
}

return ActionSkip, nil
}

func (t *T) Done(id uint64) {
t.rw.Lock()
defer t.rw.Unlock()

delete(t.ingested, id)
}
26 changes: 14 additions & 12 deletions keychain-sdk/internal/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ type W struct {
// included in a block after being broadcasted.
TxTimeout time.Duration

// Client is the client used to send transactions to the chain.
Client *client.TxClient

Logger *slog.Logger

GasLimit uint64
Expand All @@ -35,23 +32,26 @@ type W struct {
batch Batch
}

type SyncTxClient interface {
SendWaitTx(ctx context.Context, txBytes []byte) (string, error)
BuildTx(ctx context.Context, gasLimit uint64, fees sdk.Coins, msgers ...client.Msger) ([]byte, error)
}

func New(
client *client.TxClient,
batchSize int,
batchInterval time.Duration,
txTimeout time.Duration,
logger *slog.Logger,
) *W {
return &W{
Client: client,
BatchInterval: batchInterval,
TxTimeout: txTimeout,
Logger: logger,
batch: Batch{messages: make(chan BatchItem, batchSize)},
}
}

func (w *W) Start(ctx context.Context, flushErrors chan error) error {
func (w *W) Start(ctx context.Context, client SyncTxClient, flushErrors chan error) error {
w.Logger.Info("starting tx writer")
for {
select {
Expand All @@ -62,9 +62,11 @@ func (w *W) Start(ctx context.Context, flushErrors chan error) error {
if w.TxTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, w.TxTimeout)
}
if err := w.Flush(ctx); err != nil {

if err := w.Flush(ctx, client); err != nil {
flushErrors <- err
}

cancel()
time.Sleep(w.BatchInterval)
}
Expand Down Expand Up @@ -97,7 +99,7 @@ func (w *W) fees() sdk.Coins {
return w.Fees
}

func (w *W) Flush(ctx context.Context) error {
func (w *W) Flush(ctx context.Context, txClient SyncTxClient) error {
msgs := w.batch.Clear()
if len(msgs) == 0 {
w.Logger.Debug("flushing batch", "empty", true)
Expand All @@ -115,7 +117,7 @@ func (w *W) Flush(ctx context.Context) error {
msgers[i] = item.Msger
}

if err := w.sendWaitTx(ctx, msgers...); err != nil {
if err := w.sendWaitTx(ctx, txClient, msgers...); err != nil {
for _, item := range msgs {
item.Done <- err
}
Expand All @@ -125,18 +127,18 @@ func (w *W) Flush(ctx context.Context) error {
return nil
}

func (w *W) sendWaitTx(ctx context.Context, msgs ...client.Msger) error {
func (w *W) sendWaitTx(ctx context.Context, txClient SyncTxClient, msgs ...client.Msger) error {
w.sendTxLock.Lock()
defer w.sendTxLock.Unlock()

w.Logger.Info("flushing batch", "count", len(msgs))

tx, err := w.Client.BuildTx(ctx, w.gasLimit(), w.fees(), msgs...)
tx, err := txClient.BuildTx(ctx, w.gasLimit(), w.fees(), msgs...)
if err != nil {
return err
}

hash, err := w.Client.SendWaitTx(ctx, tx)
hash, err := txClient.SendWaitTx(ctx, tx)
if err != nil {
return err
}
Expand Down
Loading
Loading