diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 809734325..26d3eac48 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +* Optimized start of output stream in developer mode when start block is in reversible segment and output module does not have any stores in its dependencies. + + ## v1.1.19 ### Changed @@ -18,7 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Fixed stream ending immediately in dev mode when start/end blocks are both 0. * Sink-serve: fix missing output details on docker-compose apply errors * Codegen: Fixed pluralized entity created for db_out and graph_out - +* ## v1.1.18 ### Fixed diff --git a/manifest/graph.go b/manifest/graph.go index db5331e2b..da6b04d0d 100644 --- a/manifest/graph.go +++ b/manifest/graph.go @@ -341,6 +341,19 @@ func (g *ModuleGraph) ChildrenOf(moduleName string) ([]*pbsubstreams.Module, err return res, nil } +func (g *ModuleGraph) HasStatefulDependencies(moduleName string) (bool, error) { + stores, err := g.StoresDownTo(moduleName) + if err != nil { + return false, fmt.Errorf("getting stores down to %s: %w", moduleName, err) + } + + if len(stores) > 0 { + return true, nil + } + + return false, nil +} + func (g *ModuleGraph) StoresDownTo(moduleName string) ([]*pbsubstreams.Module, error) { alreadyAdded := map[string]bool{} topologicalIndex := map[string]int{} diff --git a/pipeline/resolve.go b/pipeline/resolve.go index a2c00eb3a..a09d2dd52 100644 --- a/pipeline/resolve.go +++ b/pipeline/resolve.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/streamingfast/substreams/manifest" "sync/atomic" "github.com/streamingfast/bstream" @@ -41,12 +42,23 @@ func BuildRequestDetails( return nil, nil, err } - linearHandoff, err := computeLinearHandoffBlockNum(request.ProductionMode, req.ResolvedStartBlockNum, request.StopBlockNum, getRecentFinalBlock) + graph, err := manifest.NewModuleGraph(request.Modules.Modules) + if err != nil { + return nil, nil, status.Errorf(grpccodes.InvalidArgument, "invalid modules: %s", err.Error()) + } + + moduleHasStatefulDependencies, err := graph.HasStatefulDependencies(request.OutputModule) + if err != nil { + return nil, nil, status.Errorf(grpccodes.InvalidArgument, "invalid output module: %s", err.Error()) + } + + linearHandoff, err := computeLinearHandoffBlockNum(request.ProductionMode, req.ResolvedStartBlockNum, request.StopBlockNum, getRecentFinalBlock, moduleHasStatefulDependencies) if err != nil { return nil, nil, err } req.LinearHandoffBlockNum = linearHandoff + req.LinearGateBlockNum = req.LinearHandoffBlockNum if req.ResolvedStartBlockNum > req.LinearHandoffBlockNum { req.LinearGateBlockNum = req.ResolvedStartBlockNum @@ -76,7 +88,7 @@ func nextUniqueID() uint64 { return uniqueRequestIDCounter.Add(1) } -func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uint64, getRecentFinalBlockFunc func() (uint64, error)) (uint64, error) { +func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uint64, getRecentFinalBlockFunc func() (uint64, error), stateRequired bool) (uint64, error) { if productionMode { maxHandoff, err := getRecentFinalBlockFunc() if err != nil { @@ -90,10 +102,17 @@ func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uin } return min(stopBlock, maxHandoff), nil } + + //if no state required, we don't need to ever back-process blocks. we can start flowing blocks right away from the start block + if !stateRequired { + return startBlock, nil + } + maxHandoff, err := getRecentFinalBlockFunc() if err != nil { return startBlock, nil } + return min(startBlock, maxHandoff), nil } diff --git a/pipeline/resolve_test.go b/pipeline/resolve_test.go index 981290896..199a4e47a 100644 --- a/pipeline/resolve_test.go +++ b/pipeline/resolve_test.go @@ -181,21 +181,37 @@ func Test_computeLiveHandoffBlockNum(t *testing.T) { stopBlockNum uint64 expectHandoffNum uint64 expectError bool + stateRequired bool }{ // prod (start-block ignored) - {true, 100, true, 10, 0, 100, false}, - {true, 100, true, 10, 150, 100, false}, - {true, 100, true, 10, 50, 50, false}, - {false, 0, true, 10, 50, 50, false}, - {false, 0, true, 10, 0, 0, true}, + {true, 100, true, 10, 0, 100, false, true}, + {true, 100, true, 10, 150, 100, false, true}, + {true, 100, true, 10, 50, 50, false, true}, + {false, 0, true, 10, 50, 50, false, true}, + {false, 0, true, 10, 0, 0, true, true}, - // non-prod (stop-block ignored) - {true, 100, false, 10, 0, 10, false}, - {true, 100, false, 10, 9999, 10, false}, - {true, 100, false, 150, 0, 100, false}, - {true, 100, false, 150, 9999, 100, false}, - {false, 0, false, 150, 0, 150, false}, - {false, 0, false, 150, 9999, 150, false}, + // prod (start-block ignored) (state not required) + {true, 100, true, 10, 0, 100, false, false}, + {true, 100, true, 10, 150, 100, false, false}, + {true, 100, true, 10, 50, 50, false, false}, + {false, 0, true, 10, 50, 50, false, false}, + {false, 0, true, 10, 0, 0, true, false}, + + // non-prod (stop-block ignored) (state required) + {true, 100, false, 10, 0, 10, false, true}, + {true, 100, false, 10, 9999, 10, false, true}, + {true, 100, false, 150, 0, 100, false, true}, + {true, 100, false, 150, 9999, 100, false, true}, + {false, 0, false, 150, 0, 150, false, true}, + {false, 0, false, 150, 9999, 150, false, true}, + + // non-prod (stop-block ignored) (state not required) + {true, 100, false, 10, 0, 10, false, false}, + {true, 100, false, 10, 9999, 10, false, false}, + {true, 100, false, 150, 0, 150, false, false}, + {true, 100, false, 150, 9999, 150, false, false}, + {false, 0, false, 150, 0, 150, false, false}, + {false, 0, false, 150, 9999, 150, false, false}, } for _, test := range tests { @@ -209,7 +225,7 @@ func Test_computeLiveHandoffBlockNum(t *testing.T) { return 0, fmt.Errorf("live not available") } return test.recentBlockNum, nil - }) + }, test.stateRequired) if test.expectError { assert.Error(t, err) } else {