From 8cfba90a2cbc8458ddfb0bc72cf8554424122c1f Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 21 Mar 2024 13:17:01 -0600 Subject: [PATCH] GRPC ingester config, code gen and wiring (#147) --- Makefile | 3 + app/sidecar_query_server.go | 41 +- config-testnet.json | 7 +- config.json | 7 +- domain/config.go | 2 + domain/ingester.go | 15 + domain/mvc/ingest.go | 25 + ingest/README.md | 44 ++ ingest/delivery/grpc/ingest_grpc_handler.go | 82 +++ ingest/usecase/ingest_usecase.go | 49 ++ sqsdomain/proto/ingest.proto | 74 +++ sqsdomain/proto/types/ingest.pb.go | 566 ++++++++++++++++++++ sqsdomain/proto/types/ingest_grpc.pb.go | 228 ++++++++ 13 files changed, 1132 insertions(+), 11 deletions(-) create mode 100644 domain/ingester.go create mode 100644 domain/mvc/ingest.go create mode 100644 ingest/README.md create mode 100644 ingest/delivery/grpc/ingest_grpc_handler.go create mode 100644 ingest/usecase/ingest_usecase.go create mode 100644 sqsdomain/proto/ingest.proto create mode 100644 sqsdomain/proto/types/ingest.pb.go create mode 100644 sqsdomain/proto/types/ingest_grpc.pb.go diff --git a/Makefile b/Makefile index 47da6897..2a5dbe59 100644 --- a/Makefile +++ b/Makefile @@ -119,3 +119,6 @@ sqs-update-mainnet-state: # Bench tests pricing bench-pricing: go test -bench BenchmarkGetPrices -run BenchmarkGetPrices github.com/osmosis-labs/sqs/tokens/usecase -count=6 + +proto-gen: + protoc --go_out=./ --go-grpc_out=./ --proto_path=./sqsdomain/proto ./sqsdomain/proto/ingest.proto diff --git a/app/sidecar_query_server.go b/app/sidecar_query_server.go index 329f7c81..0d0a5aa2 100644 --- a/app/sidecar_query_server.go +++ b/app/sidecar_query_server.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net" "net/http" "time" @@ -11,6 +12,9 @@ import ( "github.com/redis/go-redis/v9" "go.uber.org/zap" + ingestrpcdelivry "github.com/osmosis-labs/sqs/ingest/delivery/grpc" + ingestusecase "github.com/osmosis-labs/sqs/ingest/usecase" + chaininfousecase "github.com/osmosis-labs/sqs/chaininfo/usecase" poolsHttpDelivery "github.com/osmosis-labs/sqs/pools/delivery/http" poolsUseCase "github.com/osmosis-labs/sqs/pools/usecase" @@ -39,7 +43,6 @@ import ( // and exposes endpoints for querying formatter and processed data from frontend. type SideCarQueryServer interface { GetTxManager() repository.TxManager - GetPoolsRepository() poolsredisrepo.PoolsRepository GetChainInfoRepository() chaininforedisrepo.ChainInfoRepository GetRouterRepository() routerredisrepo.RouterRepository GetTokensUseCase() mvc.TokensUsecase @@ -59,12 +62,6 @@ type sideCarQueryServer struct { logger log.Logger } -const ( - // This is a directory path where the overwrite routes are backed up in case of failure. - // On restart, the overwrite routes are restored from this directory. - overwriteRoutesPath = "overwrite_routes" -) - // GetTokensUseCase implements SideCarQueryServer. func (sqs *sideCarQueryServer) GetTokensUseCase() mvc.TokensUsecase { return sqs.tokensUseCase @@ -149,7 +146,7 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo // Initialize router repository, usecase routerRepository := routerredisrepo.New(redisTxManager, 0) - routerUsecase := routerUseCase.WithOverwriteRoutesPath(routerUseCase.NewRouterUsecase(timeoutContext, routerRepository, poolsUseCase, *config.Router, poolsUseCase.GetCosmWasmPoolConfig(), logger, cache.New(), cache.New()), overwriteRoutesPath) + routerUsecase := routerUseCase.NewRouterUsecase(timeoutContext, routerRepository, poolsUseCase, *config.Router, poolsUseCase.GetCosmWasmPoolConfig(), logger, cache.New(), cache.New()) // Initialize system handler chainInfoRepository := chaininforedisrepo.New(redisTxManager) @@ -172,6 +169,33 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo } routerHttpDelivery.NewRouterHandler(e, routerUsecase, tokensUseCase, logger) + // Start grpc ingest server if enabled + grpcIngesterConfig := config.GRPCIngester + if grpcIngesterConfig.Enabeld { + // Initialize ingest handler and usecase + ingestUseCase, err := ingestusecase.NewIngestUsecase(logger) + if err != nil { + return nil, err + } + + grpcIngestHandler, err := ingestrpcdelivry.NewIngestGRPCHandler(ingestUseCase, *grpcIngesterConfig) + if err != nil { + panic(err) + } + + go func() { + logger.Info("Starting grpc ingest server") + + lis, err := net.Listen("tcp", grpcIngesterConfig.ServerAddress) + if err != nil { + panic(err) + } + if err := grpcIngestHandler.Serve(lis); err != nil { + panic(err) + } + }() + } + go func() { logger.Info("Starting profiling server") err = http.ListenAndServe("localhost:6062", nil) @@ -182,7 +206,6 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo return &sideCarQueryServer{ txManager: redisTxManager, - poolsRepository: poolsRepository, chainInfoRepository: chainInfoRepository, routerRepository: routerRepository, tokensUseCase: tokensUseCase, diff --git a/config-testnet.json b/config-testnet.json index 4218edbf..9b2d558e 100644 --- a/config-testnet.json +++ b/config-testnet.json @@ -33,6 +33,11 @@ "max-routes": 5, "min-osmo-liquidity": 0 }, - "enable-overwrite-routes-cache": true + "grpc-ingester":{ + "enabled": false, + "max-receive-msg-size-bytes": 16777216, + "server-address": ":50051", + "server-connection-timeout-seconds": 10 + } } \ No newline at end of file diff --git a/config.json b/config.json index 977329a5..1ba69f50 100644 --- a/config.json +++ b/config.json @@ -33,6 +33,11 @@ "max-routes": 5, "min-osmo-liquidity": 50 }, - "enable-overwrite-routes-cache": true + "grpc-ingester":{ + "enabled": false, + "max-receive-msg-size-bytes": 16777216, + "server-address": ":50051", + "server-connection-timeout-seconds": 10 + } } \ No newline at end of file diff --git a/domain/config.go b/domain/config.go index c36870d8..282fbc8a 100644 --- a/domain/config.go +++ b/domain/config.go @@ -28,4 +28,6 @@ type Config struct { Pools *PoolsConfig `mapstructure:"pools"` Pricing *PricingConfig `mapstructure:"pricing"` + + GRPCIngester *GRPCIngesterConfig `mapstructure:"grpc-ingester"` } diff --git a/domain/ingester.go b/domain/ingester.go new file mode 100644 index 00000000..d1924412 --- /dev/null +++ b/domain/ingester.go @@ -0,0 +1,15 @@ +package domain + +type GRPCIngesterConfig struct { + // Flag to enable the GRPC ingester server + Enabeld bool `mapstructure:"enabled"` + + // The maximum number of bytes to receive in a single GRPC message + MaxReceiveMsgSizeBytes int `mapstructure:"max-receive-msg-size-bytes"` + + // The address of the GRPC ingester server + ServerAddress string `mapstructure:"server-address"` + + // The number of seconds to wait for a connection to the server. + ServerConnectionTimeoutSeconds int `mapstructure:"server-connection-timeout-seconds"` +} diff --git a/domain/mvc/ingest.go b/domain/mvc/ingest.go new file mode 100644 index 00000000..ede4b4f6 --- /dev/null +++ b/domain/mvc/ingest.go @@ -0,0 +1,25 @@ +package mvc + +import ( + "context" + + "github.com/osmosis-labs/sqs/sqsdomain" + prototypes "github.com/osmosis-labs/sqs/sqsdomain/proto/types" +) + +// IngestUsecase represent the ingest's usecases +type IngestUsecase interface { + // ProcessPoolChunk processes the pool data chunk, returning error if any. + // Caches the given pools in-memory until the end of the block processing. + ProcessPoolChunk(ctx context.Context, poolChunk []*prototypes.PoolData) error + + // StartBlockProcess signifies the start of the given block height processing + // It persists the given taker fee into the repository. + StartBlockProcess(ctx context.Context, height uint64, takerFeesMap sqsdomain.TakerFeeMap) (err error) + + // EndBlockProcessing ends the given block processing on success, storing the height + // internally. + // Persists the given height as well as any previously processed pools in-store. + // Resets the internal pools cache to be empty. + EndBlockProcess(ctx context.Context, height uint64) (err error) +} diff --git a/ingest/README.md b/ingest/README.md new file mode 100644 index 00000000..88106155 --- /dev/null +++ b/ingest/README.md @@ -0,0 +1,44 @@ +# Ingest + +This is a component that is responsible for ingesting and processing data. + +It exposes a GRPC API for Osmosis node clients to send data to be ingested. + +```mermaid +sequenceDiagram + participant NodeIngester + Note over SQS,NodeIngester: SQS Ingest RPC Communication + Note over NodeIngester: Acquires lock on block processing + + NodeIngester->>SQS: StartBlockProcess(height, taker_fees) + Note over SQS: SQS starts processing height + SQS-->>NodeIngester: StartBlockProcessReply + + NodeIngester->>SQS: ProcessChainPools(stream) + NodeIngester-->>SQS: ProcessChainPools.Send(pools_data_chunk) + Note over SQS: SQS transforms and loads pools into ingester cache + NodeIngester-->>SQS: ProcessChainPools.Send(pools_data_chunk) + Note over SQS: SQS transforms and loads pools into ingester cache + + NodeIngester->>SQS: ProcessChainPools.CloseSend() + + NodeIngester->>SQS: EndBlockProcess(EndBlockProcessRequest) + Note over SQS: SQS commits all state into SQS repositories, making them available for clients. + + SQS-->>NodeIngester: EndBlockProcessReply + + Note over NodeIngester: Releases lock on block processing +``` + +Note that, as of right now the protocol is syncronous where each GRPC call happens in sequence. However, from the +node perspective it is processed in a separate goroutine, letting the node continue producing blocks. The node +acquires a lock on the block processing so that the interaction is not affected by synching. + +This is not a concern since, when the node is caught up, the block time is approximately 4.5 seconds while entire +protocol is capped at 1.5 seconds. + +Currently, we push all pool data into SQS every processed block. As we lower block time, we will introduce a mechanism for +pushing the pool data only for the modified pools. This will allow us to drastically lower the protocol interaction from 1.5 seconds. + +Alternative methods will include making the protocol more asyncronous but this will require introducing more complex +locking mechanisms which are overkill today. diff --git a/ingest/delivery/grpc/ingest_grpc_handler.go b/ingest/delivery/grpc/ingest_grpc_handler.go new file mode 100644 index 00000000..a86ad22e --- /dev/null +++ b/ingest/delivery/grpc/ingest_grpc_handler.go @@ -0,0 +1,82 @@ +package grpc + +import ( + "context" + "io" + "time" + + "github.com/osmosis-labs/sqs/domain" + "github.com/osmosis-labs/sqs/domain/mvc" + "github.com/osmosis-labs/sqs/sqsdomain" + prototypes "github.com/osmosis-labs/sqs/sqsdomain/proto/types" + "google.golang.org/grpc" +) + +type IngestGRPCHandler struct { + ingestUseCase mvc.IngestUsecase + + prototypes.UnimplementedSQSIngesterServer +} + +type IngestProcessBlockArgs struct { + Pools []sqsdomain.PoolI +} + +var _ prototypes.SQSIngesterServer = &IngestGRPCHandler{} + +// NewIngestHandler will initialize the ingest/ resources endpoint +func NewIngestGRPCHandler(us mvc.IngestUsecase, grpcIngesterConfig domain.GRPCIngesterConfig) (*grpc.Server, error) { + ingestHandler := &IngestGRPCHandler{ + ingestUseCase: us, + } + + grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(grpcIngesterConfig.MaxReceiveMsgSizeBytes), grpc.ConnectionTimeout(time.Second*time.Duration(grpcIngesterConfig.ServerConnectionTimeoutSeconds))) + prototypes.RegisterSQSIngesterServer(grpcServer, ingestHandler) + + return grpcServer, nil +} + +// ProcessChainPools implements types.IngesterServer. +func (i *IngestGRPCHandler) ProcessChainPools(stream prototypes.SQSIngester_ProcessChainPoolsServer) (err error) { + var poolDataChunk prototypes.ChainPoolsDataChunk + err = stream.RecvMsg(&poolDataChunk) + for err == nil { + err = i.ingestUseCase.ProcessPoolChunk(stream.Context(), poolDataChunk.Pools) + if err != nil { + return err + } + + err = stream.RecvMsg(&poolDataChunk) + } + + if err != io.EOF { + return err + } + + return stream.SendAndClose(&prototypes.ProcessChainPoolsReply{}) +} + +// StartBlockProcess( implements types.IngesterServer. +func (i *IngestGRPCHandler) StartBlockProcess(ctx context.Context, req *prototypes.StartBlockProcessRequest) (resp *prototypes.StartBlockProcessReply, err error) { + takerFeeMap := sqsdomain.TakerFeeMap{} + + if err := takerFeeMap.UnmarshalJSON(req.TakerFeesMap); err != nil { + return nil, err + } + + // Start block processing with the taker fees. + if err := i.ingestUseCase.StartBlockProcess(ctx, req.BlockHeight, takerFeeMap); err != nil { + return nil, err + } + + return &prototypes.StartBlockProcessReply{}, nil +} + +// EndBlockProcessing implements types.IngesterServer. +func (i *IngestGRPCHandler) EndBlockProcess(ctx context.Context, req *prototypes.EndBlockProcessRequest) (resp *prototypes.EndBlockProcessReply, err error) { + if err := i.ingestUseCase.EndBlockProcess(ctx, req.BlockHeight); err != nil { + return nil, err + } + + return &prototypes.EndBlockProcessReply{}, nil +} diff --git a/ingest/usecase/ingest_usecase.go b/ingest/usecase/ingest_usecase.go new file mode 100644 index 00000000..5ea4125b --- /dev/null +++ b/ingest/usecase/ingest_usecase.go @@ -0,0 +1,49 @@ +package usecase + +import ( + "context" + "errors" + "time" + + "go.uber.org/zap" + + "github.com/osmosis-labs/sqs/domain/mvc" + "github.com/osmosis-labs/sqs/log" + "github.com/osmosis-labs/sqs/sqsdomain" + + "github.com/osmosis-labs/sqs/sqsdomain/proto/types" +) + +type ingestUseCase struct { + // used for tracking the time taken to process a block + startProcessingTime time.Time + logger log.Logger +} + +var _ mvc.IngestUsecase = &ingestUseCase{} + +// NewIngestUsecase will create a new ingester use case object +func NewIngestUsecase(logger log.Logger) (mvc.IngestUsecase, error) { + return &ingestUseCase{ + startProcessingTime: time.Unix(0, 0), + logger: logger, + }, nil +} + +// ProcessPoolChunk implements mvc.IngestUsecase. +func (p *ingestUseCase) ProcessPoolChunk(ctx context.Context, poolData []*types.PoolData) error { + return errors.New("not implemented") +} + +// StartBlockProcess implements mvc.IngestUsecase. +func (p *ingestUseCase) StartBlockProcess(ctx context.Context, height uint64, takerFeesMap sqsdomain.TakerFeeMap) (err error) { + p.startProcessingTime = time.Now() + p.logger.Info("starting block processing", zap.Uint64("height", height)) + return errors.New("not implemented") +} + +// EndBlockProcess implements mvc.IngestUsecase. +func (p *ingestUseCase) EndBlockProcess(ctx context.Context, height uint64) (err error) { + p.logger.Info("completed block processing", zap.Uint64("height", height), zap.Duration("duration", time.Since(p.startProcessingTime))) + return errors.New("not implemented") +} diff --git a/sqsdomain/proto/ingest.proto b/sqsdomain/proto/ingest.proto new file mode 100644 index 00000000..7dca0d94 --- /dev/null +++ b/sqsdomain/proto/ingest.proto @@ -0,0 +1,74 @@ +syntax = "proto3"; + +package sqs.ingest.v1beta1; +option go_package = "sqsdomain/proto/types"; + +// SQSIngester is a a data ingester from an Osmosis node to +// the sidecar query server. +service SQSIngester { + // StartBlockProcess starts block processing by sending block height and taker fee updates + // for that block. + rpc StartBlockProcess(StartBlockProcessRequest) returns (StartBlockProcessReply) {} + + // ProcessChainPools processes the Osmosis liquidity pools in a streaming fashion. + rpc ProcessChainPools(stream ChainPoolsDataChunk) returns (ProcessChainPoolsReply) {} + + // EndBlockProcess is called when the block processing is finished. + // It sorts the pools for router for use intra-block. + // It commits all processed state into internal SQS repositories, including: + // - pools for display (pools repository) + // - sorted pools for use in the router (router repository) + // - taker fees (router repository) + // - block height (router chain info repository) + rpc EndBlockProcess(EndBlockProcessRequest) returns (EndBlockProcessReply) {} +} + +// PoolData represents a structure encapsulating an Osmosis liquidity pool. +message PoolData { + // ChainModel is the chain representation model of the pool. + bytes chain_model = 1; + + // SqsModel is additional pool data used by the sidecar query server. + bytes sqs_model = 2; + + // TickModel is the tick data of a concentrated liquidity pool. + // This field is only valid and set for concentrated pools. It is nil otherwise. + bytes tick_model = 3; +} + +// ProcessChainPools +//////////////////////////////////////////////////////////////////// + +// The pools data chunks streamed by the client. +message ChainPoolsDataChunk { + repeated PoolData pools = 1; +} + +// The response after completing pools client-streaming. +message ProcessChainPoolsReply{} + +// StartBlockProcess +//////////////////////////////////////////////////////////////////// + +// The start block process request. +// Sends taker fees and block height. +message StartBlockProcessRequest { + // block height is the height of the block being processed. + uint64 block_height = 1; + // taker_fees_map is the map of taker fees for the block. + bytes taker_fees_map = 2; +} + +// The response after completing start block processing. +message StartBlockProcessReply{} + +// EndBlockProcess +//////////////////////////////////////////////////////////////////// + +// The request to end the block processing. +message EndBlockProcessRequest { + uint64 block_height = 1; +} + +// The response to end the block processing. +message EndBlockProcessReply{} diff --git a/sqsdomain/proto/types/ingest.pb.go b/sqsdomain/proto/types/ingest.pb.go new file mode 100644 index 00000000..3f4ee6db --- /dev/null +++ b/sqsdomain/proto/types/ingest.pb.go @@ -0,0 +1,566 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0-devel +// protoc v3.14.0 +// source: ingest.proto + +package types + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// PoolData represents a structure encapsulating an Osmosis liquidity pool. +type PoolData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // ChainModel is the chain representation model of the pool. + ChainModel []byte `protobuf:"bytes,1,opt,name=chain_model,json=chainModel,proto3" json:"chain_model,omitempty"` + // SqsModel is additional pool data used by the sidecar query server. + SqsModel []byte `protobuf:"bytes,2,opt,name=sqs_model,json=sqsModel,proto3" json:"sqs_model,omitempty"` + // TickModel is the tick data of a concentrated liquidity pool. + // This field is only valid and set for concentrated pools. It is nil otherwise. + TickModel []byte `protobuf:"bytes,3,opt,name=tick_model,json=tickModel,proto3" json:"tick_model,omitempty"` +} + +func (x *PoolData) Reset() { + *x = PoolData{} + if protoimpl.UnsafeEnabled { + mi := &file_ingest_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PoolData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PoolData) ProtoMessage() {} + +func (x *PoolData) ProtoReflect() protoreflect.Message { + mi := &file_ingest_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PoolData.ProtoReflect.Descriptor instead. +func (*PoolData) Descriptor() ([]byte, []int) { + return file_ingest_proto_rawDescGZIP(), []int{0} +} + +func (x *PoolData) GetChainModel() []byte { + if x != nil { + return x.ChainModel + } + return nil +} + +func (x *PoolData) GetSqsModel() []byte { + if x != nil { + return x.SqsModel + } + return nil +} + +func (x *PoolData) GetTickModel() []byte { + if x != nil { + return x.TickModel + } + return nil +} + +// The pools data chunks streamed by the client. +type ChainPoolsDataChunk struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Pools []*PoolData `protobuf:"bytes,1,rep,name=pools,proto3" json:"pools,omitempty"` +} + +func (x *ChainPoolsDataChunk) Reset() { + *x = ChainPoolsDataChunk{} + if protoimpl.UnsafeEnabled { + mi := &file_ingest_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ChainPoolsDataChunk) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChainPoolsDataChunk) ProtoMessage() {} + +func (x *ChainPoolsDataChunk) ProtoReflect() protoreflect.Message { + mi := &file_ingest_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChainPoolsDataChunk.ProtoReflect.Descriptor instead. +func (*ChainPoolsDataChunk) Descriptor() ([]byte, []int) { + return file_ingest_proto_rawDescGZIP(), []int{1} +} + +func (x *ChainPoolsDataChunk) GetPools() []*PoolData { + if x != nil { + return x.Pools + } + return nil +} + +// The response after completing pools client-streaming. +type ProcessChainPoolsReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ProcessChainPoolsReply) Reset() { + *x = ProcessChainPoolsReply{} + if protoimpl.UnsafeEnabled { + mi := &file_ingest_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ProcessChainPoolsReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProcessChainPoolsReply) ProtoMessage() {} + +func (x *ProcessChainPoolsReply) ProtoReflect() protoreflect.Message { + mi := &file_ingest_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProcessChainPoolsReply.ProtoReflect.Descriptor instead. +func (*ProcessChainPoolsReply) Descriptor() ([]byte, []int) { + return file_ingest_proto_rawDescGZIP(), []int{2} +} + +// The start block process request. +// Sends taker fees and block height. +type StartBlockProcessRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // block height is the height of the block being processed. + BlockHeight uint64 `protobuf:"varint,1,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` + // taker_fees_map is the map of taker fees for the block. + TakerFeesMap []byte `protobuf:"bytes,2,opt,name=taker_fees_map,json=takerFeesMap,proto3" json:"taker_fees_map,omitempty"` +} + +func (x *StartBlockProcessRequest) Reset() { + *x = StartBlockProcessRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_ingest_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StartBlockProcessRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StartBlockProcessRequest) ProtoMessage() {} + +func (x *StartBlockProcessRequest) ProtoReflect() protoreflect.Message { + mi := &file_ingest_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StartBlockProcessRequest.ProtoReflect.Descriptor instead. +func (*StartBlockProcessRequest) Descriptor() ([]byte, []int) { + return file_ingest_proto_rawDescGZIP(), []int{3} +} + +func (x *StartBlockProcessRequest) GetBlockHeight() uint64 { + if x != nil { + return x.BlockHeight + } + return 0 +} + +func (x *StartBlockProcessRequest) GetTakerFeesMap() []byte { + if x != nil { + return x.TakerFeesMap + } + return nil +} + +// The response after completing start block processing. +type StartBlockProcessReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *StartBlockProcessReply) Reset() { + *x = StartBlockProcessReply{} + if protoimpl.UnsafeEnabled { + mi := &file_ingest_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StartBlockProcessReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StartBlockProcessReply) ProtoMessage() {} + +func (x *StartBlockProcessReply) ProtoReflect() protoreflect.Message { + mi := &file_ingest_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StartBlockProcessReply.ProtoReflect.Descriptor instead. +func (*StartBlockProcessReply) Descriptor() ([]byte, []int) { + return file_ingest_proto_rawDescGZIP(), []int{4} +} + +// The request to end the block processing. +type EndBlockProcessRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + BlockHeight uint64 `protobuf:"varint,1,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` +} + +func (x *EndBlockProcessRequest) Reset() { + *x = EndBlockProcessRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_ingest_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EndBlockProcessRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EndBlockProcessRequest) ProtoMessage() {} + +func (x *EndBlockProcessRequest) ProtoReflect() protoreflect.Message { + mi := &file_ingest_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EndBlockProcessRequest.ProtoReflect.Descriptor instead. +func (*EndBlockProcessRequest) Descriptor() ([]byte, []int) { + return file_ingest_proto_rawDescGZIP(), []int{5} +} + +func (x *EndBlockProcessRequest) GetBlockHeight() uint64 { + if x != nil { + return x.BlockHeight + } + return 0 +} + +// The response to end the block processing. +type EndBlockProcessReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *EndBlockProcessReply) Reset() { + *x = EndBlockProcessReply{} + if protoimpl.UnsafeEnabled { + mi := &file_ingest_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EndBlockProcessReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EndBlockProcessReply) ProtoMessage() {} + +func (x *EndBlockProcessReply) ProtoReflect() protoreflect.Message { + mi := &file_ingest_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EndBlockProcessReply.ProtoReflect.Descriptor instead. +func (*EndBlockProcessReply) Descriptor() ([]byte, []int) { + return file_ingest_proto_rawDescGZIP(), []int{6} +} + +var File_ingest_proto protoreflect.FileDescriptor + +var file_ingest_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x12, + 0x73, 0x71, 0x73, 0x2e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, + 0x61, 0x31, 0x22, 0x67, 0x0a, 0x08, 0x50, 0x6f, 0x6f, 0x6c, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1f, + 0x0a, 0x0b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x6c, 0x12, + 0x1b, 0x0a, 0x09, 0x73, 0x71, 0x73, 0x5f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x08, 0x73, 0x71, 0x73, 0x4d, 0x6f, 0x64, 0x65, 0x6c, 0x12, 0x1d, 0x0a, 0x0a, + 0x74, 0x69, 0x63, 0x6b, 0x5f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x09, 0x74, 0x69, 0x63, 0x6b, 0x4d, 0x6f, 0x64, 0x65, 0x6c, 0x22, 0x49, 0x0a, 0x13, 0x43, + 0x68, 0x61, 0x69, 0x6e, 0x50, 0x6f, 0x6f, 0x6c, 0x73, 0x44, 0x61, 0x74, 0x61, 0x43, 0x68, 0x75, + 0x6e, 0x6b, 0x12, 0x32, 0x0a, 0x05, 0x70, 0x6f, 0x6f, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x71, 0x73, 0x2e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x50, 0x6f, 0x6f, 0x6c, 0x44, 0x61, 0x74, 0x61, 0x52, + 0x05, 0x70, 0x6f, 0x6f, 0x6c, 0x73, 0x22, 0x18, 0x0a, 0x16, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, + 0x73, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x50, 0x6f, 0x6f, 0x6c, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, + 0x22, 0x63, 0x0a, 0x18, 0x53, 0x74, 0x61, 0x72, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x50, 0x72, + 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x0c, + 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x0b, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x69, 0x67, 0x68, 0x74, 0x12, + 0x24, 0x0a, 0x0e, 0x74, 0x61, 0x6b, 0x65, 0x72, 0x5f, 0x66, 0x65, 0x65, 0x73, 0x5f, 0x6d, 0x61, + 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x74, 0x61, 0x6b, 0x65, 0x72, 0x46, 0x65, + 0x65, 0x73, 0x4d, 0x61, 0x70, 0x22, 0x18, 0x0a, 0x16, 0x53, 0x74, 0x61, 0x72, 0x74, 0x42, 0x6c, + 0x6f, 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, + 0x3b, 0x0a, 0x16, 0x45, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x63, 0x65, + 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x62, 0x6c, 0x6f, + 0x63, 0x6b, 0x5f, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x0b, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x65, 0x69, 0x67, 0x68, 0x74, 0x22, 0x16, 0x0a, 0x14, + 0x45, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x32, 0xd7, 0x02, 0x0a, 0x0b, 0x53, 0x51, 0x53, 0x49, 0x6e, 0x67, 0x65, + 0x73, 0x74, 0x65, 0x72, 0x12, 0x6f, 0x0a, 0x11, 0x53, 0x74, 0x61, 0x72, 0x74, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x12, 0x2c, 0x2e, 0x73, 0x71, 0x73, 0x2e, + 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x53, + 0x74, 0x61, 0x72, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x73, 0x71, 0x73, 0x2e, 0x69, 0x6e, + 0x67, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x53, 0x74, 0x61, + 0x72, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, + 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x6c, 0x0a, 0x11, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, + 0x43, 0x68, 0x61, 0x69, 0x6e, 0x50, 0x6f, 0x6f, 0x6c, 0x73, 0x12, 0x27, 0x2e, 0x73, 0x71, 0x73, + 0x2e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, + 0x43, 0x68, 0x61, 0x69, 0x6e, 0x50, 0x6f, 0x6f, 0x6c, 0x73, 0x44, 0x61, 0x74, 0x61, 0x43, 0x68, + 0x75, 0x6e, 0x6b, 0x1a, 0x2a, 0x2e, 0x73, 0x71, 0x73, 0x2e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, + 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, + 0x43, 0x68, 0x61, 0x69, 0x6e, 0x50, 0x6f, 0x6f, 0x6c, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, + 0x00, 0x28, 0x01, 0x12, 0x69, 0x0a, 0x0f, 0x45, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x50, + 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x12, 0x2a, 0x2e, 0x73, 0x71, 0x73, 0x2e, 0x69, 0x6e, 0x67, + 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x45, 0x6e, 0x64, 0x42, + 0x6c, 0x6f, 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x73, 0x71, 0x73, 0x2e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x2e, + 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x45, 0x6e, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, + 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x17, + 0x5a, 0x15, 0x73, 0x71, 0x73, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_ingest_proto_rawDescOnce sync.Once + file_ingest_proto_rawDescData = file_ingest_proto_rawDesc +) + +func file_ingest_proto_rawDescGZIP() []byte { + file_ingest_proto_rawDescOnce.Do(func() { + file_ingest_proto_rawDescData = protoimpl.X.CompressGZIP(file_ingest_proto_rawDescData) + }) + return file_ingest_proto_rawDescData +} + +var file_ingest_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_ingest_proto_goTypes = []interface{}{ + (*PoolData)(nil), // 0: sqs.ingest.v1beta1.PoolData + (*ChainPoolsDataChunk)(nil), // 1: sqs.ingest.v1beta1.ChainPoolsDataChunk + (*ProcessChainPoolsReply)(nil), // 2: sqs.ingest.v1beta1.ProcessChainPoolsReply + (*StartBlockProcessRequest)(nil), // 3: sqs.ingest.v1beta1.StartBlockProcessRequest + (*StartBlockProcessReply)(nil), // 4: sqs.ingest.v1beta1.StartBlockProcessReply + (*EndBlockProcessRequest)(nil), // 5: sqs.ingest.v1beta1.EndBlockProcessRequest + (*EndBlockProcessReply)(nil), // 6: sqs.ingest.v1beta1.EndBlockProcessReply +} +var file_ingest_proto_depIdxs = []int32{ + 0, // 0: sqs.ingest.v1beta1.ChainPoolsDataChunk.pools:type_name -> sqs.ingest.v1beta1.PoolData + 3, // 1: sqs.ingest.v1beta1.SQSIngester.StartBlockProcess:input_type -> sqs.ingest.v1beta1.StartBlockProcessRequest + 1, // 2: sqs.ingest.v1beta1.SQSIngester.ProcessChainPools:input_type -> sqs.ingest.v1beta1.ChainPoolsDataChunk + 5, // 3: sqs.ingest.v1beta1.SQSIngester.EndBlockProcess:input_type -> sqs.ingest.v1beta1.EndBlockProcessRequest + 4, // 4: sqs.ingest.v1beta1.SQSIngester.StartBlockProcess:output_type -> sqs.ingest.v1beta1.StartBlockProcessReply + 2, // 5: sqs.ingest.v1beta1.SQSIngester.ProcessChainPools:output_type -> sqs.ingest.v1beta1.ProcessChainPoolsReply + 6, // 6: sqs.ingest.v1beta1.SQSIngester.EndBlockProcess:output_type -> sqs.ingest.v1beta1.EndBlockProcessReply + 4, // [4:7] is the sub-list for method output_type + 1, // [1:4] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_ingest_proto_init() } +func file_ingest_proto_init() { + if File_ingest_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_ingest_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PoolData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ingest_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ChainPoolsDataChunk); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ingest_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ProcessChainPoolsReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ingest_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StartBlockProcessRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ingest_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StartBlockProcessReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ingest_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EndBlockProcessRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ingest_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EndBlockProcessReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_ingest_proto_rawDesc, + NumEnums: 0, + NumMessages: 7, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_ingest_proto_goTypes, + DependencyIndexes: file_ingest_proto_depIdxs, + MessageInfos: file_ingest_proto_msgTypes, + }.Build() + File_ingest_proto = out.File + file_ingest_proto_rawDesc = nil + file_ingest_proto_goTypes = nil + file_ingest_proto_depIdxs = nil +} diff --git a/sqsdomain/proto/types/ingest_grpc.pb.go b/sqsdomain/proto/types/ingest_grpc.pb.go new file mode 100644 index 00000000..ddfd3f13 --- /dev/null +++ b/sqsdomain/proto/types/ingest_grpc.pb.go @@ -0,0 +1,228 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package types + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// SQSIngesterClient is the client API for SQSIngester service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SQSIngesterClient interface { + // StartBlockProcess starts block processing by sending block height and taker fee updates + // for that block. + StartBlockProcess(ctx context.Context, in *StartBlockProcessRequest, opts ...grpc.CallOption) (*StartBlockProcessReply, error) + // ProcessChainPools processes the Osmosis liquidity pools in a streaming fashion. + ProcessChainPools(ctx context.Context, opts ...grpc.CallOption) (SQSIngester_ProcessChainPoolsClient, error) + // EndBlockProcess is called when the block processing is finished. + // It sorts the pools for router for use intra-block. + // It commits all processed state into internal SQS repositories, including: + // - pools for display (pools repository) + // - sorted pools for use in the router (router repository) + // - taker fees (router repository) + // - block height (router chain info repository) + EndBlockProcess(ctx context.Context, in *EndBlockProcessRequest, opts ...grpc.CallOption) (*EndBlockProcessReply, error) +} + +type sQSIngesterClient struct { + cc grpc.ClientConnInterface +} + +func NewSQSIngesterClient(cc grpc.ClientConnInterface) SQSIngesterClient { + return &sQSIngesterClient{cc} +} + +func (c *sQSIngesterClient) StartBlockProcess(ctx context.Context, in *StartBlockProcessRequest, opts ...grpc.CallOption) (*StartBlockProcessReply, error) { + out := new(StartBlockProcessReply) + err := c.cc.Invoke(ctx, "/sqs.ingest.v1beta1.SQSIngester/StartBlockProcess", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *sQSIngesterClient) ProcessChainPools(ctx context.Context, opts ...grpc.CallOption) (SQSIngester_ProcessChainPoolsClient, error) { + stream, err := c.cc.NewStream(ctx, &SQSIngester_ServiceDesc.Streams[0], "/sqs.ingest.v1beta1.SQSIngester/ProcessChainPools", opts...) + if err != nil { + return nil, err + } + x := &sQSIngesterProcessChainPoolsClient{stream} + return x, nil +} + +type SQSIngester_ProcessChainPoolsClient interface { + Send(*ChainPoolsDataChunk) error + CloseAndRecv() (*ProcessChainPoolsReply, error) + grpc.ClientStream +} + +type sQSIngesterProcessChainPoolsClient struct { + grpc.ClientStream +} + +func (x *sQSIngesterProcessChainPoolsClient) Send(m *ChainPoolsDataChunk) error { + return x.ClientStream.SendMsg(m) +} + +func (x *sQSIngesterProcessChainPoolsClient) CloseAndRecv() (*ProcessChainPoolsReply, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(ProcessChainPoolsReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *sQSIngesterClient) EndBlockProcess(ctx context.Context, in *EndBlockProcessRequest, opts ...grpc.CallOption) (*EndBlockProcessReply, error) { + out := new(EndBlockProcessReply) + err := c.cc.Invoke(ctx, "/sqs.ingest.v1beta1.SQSIngester/EndBlockProcess", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SQSIngesterServer is the server API for SQSIngester service. +// All implementations must embed UnimplementedSQSIngesterServer +// for forward compatibility +type SQSIngesterServer interface { + // StartBlockProcess starts block processing by sending block height and taker fee updates + // for that block. + StartBlockProcess(context.Context, *StartBlockProcessRequest) (*StartBlockProcessReply, error) + // ProcessChainPools processes the Osmosis liquidity pools in a streaming fashion. + ProcessChainPools(SQSIngester_ProcessChainPoolsServer) error + // EndBlockProcess is called when the block processing is finished. + // It sorts the pools for router for use intra-block. + // It commits all processed state into internal SQS repositories, including: + // - pools for display (pools repository) + // - sorted pools for use in the router (router repository) + // - taker fees (router repository) + // - block height (router chain info repository) + EndBlockProcess(context.Context, *EndBlockProcessRequest) (*EndBlockProcessReply, error) + mustEmbedUnimplementedSQSIngesterServer() +} + +// UnimplementedSQSIngesterServer must be embedded to have forward compatible implementations. +type UnimplementedSQSIngesterServer struct { +} + +func (UnimplementedSQSIngesterServer) StartBlockProcess(context.Context, *StartBlockProcessRequest) (*StartBlockProcessReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method StartBlockProcess not implemented") +} +func (UnimplementedSQSIngesterServer) ProcessChainPools(SQSIngester_ProcessChainPoolsServer) error { + return status.Errorf(codes.Unimplemented, "method ProcessChainPools not implemented") +} +func (UnimplementedSQSIngesterServer) EndBlockProcess(context.Context, *EndBlockProcessRequest) (*EndBlockProcessReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method EndBlockProcess not implemented") +} +func (UnimplementedSQSIngesterServer) mustEmbedUnimplementedSQSIngesterServer() {} + +// UnsafeSQSIngesterServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SQSIngesterServer will +// result in compilation errors. +type UnsafeSQSIngesterServer interface { + mustEmbedUnimplementedSQSIngesterServer() +} + +func RegisterSQSIngesterServer(s grpc.ServiceRegistrar, srv SQSIngesterServer) { + s.RegisterService(&SQSIngester_ServiceDesc, srv) +} + +func _SQSIngester_StartBlockProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StartBlockProcessRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SQSIngesterServer).StartBlockProcess(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/sqs.ingest.v1beta1.SQSIngester/StartBlockProcess", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SQSIngesterServer).StartBlockProcess(ctx, req.(*StartBlockProcessRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _SQSIngester_ProcessChainPools_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SQSIngesterServer).ProcessChainPools(&sQSIngesterProcessChainPoolsServer{stream}) +} + +type SQSIngester_ProcessChainPoolsServer interface { + SendAndClose(*ProcessChainPoolsReply) error + Recv() (*ChainPoolsDataChunk, error) + grpc.ServerStream +} + +type sQSIngesterProcessChainPoolsServer struct { + grpc.ServerStream +} + +func (x *sQSIngesterProcessChainPoolsServer) SendAndClose(m *ProcessChainPoolsReply) error { + return x.ServerStream.SendMsg(m) +} + +func (x *sQSIngesterProcessChainPoolsServer) Recv() (*ChainPoolsDataChunk, error) { + m := new(ChainPoolsDataChunk) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _SQSIngester_EndBlockProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(EndBlockProcessRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SQSIngesterServer).EndBlockProcess(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/sqs.ingest.v1beta1.SQSIngester/EndBlockProcess", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SQSIngesterServer).EndBlockProcess(ctx, req.(*EndBlockProcessRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// SQSIngester_ServiceDesc is the grpc.ServiceDesc for SQSIngester service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SQSIngester_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "sqs.ingest.v1beta1.SQSIngester", + HandlerType: (*SQSIngesterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "StartBlockProcess", + Handler: _SQSIngester_StartBlockProcess_Handler, + }, + { + MethodName: "EndBlockProcess", + Handler: _SQSIngester_EndBlockProcess_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "ProcessChainPools", + Handler: _SQSIngester_ProcessChainPools_Handler, + ClientStreams: true, + }, + }, + Metadata: "ingest.proto", +}