diff --git a/app/app.go b/app/app.go index 9846ce93..340e4125 100644 --- a/app/app.go +++ b/app/app.go @@ -14,9 +14,10 @@ import ( "github.com/cometbft/cometbft/libs/log" tmos "github.com/cometbft/cometbft/libs/os" "github.com/cosmos/cosmos-sdk/baseapp" - "github.com/cosmos/cosmos-sdk/client" + cosmosClient "github.com/cosmos/cosmos-sdk/client" nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node" "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" + "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/runtime" @@ -113,6 +114,7 @@ import ( applicationmodule "poktroll/x/application" applicationmodulekeeper "poktroll/x/application/keeper" applicationmoduletypes "poktroll/x/application/types" + "poktroll/x/poktroll" poktrollmodule "poktroll/x/poktroll" poktrollmodulekeeper "poktroll/x/poktroll/keeper" poktrollmoduletypes "poktroll/x/poktroll/types" @@ -234,7 +236,7 @@ type App struct { cdc *codec.LegacyAmino appCodec codec.Codec interfaceRegistry types.InterfaceRegistry - txConfig client.TxConfig + txConfig cosmosClient.TxConfig invCheckPeriod uint @@ -556,7 +558,7 @@ func New( keys[poktrollmoduletypes.StoreKey], keys[poktrollmoduletypes.MemStoreKey], app.GetSubspace(poktrollmoduletypes.ModuleName), - ) + ) // TECHDEBT: Do something with the error poktrollModule := poktrollmodule.NewAppModule(appCodec, app.PoktrollKeeper, app.AccountKeeper, app.BankKeeper) app.PortalKeeper = *portalmodulekeeper.NewKeeper( @@ -583,7 +585,34 @@ func New( app.GetSubspace(servicermoduletypes.ModuleName), app.BankKeeper, ) - servicerModule := servicermodule.NewAppModule(appCodec, app.ServicerKeeper, app.AccountKeeper, app.BankKeeper) + + servicerEnabled := appOpts.Get(poktroll.FlagEnableServicerMode).(bool) + applicationEnabled := appOpts.Get(poktroll.FlagEnableApplicationMode).(bool) + portalEnabled := appOpts.Get(poktroll.FlagEnablePortalMode).(bool) + + factory := appOpts.Get("factory").(tx.Factory) + clientCtx := appOpts.Get("clientCtx").(cosmosClient.Context) + + var actorModule module.AppModule + if servicerEnabled && !applicationEnabled && !portalEnabled { + actorModule = servicermodule.NewAppModule( + appCodec, + app.ServicerKeeper, + app.AccountKeeper, + app.BankKeeper, + // TODO_THIS_COMMIT: refactor & de-dup key to a constant + clientCtx, + factory, + ) + } else if applicationEnabled && !servicerEnabled && !portalEnabled { + // actorModule = applicationmodule.NewAppModule(appCodec, app.ApplicationKeeper, app.AccountKeeper, app.BankKeeper) + } else if portalEnabled && !applicationEnabled && !servicerEnabled { + // actorModule = portalmodule.NewAppModule(appCodec, app.PortalKeeper, app.AccountKeeper, app.BankKeeper) + } else if !portalEnabled && !applicationEnabled && !servicerEnabled { + // No actor module + } else { + panic("only one of the following flags can be set: --servicer, --application, --portal") + } app.SessionKeeper = *sessionmodulekeeper.NewKeeper( appCodec, @@ -628,7 +657,7 @@ func New( // NOTE: Any module instantiated in the module manager that is later modified // must be passed by reference here. - app.mm = module.NewManager( + modules := []module.AppModule{ genutil.NewAppModule( app.AccountKeeper, app.StakingKeeper, @@ -657,13 +686,23 @@ func New( poktrollModule, portalModule, applicationModule, - servicerModule, sessionModule, // this line is used by starport scaffolding # stargate/app/appModule - crisis.NewAppModule(app.CrisisKeeper, skipGenesisInvariants, app.GetSubspace(crisistypes.ModuleName)), // always be last to make sure that it checks for all invariants and not only part of them + } + + if servicerEnabled || applicationEnabled || portalEnabled { + modules = append(modules, actorModule) + } + + // always be last to make sure that it checks for all invariants and not only part of them + modules = append( + modules, + crisis.NewAppModule(app.CrisisKeeper, skipGenesisInvariants, app.GetSubspace(crisistypes.ModuleName)), ) + app.mm = module.NewManager(modules...) + // During begin block slashing happens after distr.BeginBlocker so that // there is nothing left over in the validator fee pool, so as to keep the // CanWithdrawInvariant invariant. @@ -898,7 +937,7 @@ func (app *App) InterfaceRegistry() types.InterfaceRegistry { } // TxConfig returns SimApp's TxConfig -func (app *App) TxConfig() client.TxConfig { +func (app *App) TxConfig() cosmosClient.TxConfig { return app.txConfig } @@ -950,12 +989,12 @@ func (app *App) RegisterAPIRoutes(apiSvr *api.Server, apiConfig config.APIConfig } // RegisterTxService implements the Application.RegisterTxService method. -func (app *App) RegisterTxService(clientCtx client.Context) { +func (app *App) RegisterTxService(clientCtx cosmosClient.Context) { authtx.RegisterTxService(app.BaseApp.GRPCQueryRouter(), clientCtx, app.BaseApp.Simulate, app.interfaceRegistry) } // RegisterTendermintService implements the Application.RegisterTendermintService method. -func (app *App) RegisterTendermintService(clientCtx client.Context) { +func (app *App) RegisterTendermintService(clientCtx cosmosClient.Context) { tmservice.RegisterTendermintService( clientCtx, app.BaseApp.GRPCQueryRouter(), @@ -965,7 +1004,7 @@ func (app *App) RegisterTendermintService(clientCtx client.Context) { } // RegisterNodeService implements the Application.RegisterNodeService method. -func (app *App) RegisterNodeService(clientCtx client.Context) { +func (app *App) RegisterNodeService(clientCtx cosmosClient.Context) { nodeservice.RegisterNodeService(clientCtx, app.GRPCQueryRouter()) } diff --git a/cmd/poktrolld/cmd/root.go b/cmd/poktrolld/cmd/root.go index b29fcbde..dbdf6c87 100644 --- a/cmd/poktrolld/cmd/root.go +++ b/cmd/poktrolld/cmd/root.go @@ -19,6 +19,7 @@ import ( "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/client/keys" "github.com/cosmos/cosmos-sdk/client/rpc" + "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/server" serverconfig "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" @@ -41,6 +42,7 @@ import ( "poktroll/app" appparams "poktroll/app/params" + "poktroll/x/poktroll" ) // NewRootCmd creates a new root command for a Cosmos SDK application @@ -72,15 +74,32 @@ func NewRootCmd() (*cobra.Command, appparams.EncodingConfig) { return err } - if err := client.SetCmdClientContextHandler(initClientCtx, cmd); err != nil { - return err - } - customAppTemplate, customAppConfig := initAppConfig() customTMConfig := initTendermintConfig() - return server.InterceptConfigsPreRunHandler( + if err := server.InterceptConfigsPreRunHandler( cmd, customAppTemplate, customAppConfig, customTMConfig, - ) + ); err != nil { + return err + } + + factory, err := tx.NewFactoryCLI(initClientCtx, cmd.Flags()) + if err != nil { + return err + } + + serverCtx := server.GetServerContextFromCmd(cmd) + // TODO_THIS_COMMIT: factor out keys to constants. + serverCtx.Viper.Set("actorMode", "servicer") + serverCtx.Viper.Set("clientCtx", initClientCtx) + serverCtx.Viper.Set("factory", factory) + if err := server.SetCmdServerContext(cmd, serverCtx); err != nil { + return err + } + + if err := client.SetCmdClientContextHandler(initClientCtx, cmd); err != nil { + return err + } + return nil }, } @@ -201,6 +220,8 @@ func txCommand() *cobra.Command { } func addModuleInitFlags(startCmd *cobra.Command) { + poktroll.AddModuleInitFlags(startCmd) + crisis.AddModuleInitFlags(startCmd) // this line is used by starport scaffolding # root/arguments } diff --git a/go.mod b/go.mod index 1ed7831f..5430947e 100644 --- a/go.mod +++ b/go.mod @@ -275,3 +275,5 @@ replace github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.2021 replace github.com/cosmos/cosmos-sdk => github.com/rollkit/cosmos-sdk v0.47.3-rollkit-v0.10.2-no-fraud-proofs replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 + +replace github.com/pokt-network/smt => github.com/pokt-network/smt v0.6.2-0.20230907101623-9d2e4983c5f1 diff --git a/go.sum b/go.sum index 71aded60..cff4a42a 100644 --- a/go.sum +++ b/go.sum @@ -1576,8 +1576,8 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/pokt-network/smt v0.6.2-0.20230906162127-ef2341c474a0 h1:EFkH0qu/MrmDgz0i9FiHRFTGQAx6fkqlYRnlg7NgKpY= -github.com/pokt-network/smt v0.6.2-0.20230906162127-ef2341c474a0/go.mod h1:IhNcYL5XOHTfagy8GBKM23Xhd2uvhgbTtsGSMQtCxR8= +github.com/pokt-network/smt v0.6.2-0.20230907101623-9d2e4983c5f1 h1:ABPxZMZSGFqCbOXyEuGys1k4hLp0yORTrhPFr3676h4= +github.com/pokt-network/smt v0.6.2-0.20230907101623-9d2e4983c5f1/go.mod h1:IhNcYL5XOHTfagy8GBKM23Xhd2uvhgbTtsGSMQtCxR8= github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4= github.com/polydawn/refmt v0.89.0/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw= github.com/polyfloyd/go-errorlint v1.0.0/go.mod h1:KZy4xxPJyy88/gldCe5OdW6OQRtNO3EZE7hXzmnebgA= diff --git a/localnet/kubernetes/poktrolld.yaml b/localnet/kubernetes/poktrolld.yaml index 12b4d5ab..e236013e 100644 --- a/localnet/kubernetes/poktrolld.yaml +++ b/localnet/kubernetes/poktrolld.yaml @@ -56,7 +56,7 @@ data: # start the node # You can attach to this process with delve (dlv) for debugging purpose with `dlv attach $(pgrep poktrolld) --listen :40004 --headless --api-version=2 --accept-multiclient` - run inside the container! - poktrolld start --rollkit.aggregator true --rollkit.da_layer celestia --rollkit.da_config='{"base_url":"http://celestia-rollkit:26658","timeout":60000000000,"fee":600000,"gas_limit":6000000,"auth_token":"'$AUTH_TOKEN'"}' --rollkit.namespace_id $NAMESPACE_ID --rollkit.da_start_height $DA_BLOCK_HEIGHT --rpc.laddr tcp://127.0.0.1:36657 --p2p.laddr "0.0.0.0:36656" + poktrolld start --servicer --rollkit.aggregator true --rollkit.da_layer celestia --rollkit.da_config='{"base_url":"http://celestia-rollkit:26658","timeout":60000000000,"fee":600000,"gas_limit":6000000,"auth_token":"'$AUTH_TOKEN'"}' --rollkit.namespace_id $NAMESPACE_ID --rollkit.da_start_height $DA_BLOCK_HEIGHT --rpc.laddr tcp://127.0.0.1:36657 --p2p.laddr "0.0.0.0:36656" # OR debug the node (uncomment this line but comment previous line) # dlv exec /usr/local/bin/poktrolld --listen :40004 --headless --api-version=2 --accept-multiclient -- poktrolld start --rollkit.aggregator true --rollkit.da_layer celestia --rollkit.da_config='{"base_url":"http://celestia-rollkit:26658","timeout":60000000000,"fee":600000,"gas_limit":6000000,"auth_token":"'$AUTH_TOKEN'"}' --rollkit.namespace_id $NAMESPACE_ID --rollkit.da_start_height $DA_BLOCK_HEIGHT --rpc.laddr tcp://127.0.0.1:36657 --p2p.laddr "0.0.0.0:36656" diff --git a/proto/poktroll/servicer/relay_request.proto b/proto/poktroll/servicer/relay_request.proto new file mode 100644 index 00000000..28929de3 --- /dev/null +++ b/proto/poktroll/servicer/relay_request.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; +package poktroll.servicer; + +option go_package = "poktroll/x/servicer/types"; + +message Relay { + RelayRequest req = 1; + RelayResponse res = 2; +} + +message RelayRequest { + string method = 1; + string url = 2; + map headers = 3; + bytes payload = 4; +} + +message RelayResponse { + bytes payload = 1; + int32 status_code = 2; + map headers = 3; + bytes signature = 4; +} \ No newline at end of file diff --git a/proto/poktroll/servicer/session.proto b/proto/poktroll/servicer/session.proto new file mode 100644 index 00000000..93c5c908 --- /dev/null +++ b/proto/poktroll/servicer/session.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; +package poktroll.servicer; + +option go_package = "poktroll/x/servicer/types"; + +message Session { + string id = 1; // a universally unique ID for the session + int64 session_number = 2; // a monotonically increasing number representing the # on the chain + int64 session_height = 3; // the height at which the session starts + bytes block_hash = 4; // the hash of the block at which the session starts + int64 num_session_blocks = 5; // the number of blocks the session is valid from + // CONSIDERATION: Should we add a `RelayChain` enum and use it across the board? + // CONSIDERATION: Should a single session support multiple relay chains? + // TECHDEBT: Do we need backwards with v0? https://docs.pokt.network/supported-blockchains/ + string relay_chain = 6; // the relay chain the session is valid for + // CONSIDERATION: Should a single session support multiple geo zones? + string geo_zone = 7; // the target geographic region where the actors are present + // core.Actor application = 7; // the application that is being served + // IMPROVE: `map` with the address as the key can simplify and optimize the logic on the clients + // repeated core.Actor servicers = 8; // the set of servicers that are serving the application + // repeated core.Actor fishermen = 9; // the set of fishermen that are fishing for servicers +} \ No newline at end of file diff --git a/utils/observable.go b/utils/observable.go new file mode 100644 index 00000000..faaef9b6 --- /dev/null +++ b/utils/observable.go @@ -0,0 +1,101 @@ +package utils + +import "sync" + +// Observable is a generic interface that allows multiple subscribers to read from a single channel +type Observable[V any] interface { + Subscribe() Subscription[V] +} + +type ObservableImpl[V any] struct { + mu sync.RWMutex + ch <-chan V // private channel that is used to emit values to subscribers + subscribers []chan V // subscribers is a list of channels that are subscribed to the observable + closed bool +} + +// Creates a new observable which emissions are controlled by the emitter channel +func NewControlledObservable[V any](emitter chan V) (Observable[V], chan V) { + // If the caller does not provide an emitter, create a new one and return it + e := make(chan V) + if emitter != nil { + e = emitter + } + o := &ObservableImpl[V]{sync.RWMutex{}, e, []chan V{}, false} + + // Start listening to the emitter and emit values to subscribers + go o.listen(emitter) + + return o, emitter +} + +// Get a subscription to the observable +func (o *ObservableImpl[V]) Subscribe() Subscription[V] { + o.mu.Lock() + defer o.mu.Unlock() + + // Create a channel for the subscriber and append it to the subscribers list + ch := make(chan V, 1) + o.subscribers = append(o.subscribers, ch) + + // Removal function used when unsubscribing from the observable + removeFromObservable := func() { + o.mu.Lock() + defer o.mu.Unlock() + + for i, s := range o.subscribers { + if ch == s { + o.subscribers = append(o.subscribers[:i], o.subscribers[i+1:]...) + break + } + } + } + + // Subscription gets its closed state from the observable + return &SubscriptionImpl[V]{ch, o.closed, removeFromObservable} +} + +// Listen to the emitter and emit values to subscribers +// This function is blocking and should be run in a goroutine +func (o *ObservableImpl[V]) listen(emitter <-chan V) { + for v := range emitter { + // Lock for o.subscribers slice as it can be modified by subscribers + o.mu.RLock() + for _, ch := range o.subscribers { + ch <- v + } + o.mu.RUnlock() + } + + // Here we know that the emitter has been closed, all subscribers should be closed as well + o.mu.Lock() + o.closed = true + for _, ch := range o.subscribers { + close(ch) + o.subscribers = []chan V{} + } + o.mu.Lock() +} + +// Subscription is a generic interface that provide access to the underlying channel +// and allows unsubscribing from an observable +type Subscription[V any] interface { + Unsubscribe() + Ch() <-chan V +} + +type SubscriptionImpl[V any] struct { + ch chan V + closed bool + removeFromObservable func() +} + +func (s *SubscriptionImpl[V]) Unsubscribe() { + close(s.ch) + s.closed = true + s.removeFromObservable() +} + +func (s *SubscriptionImpl[V]) Ch() <-chan V { + return s.ch +} diff --git a/x/poktroll/module.go b/x/poktroll/module.go index a9805b4e..55f01218 100644 --- a/x/poktroll/module.go +++ b/x/poktroll/module.go @@ -4,18 +4,19 @@ import ( "context" "encoding/json" "fmt" + // this line is used by starport scaffolding # 1 "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/spf13/cobra" abci "github.com/cometbft/cometbft/abci/types" - "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" cdctypes "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/module" + "poktroll/x/poktroll/client/cli" "poktroll/x/poktroll/keeper" "poktroll/x/poktroll/types" @@ -26,6 +27,13 @@ var ( _ module.AppModuleBasic = AppModuleBasic{} ) +// Module init related flags +const ( + FlagEnableServicerMode = "servicer" + FlagEnableApplicationMode = "application" + FlagEnablePortalMode = "portal" +) + // ---------------------------------------------------------------------------- // AppModuleBasic // ---------------------------------------------------------------------------- @@ -146,3 +154,10 @@ func (am AppModule) BeginBlock(_ sdk.Context, _ abci.RequestBeginBlock) {} func (am AppModule) EndBlock(_ sdk.Context, _ abci.RequestEndBlock) []abci.ValidatorUpdate { return []abci.ValidatorUpdate{} } + +// AddModuleInitFlags implements servertypes.ModuleInitFlags interface. +func AddModuleInitFlags(startCmd *cobra.Command) { + startCmd.Flags().Bool(FlagEnableServicerMode, false, "Start full node in servicer mode") + startCmd.Flags().Bool(FlagEnableApplicationMode, false, "Start full node in application mode") + startCmd.Flags().Bool(FlagEnablePortalMode, false, "Start full node in portal mode") +} diff --git a/x/servicer/components/miner/miner.go b/x/servicer/components/miner/miner.go new file mode 100644 index 00000000..58cfd3aa --- /dev/null +++ b/x/servicer/components/miner/miner.go @@ -0,0 +1,134 @@ +package miner + +import ( + "context" + "hash" + + cosmosClient "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/pokt-network/smt" + + "poktroll/utils" + "poktroll/x/servicer/types" +) + +type Miner struct { + smst smt.SMST + // TECHDEBT: update after switching to logger module (i.e. once + // servicer is external to poktrolld) + //logger log.Logger + relays utils.Observable[*types.Relay] + sessions utils.Observable[*types.Session] + cometClient cosmosClient.TendermintRPC + txConfig cosmosClient.TxConfig + factory tx.Factory + + hasher hash.Hash +} + +func NewMiner( + hasher hash.Hash, + store smt.KVStore, + cometClient cosmosClient.TendermintRPC, + txConfig cosmosClient.TxConfig, + factory tx.Factory, +) *Miner { //, client cometrpc.Client) *Miner { + m := &Miner{ + cometClient: cometClient, + txConfig: txConfig, + factory: factory, + } + m.smst = *smt.NewSparseMerkleSumTree(store, hasher) + m.hasher = hasher + + go m.handleSessionEnd() + go m.handleRelays() + + return m +} + +func (m *Miner) submitClaim() error { + txBuilder := m.txConfig.NewTxBuilder() + + // TODO_THIS_COMMIT: + // Construct & set claim msg + txBuilder.SetMsgs() + + // Sign tx!! + if err := tx.Sign( + m.factory, + "SOME KEY NAME!!!!", + txBuilder, + false, + ); err != nil { + return err + } + // alternatively: + // client.SignTx() + + builtTx := txBuilder.GetTx() + txBz, err := m.txConfig.TxEncoder()(builtTx) + if err != nil { + return err + } + + txBcastResult, err := m.cometClient.BroadcastTxCommit(context.TODO(), txBz) + if err != nil { + return err + } + + // TODO_THIS_COMMIT: do something with this (?) + _ = txBcastResult + return nil +} + +func (m *Miner) submitProof(hash []byte) error { + //key, valueHash, sum, proof, err := m.smst.ProveClosest(hash) + //if err != nil { + // return err + //} + + //result := <-m.client.SubmitProof(context.TODO(), key, valueHash, sum, proof, err) + //return result.Error() + return nil +} + +func (m *Miner) MineRelays(relays utils.Observable[*types.Relay], sessions utils.Observable[*types.Session]) { + m.relays = relays + m.sessions = sessions +} + +func (m *Miner) handleSessionEnd() { + ch := m.sessions.Subscribe().Ch() + for session := range ch { + if err := m.submitClaim(); err != nil { + continue + } + + // Wait for some time + m.submitProof([]byte(session.BlockHash)) + } +} + +func (m *Miner) handleRelays() { + ch := m.relays.Subscribe().Ch() + for relay := range ch { + //m.logger.Info("TODO handle relay 🔂 %+v", relay) + + // TODO get the serialized byte representation of the relay + relayBz, err := relay.Marshal() + if err != nil { + //m.logger.Error("failed to marshal relay: %s", err) + continue + } + + // Is it correct that we need to hash the key while smst.Update() could do it + // since smst has a reference to the hasher + hash := m.hasher.Sum([]byte(relayBz)) + m.update(hash, relayBz, 1) + } +} + +func (m *Miner) update(key []byte, value []byte, weight uint64) error { + return m.smst.Update(key, value, weight) +} diff --git a/x/servicer/components/relayer/relayer.go b/x/servicer/components/relayer/relayer.go new file mode 100644 index 00000000..10c4772c --- /dev/null +++ b/x/servicer/components/relayer/relayer.go @@ -0,0 +1,139 @@ +package relayer + +import ( + "bufio" + "bytes" + "io" + "log" + "net" + "net/http" + + "poktroll/utils" + "poktroll/x/servicer/types" +) + +type Relayer struct { + localAddr string + serviceAddr string + logger *log.Logger + output chan *types.Relay + outputObservable utils.Observable[*types.Relay] +} + +func NewRelayer(logger *log.Logger) *Relayer { + r := &Relayer{output: make(chan *types.Relay), logger: logger} + r.outputObservable, _ = utils.NewControlledObservable[*types.Relay](r.output) + + r.localAddr = "localhost:8545" + r.serviceAddr = "localhost:8546" + + go r.listen() + + return r +} + +func (r *Relayer) Relays() utils.Observable[*types.Relay] { + return r.outputObservable +} + +func (r *Relayer) listen() { + if err := http.ListenAndServe(r.localAddr, r); err != nil { + r.logger.Fatal(err) + } +} + +func (r *Relayer) ServeHTTP(wr http.ResponseWriter, req *http.Request) { + requestHeaders := make(map[string]string) + for k, v := range req.Header { + requestHeaders[k] = v[0] + } + + relayRequest := &types.RelayRequest{ + Method: req.Method, + Url: req.URL.String(), + Headers: requestHeaders, + } + + if req.Body != nil { + // Read the request body + requestBody, err := io.ReadAll(req.Body) + if err != nil { + r.replyWithError(500, err, wr) + return + } + relayRequest.Payload = requestBody + } + + // Change the request host to the service address + req.Host = r.serviceAddr + req.URL.Host = r.serviceAddr + req.Body = io.NopCloser(bytes.NewBuffer(relayRequest.Payload)) + + // Connect to the service + remoteConnection, err := net.Dial("tcp", r.serviceAddr) + if err != nil { + r.replyWithError(500, err, wr) + return + } + defer remoteConnection.Close() + + // Send the request to the service + err = req.Write(remoteConnection) + if err != nil { + r.replyWithError(500, err, wr) + return + } + + // Read the response from the service + response, err := http.ReadResponse(bufio.NewReader(remoteConnection), req) + if err != nil { + r.replyWithError(500, err, wr) + return + } + + var responseBody []byte + if response.Body != nil { + // Read the request body + responseBody, err = io.ReadAll(response.Body) + if err != nil { + r.replyWithError(500, err, wr) + return + } + } + + wr.WriteHeader(response.StatusCode) + + responseHeaders := make(map[string]string) + for k, v := range response.Header { + wr.Header().Add(k, v[0]) + } + + // Send the response to the client + _, err = wr.Write(responseBody) + if err != nil { + // TODO: handle error + return + } + + relay := &types.Relay{ + Req: relayRequest, + Res: &types.RelayResponse{ + StatusCode: int32(response.StatusCode), + Headers: responseHeaders, + Payload: responseBody, + }, + } + + relay.Res.Signature = r.signResponse(relay) + + r.output <- relay +} + +func (r *Relayer) signResponse(relay *types.Relay) []byte { + return nil +} + +func (r *Relayer) replyWithError(statusCode int, err error, wr http.ResponseWriter) { + wr.WriteHeader(statusCode) + wr.Write([]byte(err.Error())) +} diff --git a/x/servicer/components/session_manager/session_manager.go b/x/servicer/components/session_manager/session_manager.go new file mode 100644 index 00000000..7833edfe --- /dev/null +++ b/x/servicer/components/session_manager/session_manager.go @@ -0,0 +1,58 @@ +package sessionmanager + +import ( + "fmt" + + "poktroll/utils" + "poktroll/x/servicer/types" +) + +type SessionManager struct { + blocksPerSession int64 + session *types.Session + sessionTicker utils.Observable[*types.Session] + latestSecret []byte + + newSessions chan *types.Session + newBlocks chan *types.Block +} + +func NewSessionManager(newBlocks chan *types.Block) *SessionManager { + sm := &SessionManager{ + session: &types.Session{}, + newBlocks: newBlocks, + } + sm.sessionTicker, sm.newSessions = utils.NewControlledObservable[*types.Session](nil) + + // TODO_THIS_COMMIT: make blocksPerSession a module param. + sm.blocksPerSession = 2 + + go sm.handleBlocks() + + return sm +} + +func (sm *SessionManager) ClosedSessions() utils.Observable[*types.Session] { + return sm.sessionTicker +} + +func (sm *SessionManager) handleBlocks() { + // tick sessions along as new blocks are received + for block := range sm.newBlocks { + // discover a new session every `blocksPerSession` blocks + if block.Height%sm.blocksPerSession == 0 { + sm.session = &types.Session{ + SessionNumber: block.Height / sm.blocksPerSession, + SessionHeight: sm.session.SessionNumber * sm.blocksPerSession, + BlockHash: block.Hash, + } + + // set the latest secret for claim and proof use + sm.latestSecret = block.Hash + go func() { + fmt.Println("NEW SESSION") + sm.newSessions <- sm.session + }() + } + } +} diff --git a/x/servicer/module.go b/x/servicer/module.go index a0ea5642..8255778f 100644 --- a/x/servicer/module.go +++ b/x/servicer/module.go @@ -2,21 +2,29 @@ package servicer import ( "context" + "crypto/sha256" "encoding/json" "fmt" + "github.com/cosmos/cosmos-sdk/client/tx" + "log" + // this line is used by starport scaffolding # 1 "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/pokt-network/smt" "github.com/spf13/cobra" abci "github.com/cometbft/cometbft/abci/types" - "github.com/cosmos/cosmos-sdk/client" + cosmosClient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" cdctypes "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/module" "poktroll/x/servicer/client/cli" + "poktroll/x/servicer/components/miner" + "poktroll/x/servicer/components/relayer" + sessionmanager "poktroll/x/servicer/components/session_manager" "poktroll/x/servicer/keeper" "poktroll/x/servicer/types" ) @@ -60,7 +68,7 @@ func (AppModuleBasic) DefaultGenesis(cdc codec.JSONCodec) json.RawMessage { } // ValidateGenesis used to validate the GenesisState, given in its json.RawMessage form -func (AppModuleBasic) ValidateGenesis(cdc codec.JSONCodec, config client.TxEncodingConfig, bz json.RawMessage) error { +func (AppModuleBasic) ValidateGenesis(cdc codec.JSONCodec, config cosmosClient.TxEncodingConfig, bz json.RawMessage) error { var genState types.GenesisState if err := cdc.UnmarshalJSON(bz, &genState); err != nil { return fmt.Errorf("failed to unmarshal %s genesis state: %w", types.ModuleName, err) @@ -69,7 +77,7 @@ func (AppModuleBasic) ValidateGenesis(cdc codec.JSONCodec, config client.TxEncod } // RegisterGRPCGatewayRoutes registers the gRPC Gateway routes for the module -func (AppModuleBasic) RegisterGRPCGatewayRoutes(clientCtx client.Context, mux *runtime.ServeMux) { +func (AppModuleBasic) RegisterGRPCGatewayRoutes(clientCtx cosmosClient.Context, mux *runtime.ServeMux) { types.RegisterQueryHandlerClient(context.Background(), mux, types.NewQueryClient(clientCtx)) } @@ -94,6 +102,13 @@ type AppModule struct { keeper keeper.Keeper accountKeeper types.AccountKeeper bankKeeper types.BankKeeper + + // Servicer private components + relayer *relayer.Relayer + sessionManager *sessionmanager.SessionManager + miner *miner.Miner + + newBlocks chan *types.Block } func NewAppModule( @@ -101,12 +116,37 @@ func NewAppModule( keeper keeper.Keeper, accountKeeper types.AccountKeeper, bankKeeper types.BankKeeper, + // TODO_THIS_COMMIT: use an enum or something + clientCtx cosmosClient.Context, + factory tx.Factory, ) AppModule { + relayer := relayer.NewRelayer(log.Default()) + newBlocks := make(chan *types.Block) + sessionManager := sessionmanager.NewSessionManager(newBlocks) + + storePath := "/tmp/smt" + kvStore, err := smt.NewKVStore(storePath) + + if err != nil { + panic(fmt.Errorf("failed to create KVStore %q: %w", storePath, err)) + } + + cometClient := clientCtx.Client + txConfig := clientCtx.TxConfig + + miner := miner.NewMiner(sha256.New(), kvStore, cometClient, txConfig, factory) + miner.MineRelays(relayer.Relays(), sessionManager.ClosedSessions()) + return AppModule{ AppModuleBasic: NewAppModuleBasic(cdc), keeper: keeper, accountKeeper: accountKeeper, bankKeeper: bankKeeper, + + relayer: relayer, + sessionManager: sessionManager, + miner: miner, + newBlocks: newBlocks, } } @@ -143,6 +183,10 @@ func (AppModule) ConsensusVersion() uint64 { return 1 } func (am AppModule) BeginBlock(_ sdk.Context, _ abci.RequestBeginBlock) {} // EndBlock contains the logic that is automatically triggered at the end of each block -func (am AppModule) EndBlock(_ sdk.Context, _ abci.RequestEndBlock) []abci.ValidatorUpdate { +func (am AppModule) EndBlock(ctx sdk.Context, req abci.RequestEndBlock) []abci.ValidatorUpdate { + am.newBlocks <- &types.Block{ + Height: ctx.BlockHeight(), + Hash: ctx.HeaderHash(), + } return []abci.ValidatorUpdate{} } diff --git a/x/servicer/types/block.go b/x/servicer/types/block.go new file mode 100644 index 00000000..6ed1cee0 --- /dev/null +++ b/x/servicer/types/block.go @@ -0,0 +1,6 @@ +package types + +type Block struct { + Height int64 + Hash []byte +}