Skip to content

Commit

Permalink
optimized start of output stream in developer mode when start block i…
Browse files Browse the repository at this point in the history
…s in reversible segment and output module does not have any stores in its dependencies
  • Loading branch information
colindickson committed Oct 26, 2023
1 parent dd57692 commit 0a4289d
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 16 deletions.
7 changes: 6 additions & 1 deletion docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

* Optimized start of output stream in developer mode when start block is in reversible segment and output module does not have any stores in its dependencies.


## v1.1.19

### Changed
Expand All @@ -18,7 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Fixed stream ending immediately in dev mode when start/end blocks are both 0.
* Sink-serve: fix missing output details on docker-compose apply errors
* Codegen: Fixed pluralized entity created for db_out and graph_out

*
## v1.1.18

### Fixed
Expand Down
13 changes: 13 additions & 0 deletions manifest/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,19 @@ func (g *ModuleGraph) ChildrenOf(moduleName string) ([]*pbsubstreams.Module, err
return res, nil
}

func (g *ModuleGraph) HasStatefulDependencies(moduleName string) (bool, error) {
stores, err := g.StoresDownTo(moduleName)
if err != nil {
return false, fmt.Errorf("getting stores down to %s: %w", moduleName, err)
}

if len(stores) > 0 {
return true, nil
}

return false, nil
}

func (g *ModuleGraph) StoresDownTo(moduleName string) ([]*pbsubstreams.Module, error) {
alreadyAdded := map[string]bool{}
topologicalIndex := map[string]int{}
Expand Down
23 changes: 21 additions & 2 deletions pipeline/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/streamingfast/substreams/manifest"
"sync/atomic"

"github.com/streamingfast/bstream"
Expand Down Expand Up @@ -41,12 +42,23 @@ func BuildRequestDetails(
return nil, nil, err
}

linearHandoff, err := computeLinearHandoffBlockNum(request.ProductionMode, req.ResolvedStartBlockNum, request.StopBlockNum, getRecentFinalBlock)
graph, err := manifest.NewModuleGraph(request.Modules.Modules)
if err != nil {
return nil, nil, status.Errorf(grpccodes.InvalidArgument, "invalid modules: %s", err.Error())
}

moduleHasStatefulDependencies, err := graph.HasStatefulDependencies(request.OutputModule)
if err != nil {
return nil, nil, status.Errorf(grpccodes.InvalidArgument, "invalid output module: %s", err.Error())
}

linearHandoff, err := computeLinearHandoffBlockNum(request.ProductionMode, req.ResolvedStartBlockNum, request.StopBlockNum, getRecentFinalBlock, moduleHasStatefulDependencies)
if err != nil {
return nil, nil, err
}

req.LinearHandoffBlockNum = linearHandoff

req.LinearGateBlockNum = req.LinearHandoffBlockNum
if req.ResolvedStartBlockNum > req.LinearHandoffBlockNum {
req.LinearGateBlockNum = req.ResolvedStartBlockNum
Expand Down Expand Up @@ -76,7 +88,7 @@ func nextUniqueID() uint64 {
return uniqueRequestIDCounter.Add(1)
}

func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uint64, getRecentFinalBlockFunc func() (uint64, error)) (uint64, error) {
func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uint64, getRecentFinalBlockFunc func() (uint64, error), stateRequired bool) (uint64, error) {
if productionMode {
maxHandoff, err := getRecentFinalBlockFunc()
if err != nil {
Expand All @@ -90,10 +102,17 @@ func computeLinearHandoffBlockNum(productionMode bool, startBlock, stopBlock uin
}
return min(stopBlock, maxHandoff), nil
}

//if no state required, we don't need to ever back-process blocks. we can start flowing blocks right away from the start block
if !stateRequired {
return startBlock, nil
}

maxHandoff, err := getRecentFinalBlockFunc()
if err != nil {
return startBlock, nil
}

return min(startBlock, maxHandoff), nil
}

Expand Down
42 changes: 29 additions & 13 deletions pipeline/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,37 @@ func Test_computeLiveHandoffBlockNum(t *testing.T) {
stopBlockNum uint64
expectHandoffNum uint64
expectError bool
stateRequired bool
}{
// prod (start-block ignored)
{true, 100, true, 10, 0, 100, false},
{true, 100, true, 10, 150, 100, false},
{true, 100, true, 10, 50, 50, false},
{false, 0, true, 10, 50, 50, false},
{false, 0, true, 10, 0, 0, true},
{true, 100, true, 10, 0, 100, false, true},
{true, 100, true, 10, 150, 100, false, true},
{true, 100, true, 10, 50, 50, false, true},
{false, 0, true, 10, 50, 50, false, true},
{false, 0, true, 10, 0, 0, true, true},

// non-prod (stop-block ignored)
{true, 100, false, 10, 0, 10, false},
{true, 100, false, 10, 9999, 10, false},
{true, 100, false, 150, 0, 100, false},
{true, 100, false, 150, 9999, 100, false},
{false, 0, false, 150, 0, 150, false},
{false, 0, false, 150, 9999, 150, false},
// prod (start-block ignored) (state not required)
{true, 100, true, 10, 0, 100, false, false},
{true, 100, true, 10, 150, 100, false, false},
{true, 100, true, 10, 50, 50, false, false},
{false, 0, true, 10, 50, 50, false, false},
{false, 0, true, 10, 0, 0, true, false},

// non-prod (stop-block ignored) (state required)
{true, 100, false, 10, 0, 10, false, true},
{true, 100, false, 10, 9999, 10, false, true},
{true, 100, false, 150, 0, 100, false, true},
{true, 100, false, 150, 9999, 100, false, true},
{false, 0, false, 150, 0, 150, false, true},
{false, 0, false, 150, 9999, 150, false, true},

// non-prod (stop-block ignored) (state not required)
{true, 100, false, 10, 0, 10, false, false},
{true, 100, false, 10, 9999, 10, false, false},
{true, 100, false, 150, 0, 150, false, false},
{true, 100, false, 150, 9999, 150, false, false},
{false, 0, false, 150, 0, 150, false, false},
{false, 0, false, 150, 9999, 150, false, false},
}

for _, test := range tests {
Expand All @@ -209,7 +225,7 @@ func Test_computeLiveHandoffBlockNum(t *testing.T) {
return 0, fmt.Errorf("live not available")
}
return test.recentBlockNum, nil
})
}, test.stateRequired)
if test.expectError {
assert.Error(t, err)
} else {
Expand Down

0 comments on commit 0a4289d

Please sign in to comment.