Skip to content

Commit

Permalink
x-cosmos-block-height (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
baabeetaa authored Mar 29, 2023
1 parent 0756378 commit 9c6b666
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 27 deletions.
41 changes: 39 additions & 2 deletions api_server.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,52 @@
package main

import (
"fmt"
"log"
"net/http"
"strconv"
)

func StartApiServer() {
handler := func(w http.ResponseWriter, r *http.Request) {
prunedNode := SelectPrunedNodeApi()
selectedHost := prunedNode.Backend.Api // default to pruned node

if r.Method == "GET" {
fmt.Printf("r.RequestURI=%s\n", r.RequestURI)

xCosmosBlockHeight := r.Header.Get("x-cosmos-block-height")
if xCosmosBlockHeight != "" {
height, err := strconv.ParseInt(xCosmosBlockHeight, 10, 64)
if err != nil {
SendError(w)
}

node, err := SelectMatchedNodeApi(height)
if err != nil {
SendError(w)
}

selectedHost = node.Backend.Api
} else {
selectedHost = prunedNode.Backend.Api
}

r.Host = r.URL.Host
ProxyMapApi[selectedHost].ServeHTTP(w, r)
} else if r.Method == "POST" {
selectedHost = prunedNode.Backend.Api
r.Host = r.URL.Host
ProxyMapApi[selectedHost].ServeHTTP(w, r)
} else {
SendError(w)
}
}
// handle all requests to your server using the proxy
http.HandleFunc("/", handler)
log.Fatal(http.ListenAndServe(":1337", nil))
//http.HandleFunc("/", handler)
serverMux := http.NewServeMux()
serverMux.HandleFunc("/", handler)
go func() {
log.Fatal(http.ListenAndServe(":1337", serverMux))
}()
}
3 changes: 3 additions & 0 deletions cmd_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ func startCmd() *cobra.Command {
Init()
StartRpcServer()
StartApiServer()

select {}

return nil
},
}
Expand Down
3 changes: 2 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
)

