Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store-free output module optimization #330

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

* Optimized start of output stream in developer mode when start block is in reversible segment and output module does not have any stores in its dependencies.


## v1.1.19

### Changed
Expand All @@ -18,7 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Fixed stream ending immediately in dev mode when start/end blocks are both 0.
* Sink-serve: fix missing output details on docker-compose apply errors
* Codegen: Fixed pluralized entity created for db_out and graph_out

*
## v1.1.18

### Fixed
Expand Down
13 changes: 13 additions & 0 deletions manifest/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,19 @@ func (g *ModuleGraph) ChildrenOf(moduleName string) ([]*pbsubstreams.Module, err
return res, nil
}

func (g *ModuleGraph) HasStatefulDependencies(moduleName string) (bool, error) {
stores, err := g.StoresDownTo(moduleName)
if err != nil {
return false, fmt.Errorf("getting stores down to %s: %w", moduleName, err)
}

if len(stores) > 0 {
return true, nil
}

return false, nil
}

func (g *ModuleGraph) StoresDownTo(moduleName string) ([]*pbsubstreams.Module, error) {
alreadyAdded := map[string]bool{}
topologicalIndex := map[string]int{}
Expand Down
23 changes: 21 additions & 2 deletions pipeline/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/streamingfast/substreams/manifest"
"sync/atomic"

"github.com/streamingfast/bstream"
Expand Down Expand Up @@ -41,12 +42,23 @@ func BuildRequestDetails(
return nil, nil, err
}

linearHandoff, err := computeLinearHandoffBlockNum(request.ProductionMode, req.ResolvedStartBlockNum, request.StopBlockNum, getRecentFinalBlock)
graph, err := manifest.NewModuleGraph(request.Modules.Modules)
if err != nil {
return nil, nil, status.Errorf(grpccodes.InvalidArgument, "invalid modules: %s", err.Error())
}

moduleHasStatefulDependencies, err := graph.HasStatefulDependencies(request.OutputModule)
if err != nil {
return nil, nil, status.Errorf(grpccodes.InvalidArgument, "invalid output module: %s", err.Error())
}

linearHandoff, err := computeLinearHandoffBlockNum(request.ProductionMode, req.ResolvedStartBlockNum, request.StopBlockNum, getRecentFinalBlock, moduleHasStatefulDependencies)
if err != nil {
return nil, nil, err
}

req.LinearHandoffBlockNum = linearHandoff

req.LinearGateBlockNum = req.LinearHandoffBlockNum
if req.ResolvedStartBlockNum > req.LinearHandoffBlockNum {
req.LinearGateBlockNum = req.ResolvedStartBlockNum
Expand Down Expand Up @@ -76,7 +88,7 @@ func nextUniqueID() uint64 {
return uniqueRequestIDCounter.Add(1)
}

func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uint64, getRecentFinalBlockFunc func() (uint64, error)) (uint64, error) {
func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uint64, getRecentFinalBlockFunc func() (uint64, error), stateRequired bool) (uint64, error) {
if productionMode {
maxHandoff, err := getRecentFinalBlockFunc()
if err != nil {
Expand All @@ -90,10 +102,17 @@ func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uin
}
return min(stopBlock, maxHandoff), nil
}

//if no state required, we don't need to ever back-process blocks. we can start flowing blocks right away from the start block
if !stateRequired {
return startBlock, nil
}

maxHandoff, err := getRecentFinalBlockFunc()
if err != nil {
return startBlock, nil
}

return min(startBlock, maxHandoff), nil
}

Expand Down
42 changes: 29 additions & 13 deletions pipeline/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,37 @@ func Test_computeLiveHandoffBlockNum(t *testing.T) {
stopBlockNum uint64
expectHandoffNum uint64
expectError bool
stateRequired bool
}{
// prod (start-block ignored)
{true, 100, true, 10, 0, 100, false},
{true, 100, true, 10, 150, 100, false},
{true, 100, true, 10, 50, 50, false},
{false, 0, true, 10, 50, 50, false},
{false, 0, true, 10, 0, 0, true},
{true, 100, true, 10, 0, 100, false, true},
{true, 100, true, 10, 150, 100, false, true},
{true, 100, true, 10, 50, 50, false, true},
{false, 0, true, 10, 50, 50, false, true},
{false, 0, true, 10, 0, 0, true, true},

// non-prod (stop-block ignored)
{true, 100, false, 10, 0, 10, false},
{true, 100, false, 10, 9999, 10, false},
{true, 100, false, 150, 0, 100, false},
{true, 100, false, 150, 9999, 100, false},
{false, 0, false, 150, 0, 150, false},
{false, 0, false, 150, 9999, 150, false},
// prod (start-block ignored) (state not required)
{true, 100, true, 10, 0, 100, false, false},
{true, 100, true, 10, 150, 100, false, false},
{true, 100, true, 10, 50, 50, false, false},
{false, 0, true, 10, 50, 50, false, false},
{false, 0, true, 10, 0, 0, true, false},

// non-prod (stop-block ignored) (state required)
{true, 100, false, 10, 0, 10, false, true},
{true, 100, false, 10, 9999, 10, false, true},
{true, 100, false, 150, 0, 100, false, true},
{true, 100, false, 150, 9999, 100, false, true},
{false, 0, false, 150, 0, 150, false, true},
{false, 0, false, 150, 9999, 150, false, true},

// non-prod (stop-block ignored) (state not required)
{true, 100, false, 10, 0, 10, false, false},
{true, 100, false, 10, 9999, 10, false, false},
{true, 100, false, 150, 0, 150, false, false},
{true, 100, false, 150, 9999, 150, false, false},
{false, 0, false, 150, 0, 150, false, false},
{false, 0, false, 150, 9999, 150, false, false},
}

for _, test := range tests {
Expand All @@ -209,7 +225,7 @@ func Test_computeLiveHandoffBlockNum(t *testing.T) {
return 0, fmt.Errorf("live not available")
}
return test.recentBlockNum, nil
})
}, test.stateRequired)
if test.expectError {
assert.Error(t, err)
} else {
Expand Down