From 9c6b66663df5b7d2a9f92112e47426a193a346f3 Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Thu, 30 Mar 2023 03:44:49 +0700 Subject: [PATCH] x-cosmos-block-height (#12) --- api_server.go | 41 +++++++++++++++++++++-- cmd_start.go | 3 ++ config.go | 3 +- rpc_server.go | 20 +++++++----- state.go | 90 ++++++++++++++++++++++++++++++++++++++++++--------- state_test.go | 2 +- subnode.yaml | 2 ++ 7 files changed, 134 insertions(+), 27 deletions(-) diff --git a/api_server.go b/api_server.go index 185b3aa..e828e48 100644 --- a/api_server.go +++ b/api_server.go @@ -1,15 +1,52 @@ package main import ( + "fmt" "log" "net/http" + "strconv" ) func StartApiServer() { handler := func(w http.ResponseWriter, r *http.Request) { + prunedNode := SelectPrunedNodeApi() + selectedHost := prunedNode.Backend.Api // default to pruned node + if r.Method == "GET" { + fmt.Printf("r.RequestURI=%s\n", r.RequestURI) + + xCosmosBlockHeight := r.Header.Get("x-cosmos-block-height") + if xCosmosBlockHeight != "" { + height, err := strconv.ParseInt(xCosmosBlockHeight, 10, 64) + if err != nil { + SendError(w) + } + + node, err := SelectMatchedNodeApi(height) + if err != nil { + SendError(w) + } + + selectedHost = node.Backend.Api + } else { + selectedHost = prunedNode.Backend.Api + } + + r.Host = r.URL.Host + ProxyMapApi[selectedHost].ServeHTTP(w, r) + } else if r.Method == "POST" { + selectedHost = prunedNode.Backend.Api + r.Host = r.URL.Host + ProxyMapApi[selectedHost].ServeHTTP(w, r) + } else { + SendError(w) + } } // handle all requests to your server using the proxy - http.HandleFunc("/", handler) - log.Fatal(http.ListenAndServe(":1337", nil)) + //http.HandleFunc("/", handler) + serverMux := http.NewServeMux() + serverMux.HandleFunc("/", handler) + go func() { + log.Fatal(http.ListenAndServe(":1337", serverMux)) + }() } diff --git a/cmd_start.go b/cmd_start.go index caf015a..98e2985 100644 --- a/cmd_start.go +++ b/cmd_start.go @@ -13,6 +13,9 @@ func startCmd() *cobra.Command { Init() StartRpcServer() StartApiServer() + + select {} + return nil }, } diff --git a/config.go b/config.go index 7b04728..46f7697 100644 --- a/config.go +++ b/config.go @@ -6,7 +6,8 @@ import ( ) type Backend struct { - Rpc string + Rpc string // url to rpc, eg., https://rpc-osmosis-ia.cosmosia.notional.ventures:443 + Api string // url to api, eg., https://api-osmosis-ia.cosmosia.notional.ventures:443 // examples: // [1, 100] => from block 1 to block 100 (subnode) diff --git a/rpc_server.go b/rpc_server.go index 15599bb..5b7256c 100644 --- a/rpc_server.go +++ b/rpc_server.go @@ -13,7 +13,7 @@ import ( func StartRpcServer() { handler := func(w http.ResponseWriter, r *http.Request) { - prunedNode := SelectPrunedNode() + prunedNode := SelectPrunedNodeRpc() selectedHost := prunedNode.Backend.Rpc // default to pruned node if r.Method == "GET" { // URI over HTTP @@ -50,7 +50,7 @@ func StartRpcServer() { SendError(w) } - node, err := SelectMatchedNode(height) + node, err := SelectMatchedNodeRpc(height) if err != nil { SendError(w) } @@ -62,7 +62,7 @@ func StartRpcServer() { } r.Host = r.URL.Host - ProxyMap[selectedHost].ServeHTTP(w, r) + ProxyMapRpc[selectedHost].ServeHTTP(w, r) } else if r.Method == "POST" { // JSONRPC over HTTP body, err := io.ReadAll(r.Body) if err != nil { @@ -116,7 +116,7 @@ func StartRpcServer() { SendError(w) } - node, err := SelectMatchedNode(height) + node, err := SelectMatchedNodeRpc(height) if err != nil { SendError(w) } @@ -134,7 +134,7 @@ func StartRpcServer() { SendError(w) } - node, err := SelectMatchedNode(height) + node, err := SelectMatchedNodeRpc(height) if err != nil { SendError(w) } @@ -143,15 +143,19 @@ func StartRpcServer() { r.Body = io.NopCloser(bytes.NewBuffer(body)) // assign a new body with previous byte slice r.Host = r.URL.Host - ProxyMap[selectedHost].ServeHTTP(w, r) + ProxyMapRpc[selectedHost].ServeHTTP(w, r) } else { SendError(w) } } // handle all requests to your server using the proxy - http.HandleFunc("/", handler) - log.Fatal(http.ListenAndServe(":26657", nil)) + //http.HandleFunc("/", handler) + serverMux := http.NewServeMux() + serverMux.HandleFunc("/", handler) + go func() { + log.Fatal(http.ListenAndServe(":26657", serverMux)) + }() } func SendError(w http.ResponseWriter) { diff --git a/state.go b/state.go index 7a4ff30..4ccd9b6 100644 --- a/state.go +++ b/state.go @@ -21,44 +21,75 @@ type BackendState struct { } var ( - Pool []*BackendState - ProxyMap = make(map[string]*httputil.ReverseProxy) + PoolRpc []*BackendState + PoolApi []*BackendState + ProxyMapRpc = make(map[string]*httputil.ReverseProxy) + ProxyMapApi = make(map[string]*httputil.ReverseProxy) ) func Init() { InitPool() - for _, s := range Pool { + for _, s := range PoolRpc { target, err := url.Parse(s.Name) if err != nil { panic(err) } - ProxyMap[s.Name] = httputil.NewSingleHostReverseProxy(target) + ProxyMapRpc[s.Name] = httputil.NewSingleHostReverseProxy(target) + } + + for _, s := range PoolApi { + target, err := url.Parse(s.Name) + if err != nil { + panic(err) + } + ProxyMapApi[s.Name] = httputil.NewSingleHostReverseProxy(target) } } func InitPool() { - Pool = Pool[:0] // Remove all elements - cfg := GetConfig() + + PoolRpc = PoolRpc[:0] // Remove all elements + PoolApi = PoolApi[:0] // Remove all elements for _, s := range cfg.Upstream { be := s // fix Copying the address of a loop variable in Go - backendState := BackendState{ + backendStateRpc := BackendState{ Name: s.Rpc, NodeType: GetBackendNodeType(&be), LastBlock: 0, Backend: &be, } - fmt.Printf("debug: %+v\n", backendState) - Pool = append(Pool, &backendState) + fmt.Printf("debug: %+v\n", backendStateRpc) + PoolRpc = append(PoolRpc, &backendStateRpc) + + // + backendStateApi := BackendState{ + Name: s.Api, + NodeType: GetBackendNodeType(&be), + LastBlock: 0, + Backend: &be, + } + fmt.Printf("debug: %+v\n", backendStateApi) + PoolApi = append(PoolApi, &backendStateApi) } TaskUpdateState() } -func SelectPrunedNode() *BackendState { - for _, s := range Pool { +func SelectPrunedNodeRpc() *BackendState { + for _, s := range PoolRpc { + if s.NodeType == BackendNodeTypePruned { + return s + } + } + + return nil +} + +func SelectPrunedNodeApi() *BackendState { + for _, s := range PoolApi { if s.NodeType == BackendNodeTypePruned { return s } @@ -67,8 +98,28 @@ func SelectPrunedNode() *BackendState { return nil } -func SelectMatchedNode(height int64) (*BackendState, error) { - for _, s := range Pool { +func SelectMatchedNodeRpc(height int64) (*BackendState, error) { + for _, s := range PoolRpc { + fmt.Printf("debug: %+v\n", s) + if s.NodeType == BackendNodeTypePruned { + earliestHeight := s.LastBlock - s.Backend.Blocks[0] + if height >= earliestHeight { + return s, nil + } + } else if s.NodeType == BackendNodeTypeSubNode { + if (height >= s.Backend.Blocks[0]) && (height <= s.Backend.Blocks[0]) { + return s, nil + } + } else if s.NodeType == BackendNodeTypeArchive { + return s, nil + } + } + + return nil, errors.New("no node matched") +} + +func SelectMatchedNodeApi(height int64) (*BackendState, error) { + for _, s := range PoolApi { fmt.Printf("debug: %+v\n", s) if s.NodeType == BackendNodeTypePruned { earliestHeight := s.LastBlock - s.Backend.Blocks[0] @@ -96,10 +147,19 @@ func TaskUpdateState() { for { select { case <-ticker.C: - for _, s := range Pool { + for _, s := range PoolRpc { if s.NodeType == BackendNodeTypePruned { - fmt.Println(s.Name) + height, err := FetchHeightFromStatus(s.Backend.Rpc) + if err == nil { + s.LastBlock = height + } else { + fmt.Println("Err FetchHeightFromStatus", err) + } + } + } + for _, s := range PoolApi { + if s.NodeType == BackendNodeTypePruned { height, err := FetchHeightFromStatus(s.Backend.Rpc) if err == nil { s.LastBlock = height diff --git a/state_test.go b/state_test.go index 861f41b..02963cd 100644 --- a/state_test.go +++ b/state_test.go @@ -21,7 +21,7 @@ func TestSelectPrunedNode(t *testing.T) { InitPool() - assert.Equal(t, SelectPrunedNode().Name, pruned.Rpc) + assert.Equal(t, SelectPrunedNodeRpc().Name, pruned.Rpc) } func TestReadHeightFromStatusJson(t *testing.T) { diff --git a/subnode.yaml b/subnode.yaml index 431ad9b..e60e6c7 100644 --- a/subnode.yaml +++ b/subnode.yaml @@ -1,5 +1,7 @@ upstream: - rpc: "https://rpc-osmosis-ia.cosmosia.notional.ventures:443" + api: "https://api-osmosis-ia.cosmosia.notional.ventures:443" blocks: [362880] - rpc: "https://rpc-osmosis-archive-ia.cosmosia.notional.ventures:443" + api: "https://api-osmosis-archive-ia.cosmosia.notional.ventures:443" blocks: [] \ No newline at end of file