-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: servicer module implementation #19
Changes from all commits
cbd4004
23af6ee
5c8a91d
ce910b2
f6fd627
6d5ecf6
40e307b
603e9e9
e730668
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ import ( | |
"github.com/cosmos/cosmos-sdk/client/flags" | ||
"github.com/cosmos/cosmos-sdk/client/keys" | ||
"github.com/cosmos/cosmos-sdk/client/rpc" | ||
"github.com/cosmos/cosmos-sdk/client/tx" | ||
"github.com/cosmos/cosmos-sdk/server" | ||
serverconfig "github.com/cosmos/cosmos-sdk/server/config" | ||
servertypes "github.com/cosmos/cosmos-sdk/server/types" | ||
|
@@ -41,6 +42,7 @@ import ( | |
|
||
"poktroll/app" | ||
appparams "poktroll/app/params" | ||
"poktroll/x/poktroll" | ||
) | ||
|
||
// NewRootCmd creates a new root command for a Cosmos SDK application | ||
|
@@ -72,15 +74,32 @@ func NewRootCmd() (*cobra.Command, appparams.EncodingConfig) { | |
return err | ||
} | ||
|
||
if err := client.SetCmdClientContextHandler(initClientCtx, cmd); err != nil { | ||
return err | ||
} | ||
|
||
customAppTemplate, customAppConfig := initAppConfig() | ||
customTMConfig := initTendermintConfig() | ||
return server.InterceptConfigsPreRunHandler( | ||
if err := server.InterceptConfigsPreRunHandler( | ||
cmd, customAppTemplate, customAppConfig, customTMConfig, | ||
) | ||
); err != nil { | ||
return err | ||
} | ||
|
||
factory, err := tx.NewFactoryCLI(initClientCtx, cmd.Flags()) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
serverCtx := server.GetServerContextFromCmd(cmd) | ||
// TODO_THIS_COMMIT: factor out keys to constants. | ||
serverCtx.Viper.Set("actorMode", "servicer") | ||
serverCtx.Viper.Set("clientCtx", initClientCtx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see what you're doing here and seems like a good first approach (assuming we clean up the code), but I do want to try to avoid using viper for it. An idea (I don' know if it works) is to potentially leverage func (app *App) RegisterAPIRoutes(apiSvr *api.Server, apiConfig config.APIConfig) {
clientCtx := apiSvr.ClientCtx
// Register new tx routes from grpc-gateway.
authtx.RegisterGRPCGatewayRoutes(clientCtx, apiSvr.GRPCGatewayRouter)
// Register new tendermint queries routes from grpc-gateway.
tmservice.RegisterGRPCGatewayRoutes(clientCtx, apiSvr.GRPCGatewayRouter)
// Register node gRPC service for grpc-gateway.
nodeservice.RegisterGRPCGatewayRoutes(clientCtx, apiSvr.GRPCGatewayRouter)
// Register grpc-gateway routes for all modules.
ModuleBasics.RegisterGRPCGatewayRoutes(clientCtx, apiSvr.GRPCGatewayRouter)
// register app's OpenAPI routes.
docs.RegisterOpenAPIService(Name, apiSvr.Router)
} What if we add a custom This isn't a very well formed idea yet, so I'm still thinking through things... |
||
serverCtx.Viper.Set("factory", factory) | ||
if err := server.SetCmdServerContext(cmd, serverCtx); err != nil { | ||
return err | ||
} | ||
|
||
if err := client.SetCmdClientContextHandler(initClientCtx, cmd); err != nil { | ||
return err | ||
} | ||
return nil | ||
}, | ||
} | ||
|
||
|
@@ -201,6 +220,8 @@ func txCommand() *cobra.Command { | |
} | ||
|
||
func addModuleInitFlags(startCmd *cobra.Command) { | ||
poktroll.AddModuleInitFlags(startCmd) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if we add this to every module (portal, servicer, app) and have each one manage its own flag (independently of the other)? |
||
|
||
crisis.AddModuleInitFlags(startCmd) | ||
// this line is used by starport scaffolding # root/arguments | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
syntax = "proto3"; | ||
package poktroll.servicer; | ||
|
||
option go_package = "poktroll/x/servicer/types"; | ||
|
||
message Relay { | ||
RelayRequest req = 1; | ||
RelayResponse res = 2; | ||
} | ||
|
||
message RelayRequest { | ||
string method = 1; | ||
string url = 2; | ||
map<string, string> headers = 3; | ||
bytes payload = 4; | ||
} | ||
|
||
message RelayResponse { | ||
bytes payload = 1; | ||
int32 status_code = 2; | ||
map<string, string> headers = 3; | ||
bytes signature = 4; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
syntax = "proto3"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove this and import from the session module |
||
package poktroll.servicer; | ||
|
||
option go_package = "poktroll/x/servicer/types"; | ||
|
||
message Session { | ||
string id = 1; // a universally unique ID for the session | ||
int64 session_number = 2; // a monotonically increasing number representing the # on the chain | ||
int64 session_height = 3; // the height at which the session starts | ||
bytes block_hash = 4; // the hash of the block at which the session starts | ||
int64 num_session_blocks = 5; // the number of blocks the session is valid from | ||
// CONSIDERATION: Should we add a `RelayChain` enum and use it across the board? | ||
// CONSIDERATION: Should a single session support multiple relay chains? | ||
// TECHDEBT: Do we need backwards with v0? https://docs.pokt.network/supported-blockchains/ | ||
string relay_chain = 6; // the relay chain the session is valid for | ||
// CONSIDERATION: Should a single session support multiple geo zones? | ||
string geo_zone = 7; // the target geographic region where the actors are present | ||
// core.Actor application = 7; // the application that is being served | ||
// IMPROVE: `map<string, core.Actor>` with the address as the key can simplify and optimize the logic on the clients | ||
// repeated core.Actor servicers = 8; // the set of servicers that are serving the application | ||
// repeated core.Actor fishermen = 9; // the set of fishermen that are fishing for servicers | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package utils | ||
|
||
import "sync" | ||
|
||
// Observable is a generic interface that allows multiple subscribers to read from a single channel | ||
type Observable[V any] interface { | ||
Subscribe() Subscription[V] | ||
} | ||
|
||
type ObservableImpl[V any] struct { | ||
mu sync.RWMutex | ||
ch <-chan V // private channel that is used to emit values to subscribers | ||
subscribers []chan V // subscribers is a list of channels that are subscribed to the observable | ||
closed bool | ||
} | ||
|
||
// Creates a new observable which emissions are controlled by the emitter channel | ||
func NewControlledObservable[V any](emitter chan V) (Observable[V], chan V) { | ||
// If the caller does not provide an emitter, create a new one and return it | ||
e := make(chan V) | ||
if emitter != nil { | ||
e = emitter | ||
} | ||
o := &ObservableImpl[V]{sync.RWMutex{}, e, []chan V{}, false} | ||
|
||
// Start listening to the emitter and emit values to subscribers | ||
go o.listen(emitter) | ||
|
||
return o, emitter | ||
} | ||
|
||
// Get a subscription to the observable | ||
func (o *ObservableImpl[V]) Subscribe() Subscription[V] { | ||
o.mu.Lock() | ||
defer o.mu.Unlock() | ||
|
||
// Create a channel for the subscriber and append it to the subscribers list | ||
ch := make(chan V, 1) | ||
o.subscribers = append(o.subscribers, ch) | ||
|
||
// Removal function used when unsubscribing from the observable | ||
removeFromObservable := func() { | ||
o.mu.Lock() | ||
defer o.mu.Unlock() | ||
|
||
for i, s := range o.subscribers { | ||
if ch == s { | ||
o.subscribers = append(o.subscribers[:i], o.subscribers[i+1:]...) | ||
break | ||
} | ||
} | ||
} | ||
|
||
// Subscription gets its closed state from the observable | ||
return &SubscriptionImpl[V]{ch, o.closed, removeFromObservable} | ||
} | ||
|
||
// Listen to the emitter and emit values to subscribers | ||
// This function is blocking and should be run in a goroutine | ||
func (o *ObservableImpl[V]) listen(emitter <-chan V) { | ||
for v := range emitter { | ||
// Lock for o.subscribers slice as it can be modified by subscribers | ||
o.mu.RLock() | ||
for _, ch := range o.subscribers { | ||
ch <- v | ||
} | ||
o.mu.RUnlock() | ||
} | ||
|
||
// Here we know that the emitter has been closed, all subscribers should be closed as well | ||
o.mu.Lock() | ||
o.closed = true | ||
for _, ch := range o.subscribers { | ||
close(ch) | ||
o.subscribers = []chan V{} | ||
} | ||
o.mu.Lock() | ||
} | ||
|
||
// Subscription is a generic interface that provide access to the underlying channel | ||
// and allows unsubscribing from an observable | ||
type Subscription[V any] interface { | ||
Unsubscribe() | ||
Ch() <-chan V | ||
} | ||
|
||
type SubscriptionImpl[V any] struct { | ||
ch chan V | ||
closed bool | ||
removeFromObservable func() | ||
} | ||
|
||
func (s *SubscriptionImpl[V]) Unsubscribe() { | ||
close(s.ch) | ||
s.closed = true | ||
s.removeFromObservable() | ||
} | ||
|
||
func (s *SubscriptionImpl[V]) Ch() <-chan V { | ||
return s.ch | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future, I was thinking each service can act as multiple actors. I think it would also simplify the logic by initializing all of them (none of the if else statements) and it would just be a noop inside the actual module if the flagis off.