Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip events on invalid state, increase grpc max size #143

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func runServer(cfg *config.Config) error {

// Flow client
// TODO: WithInsecure()?
flowClient, err := client.New(cfg.AccessAPIHost, grpc.WithInsecure())
maxSize := 1024 * 1024 * 64
flowClient, err := client.New(cfg.AccessAPIHost, grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxSize)),
)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ func New(cfg *config.Config, db *gorm.DB, flowClient *client.Client, poll bool)

if poll {
go poller(app)
go packContractEventsPoller(app)
go sentTransactionsPoller(app)
go sendableTransactionPoller(app)
}

return app, nil
Expand Down
17 changes: 12 additions & 5 deletions service/app/contract_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,8 @@ func (svc *ContractService) UpdateMintingStatus(ctx context.Context, db *gorm.DB
// Set the FlowID of the pack
// Make sure the pack is in correct state
if err := pack.Seal(packFlowID); err != nil {
return err // rollback
logger.Warn(fmt.Sprintf("pack in wrong state %s packFlowId:%+v", err, packFlowID))
continue
}

// Update the pack in database
Expand Down Expand Up @@ -910,14 +911,17 @@ func (svc *ContractService) UpdateCirculatingPackContract(ctx context.Context, d
"packFlowID": pack.FlowID,
})

eventLogger.Info("handling event...")

switch eventName {
// -- REVEAL_REQUEST, Owner has requested to reveal a pack ------------
case REVEAL_REQUEST:

// Make sure the pack is in correct state
if err := pack.RevealRequestHandled(); err != nil {
err := fmt.Errorf("error while handling %s: %w", eventName, err)
return err // rollback
eventLogger.Warn(fmt.Sprintf("distID:%s distFlowID:%s packID:%s packFlowID:%s err:%s", distribution.ID, distribution.FlowID, pack.ID, pack.FlowID, err.Error()))
continue
}

// Update the pack in database
Expand Down Expand Up @@ -1006,7 +1010,8 @@ func (svc *ContractService) UpdateCirculatingPackContract(ctx context.Context, d
// Make sure the pack is in correct state
if err := pack.Reveal(); err != nil {
err := fmt.Errorf("error while handling %s: %w", eventName, err)
return err // rollback
eventLogger.Warn(fmt.Sprintf("distID:%s distFlowID:%s packID:%s packFlowID:%s err:%s", distribution.ID, distribution.FlowID, pack.ID, pack.FlowID, err.Error()))
continue
}

// Update the pack in database
Expand All @@ -1020,7 +1025,8 @@ func (svc *ContractService) UpdateCirculatingPackContract(ctx context.Context, d
// Make sure the pack is in correct state
if err := pack.OpenRequestHandled(); err != nil {
err := fmt.Errorf("error while handling %s: %w", eventName, err)
return err // rollback
eventLogger.Warn(fmt.Sprintf("distID:%s distFlowID:%s packID:%s packFlowID:%s err:%s", distribution.ID, distribution.FlowID, pack.ID, pack.FlowID, err.Error()))
continue
}

// Update the pack in database
Expand Down Expand Up @@ -1088,7 +1094,8 @@ func (svc *ContractService) UpdateCirculatingPackContract(ctx context.Context, d
// Make sure the pack is in correct state
if err := pack.Open(); err != nil {
err := fmt.Errorf("error while handling %s: %w", eventName, err)
return err // rollback
eventLogger.Warn(fmt.Sprintf("distID:%s distFlowID:%s packID:%s packFlowID:%s err:%s", distribution.ID, distribution.FlowID, pack.ID, pack.FlowID, err.Error()))
continue
}

// Update the pack in database
Expand Down
60 changes: 56 additions & 4 deletions service/app/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
func poller(app *App) {

ticker := time.NewTicker(time.Second) // TODO (latenssi): configurable?
transactionRatelimiter := ratelimit.New(app.cfg.TransactionSendRate)

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -37,12 +35,66 @@ func poller(app *App) {
logPollerRun("handleMinting", handleMinting(ctx, app))
logPollerRun("handleComplete", handleComplete(ctx, app))

log.Trace("Poll end")
case <-app.quit:
cancel()
ticker.Stop()
return
}
}
}

// packContractEventsPoller is responsible for checking pack contract events from flow blockchain blocks
func packContractEventsPoller(app *App) {
ticker := time.NewTicker(time.Second) // TODO (latenssi): configurable?
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
for {
select {
case <-ticker.C:
log.Trace("PackContractEventsPoller start")
logPollerRun("pollCirculatingPackContractEvents", pollCirculatingPackContractEvents(ctx, app))
log.Trace("PackContractEventsPoller end")
case <-app.quit:
cancel()
ticker.Stop()
return
}
}
}

logPollerRun("handleSentTransactions", handleSentTransactions(ctx, app))
// sendableTransactionPoller is responsible for checking any sendable to transactions in the transactions table.
func sendableTransactionPoller(app *App) {
ticker := time.NewTicker(time.Second) // TODO (latenssi): configurable?
transactionRatelimiter := ratelimit.New(app.cfg.TransactionSendRate)

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
for {
select {
case <-ticker.C:
log.Trace("SendableTransactionPoller poll start")
logPollerRun("handleSendableTransactions", handleSendableTransactions(ctx, app, transactionRatelimiter))
log.Trace("SendableTransactionPoller poll end")
case <-app.quit:
cancel()
ticker.Stop()
return
}
}
}

log.Trace("Poll end")
// transactionPoller is responsible for sending flow transactions and check transaction status
func sentTransactionsPoller(app *App) {
ticker := time.NewTicker(time.Second) // TODO (latenssi): configurable?
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
for {
select {
case <-ticker.C:
log.Trace("SentTransactionsPoller poll start")
logPollerRun("handleSentTransactions", handleSentTransactions(ctx, app))
log.Trace("SentTransactionsPoller poll end")
case <-app.quit:
cancel()
ticker.Stop()
Expand Down
4 changes: 2 additions & 2 deletions service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type Config struct {

// -- Database --

DatabaseDSN string `env:"FLOW_PDS_DATABASE_DSN" envDefault:"pds.db"`
DatabaseType string `env:"FLOW_PDS_DATABASE_TYPE" envDefault:"sqlite"`
DatabaseDSN string `env:"FLOW_PDS_DATABASE_DSN" envDefault:"postgresql://pds:pds@localhost:5432/pds"`
DatabaseType string `env:"FLOW_PDS_DATABASE_TYPE" envDefault:"psql"`

// -- Host and chain access --

Expand Down
14 changes: 2 additions & 12 deletions test_lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"math/big"
"path"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -45,17 +44,8 @@ func getTestCfg(t *testing.T, b *testing.B) *config.Config {
panic(err)
}

cfg.DatabaseDSN = "test.db"

if t != nil {
cfg.DatabaseDSN = path.Join(t.TempDir(), "test.db")
}

if b != nil {
cfg.DatabaseDSN = path.Join(b.TempDir(), "test.db")
}

cfg.DatabaseType = "sqlite"
cfg.DatabaseDSN = "postgresql://pds:pds@localhost:5432/pds"
cfg.DatabaseType = "psql"

return cfg
}
Expand Down