type Backend struct {
Rpc string
Rpc string // url to rpc, eg., https://rpc-osmosis-ia.cosmosia.notional.ventures:443
Api string // url to api, eg., https://api-osmosis-ia.cosmosia.notional.ventures:443

// examples:
// [1, 100] => from block 1 to block 100 (subnode)
Expand Down
20 changes: 12 additions & 8 deletions rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func StartRpcServer() {
handler := func(w http.ResponseWriter, r *http.Request) {
prunedNode := SelectPrunedNode()
prunedNode := SelectPrunedNodeRpc()
selectedHost := prunedNode.Backend.Rpc // default to pruned node

if r.Method == "GET" { // URI over HTTP
Expand Down Expand Up @@ -50,7 +50,7 @@ func StartRpcServer() {
SendError(w)
}

node, err := SelectMatchedNode(height)
node, err := SelectMatchedNodeRpc(height)
if err != nil {
SendError(w)
}
Expand All @@ -62,7 +62,7 @@ func StartRpcServer() {
}

r.Host = r.URL.Host
ProxyMap[selectedHost].ServeHTTP(w, r)
ProxyMapRpc[selectedHost].ServeHTTP(w, r)
} else if r.Method == "POST" { // JSONRPC over HTTP
body, err := io.ReadAll(r.Body)
if err != nil {
Expand Down Expand Up @@ -116,7 +116,7 @@ func StartRpcServer() {
SendError(w)
}

node, err := SelectMatchedNode(height)
node, err := SelectMatchedNodeRpc(height)
if err != nil {
SendError(w)
}
Expand All @@ -134,7 +134,7 @@ func StartRpcServer() {
SendError(w)
}

node, err := SelectMatchedNode(height)
node, err := SelectMatchedNodeRpc(height)
if err != nil {
SendError(w)
}
Expand All @@ -143,15 +143,19 @@ func StartRpcServer() {

r.Body = io.NopCloser(bytes.NewBuffer(body)) // assign a new body with previous byte slice
r.Host = r.URL.Host
ProxyMap[selectedHost].ServeHTTP(w, r)
ProxyMapRpc[selectedHost].ServeHTTP(w, r)
} else {
SendError(w)
}
}

// handle all requests to your server using the proxy
http.HandleFunc("/", handler)
log.Fatal(http.ListenAndServe(":26657", nil))
//http.HandleFunc("/", handler)
serverMux := http.NewServeMux()
serverMux.HandleFunc("/", handler)
go func() {
log.Fatal(http.ListenAndServe(":26657", serverMux))
}()
}

func SendError(w http.ResponseWriter) {
Expand Down
90 changes: 75 additions & 15 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,75 @@ type BackendState struct {
}

var (
Pool []*BackendState
ProxyMap = make(map[string]*httputil.ReverseProxy)
PoolRpc []*BackendState
PoolApi []*BackendState
ProxyMapRpc = make(map[string]*httputil.ReverseProxy)
ProxyMapApi = make(map[string]*httputil.ReverseProxy)
)

func Init() {
InitPool()

for _, s := range Pool {
for _, s := range PoolRpc {
target, err := url.Parse(s.Name)
if err != nil {
panic(err)
}
ProxyMap[s.Name] = httputil.NewSingleHostReverseProxy(target)
ProxyMapRpc[s.Name] = httputil.NewSingleHostReverseProxy(target)
}

for _, s := range PoolApi {
target, err := url.Parse(s.Name)
if err != nil {
panic(err)
}
ProxyMapApi[s.Name] = httputil.NewSingleHostReverseProxy(target)
}
}

func InitPool() {
Pool = Pool[:0] // Remove all elements

cfg := GetConfig()

PoolRpc = PoolRpc[:0] // Remove all elements
PoolApi = PoolApi[:0] // Remove all elements
for _, s := range cfg.Upstream {
be := s // fix Copying the address of a loop variable in Go

backendState := BackendState{
backendStateRpc := BackendState{
Name: s.Rpc,
NodeType: GetBackendNodeType(&be),
LastBlock: 0,
Backend: &be,
}
fmt.Printf("debug: %+v\n", backendState)
Pool = append(Pool, &backendState)
fmt.Printf("debug: %+v\n", backendStateRpc)
PoolRpc = append(PoolRpc, &backendStateRpc)

//
backendStateApi := BackendState{
Name: s.Api,
NodeType: GetBackendNodeType(&be),
LastBlock: 0,
Backend: &be,
}
fmt.Printf("debug: %+v\n", backendStateApi)
PoolApi = append(PoolApi, &backendStateApi)
}

TaskUpdateState()
}

func SelectPrunedNode() *BackendState {
for _, s := range Pool {
func SelectPrunedNodeRpc() *BackendState {
for _, s := range PoolRpc {
if s.NodeType == BackendNodeTypePruned {
return s
}
}

return nil
}

func SelectPrunedNodeApi() *BackendState {
for _, s := range PoolApi {
if s.NodeType == BackendNodeTypePruned {
return s
}
Expand All @@ -67,8 +98,28 @@ func SelectPrunedNode() *BackendState {
return nil
}

func SelectMatchedNode(height int64) (*BackendState, error) {
for _, s := range Pool {
func SelectMatchedNodeRpc(height int64) (*BackendState, error) {
for _, s := range PoolRpc {
fmt.Printf("debug: %+v\n", s)
if s.NodeType == BackendNodeTypePruned {
earliestHeight := s.LastBlock - s.Backend.Blocks[0]
if height >= earliestHeight {
return s, nil
}
} else if s.NodeType == BackendNodeTypeSubNode {
if (height >= s.Backend.Blocks[0]) && (height <= s.Backend.Blocks[0]) {
return s, nil
}
} else if s.NodeType == BackendNodeTypeArchive {
return s, nil
}
}

return nil, errors.New("no node matched")
}

func SelectMatchedNodeApi(height int64) (*BackendState, error) {
for _, s := range PoolApi {
fmt.Printf("debug: %+v\n", s)
if s.NodeType == BackendNodeTypePruned {
earliestHeight := s.LastBlock - s.Backend.Blocks[0]
Expand Down Expand Up @@ -96,10 +147,19 @@ func TaskUpdateState() {
for {
select {
case <-ticker.C:
for _, s := range Pool {
for _, s := range PoolRpc {
if s.NodeType == BackendNodeTypePruned {
fmt.Println(s.Name)
height, err := FetchHeightFromStatus(s.Backend.Rpc)
if err == nil {
s.LastBlock = height
} else {
fmt.Println("Err FetchHeightFromStatus", err)
}
}
}

for _, s := range PoolApi {
if s.NodeType == BackendNodeTypePruned {
height, err := FetchHeightFromStatus(s.Backend.Rpc)
if err == nil {
s.LastBlock = height
Expand Down
2 changes: 1 addition & 1 deletion state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestSelectPrunedNode(t *testing.T) {

InitPool()

assert.Equal(t, SelectPrunedNode().Name, pruned.Rpc)
assert.Equal(t, SelectPrunedNodeRpc().Name, pruned.Rpc)
}

func TestReadHeightFromStatusJson(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions subnode.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
upstream:
- rpc: "https://rpc-osmosis-ia.cosmosia.notional.ventures:443"
api: "https://api-osmosis-ia.cosmosia.notional.ventures:443"
blocks: [362880]
- rpc: "https://rpc-osmosis-archive-ia.cosmosia.notional.ventures:443"
api: "https://api-osmosis-archive-ia.cosmosia.notional.ventures:443"
blocks: []

0 comments on commit 9c6b666

Please sign in to comment.