Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add some bstream metrics #25

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased

### Added
- Added metrics to expose blocks/bytes read/sent from sources
- Added a metric to track blocks behind live on a joiningsource for a trace_id
- Added FileSourceWithSecondaryBlocksStores Option to allow a fallback location
- `.SetNearBlocksCount(count)` and `.Clone()` on `Tracker` object.
- `Tracker` object to streamline queries about different targets (like network head, database lib, relayer blockstream head, whatever other BlockRef tags), ask the question about them being near one another (to select between live mode or catch-up mode). Also streamlines the requests of a start block, with a bunch of different backend implementations that can answer to the questions regarding where to start.
Expand Down
9 changes: 9 additions & 0 deletions block_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type BlockPayloadSetter func(block *Block, data []byte) (*Block, error)

type BlockPayload interface {
Get() (data []byte, err error)
Size() int
}

type MemoryBlockPayload struct {
Expand All @@ -35,6 +36,10 @@ func (p *MemoryBlockPayload) Get() (data []byte, err error) {
return p.data, err
}

func (p *MemoryBlockPayload) Size() int {
return len(p.data)
}

var atmCache *atm.Cache
var store dstore.Store

Expand Down Expand Up @@ -109,6 +114,10 @@ func (p *ATMCachedBlockPayload) Get() (data []byte, err error) {
return
}

func (p *ATMCachedBlockPayload) Size() int {
return p.dataSize
}

func ATMCachedPayloadSetter(block *Block, data []byte) (*Block, error) {
_, err := getCache().Write(block.Id, block.Timestamp, time.Now(), data)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ func (s *FileSource) streamReader(blockReader BlockReader, prevLastBlockRead Blo
break
}

BlocksReadFileSource.Inc()
BytesReadFileSource.AddInt(blk.Payload.Size())

blockNum := blk.Num()
if blockNum < s.startBlockNum {
continue
Expand Down Expand Up @@ -509,6 +512,8 @@ func (s *FileSource) preprocess(block *Block, out chan *PreprocessedBlock) {
}}

zlog.Debug("block pre processed", zap.Stringer("block_ref", block))
BlocksSentFileSource.Inc()
BytesSentFileSource.AddInt(block.Payload.Size())
select {
case <-s.Terminating():
return
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ require (
github.com/streamingfast/atm v0.0.0-20220131151839-18c87005e680
github.com/streamingfast/dbin v0.0.0-20210809205249-73d5eca35dc5
github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc
github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447
github.com/streamingfast/dmetrics v0.0.0-20221012032216-6cf8338d4429
github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648
github.com/streamingfast/dtracing v0.0.0-20221011173312-3f74543e68eb
github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424
github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308
github.com/streamingfast/pbgo v0.0.6-0.20220629184423-cfd0608e0cf4
Expand All @@ -29,15 +30,15 @@ require (
cloud.google.com/go/monitoring v1.6.0 // indirect
cloud.google.com/go/storage v1.22.1 // indirect
cloud.google.com/go/trace v1.2.0 // indirect
contrib.go.opencensus.io/exporter/stackdriver v0.12.6 // indirect
contrib.go.opencensus.io/exporter/stackdriver v0.13.10 // indirect
contrib.go.opencensus.io/exporter/zipkin v0.1.1 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
github.com/aws/aws-sdk-go v1.37.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.2.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
Expand All @@ -58,12 +59,12 @@ require (
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/openzipkin/zipkin-go v0.1.6 // indirect
github.com/paulbellamy/ratecounter v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
Expand All @@ -79,4 +80,4 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220808131553-a91ffa7f803e // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
)
26 changes: 26 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1
cloud.google.com/go/iam v0.1.1/go.mod h1:CKqrcnI/suGpybEHxZ7BMehL0oA4LpdyJdUlTl9jVMw=
cloud.google.com/go/iam v0.3.0 h1:exkAomrVUuzx9kWFI1wm3KI0uoDeUFPB4kKGzx6x+Gc=
cloud.google.com/go/iam v0.3.0/go.mod h1:XzJPvDayI+9zsASAFO68Hk07u3z+f+JrT2xXNdp4bnY=
cloud.google.com/go/monitoring v1.1.0/go.mod h1:L81pzz7HKn14QCMaCs6NTQkdBnE87TElyanS95vIcl4=
cloud.google.com/go/monitoring v1.6.0 h1:+x5AA2mFkiHK/ySN6NWKbeKBV+Z/DN+h51kBzcW08zU=
cloud.google.com/go/monitoring v1.6.0/go.mod h1:w+OY1TYCk4MtvY7WfEHlIp5mP8SV/gDSqOsvGhVa2KM=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
Expand All @@ -67,10 +68,13 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKuqnZI01LAA=
cloud.google.com/go/storage v1.22.1 h1:F6IlQJZrZM++apn9V5/VfS3gbTUYg98PS3EMQAzqtfg=
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
cloud.google.com/go/trace v1.0.0/go.mod h1:4iErSByzxkyHWzzlAj63/Gmjz0NH1ASqhJguHpGcr6A=
cloud.google.com/go/trace v1.2.0 h1:oIaB4KahkIUOpLSAAjEJ8y2desbjY/x/RfP4O3KAtTI=
cloud.google.com/go/trace v1.2.0/go.mod h1:Wc8y/uYyOhPy12KEnXG9XGrvfMz5F5SrYecQlbW1rwM=
contrib.go.opencensus.io/exporter/stackdriver v0.12.6 h1:Y2FTyj0HgOhfjEW6D6ytZNoz1YcPDXmkKr1I478CWKs=
contrib.go.opencensus.io/exporter/stackdriver v0.12.6/go.mod h1:8x999/OcIPy5ivx/wDiV7Gx4D+VUPODf0mWRGRc5kSk=
contrib.go.opencensus.io/exporter/stackdriver v0.13.10 h1:a9+GZPUe+ONKUwULjlEOucMMG0qfSCCenlji0Nhqbys=
contrib.go.opencensus.io/exporter/stackdriver v0.13.10/go.mod h1:I5htMbyta491eUxufwwZPQdcKvvgzMB4O9ni41YnIM8=
contrib.go.opencensus.io/exporter/zipkin v0.1.1 h1:PR+1zWqY8ceXs1qDQQIlgXe+sdiwCf0n32bH4+Epk8g=
contrib.go.opencensus.io/exporter/zipkin v0.1.1/go.mod h1:GMvdSl3eJ2gapOaLKzTKE3qDgUkJ86k9k3yY2eqwkzc=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
Expand Down Expand Up @@ -119,6 +123,8 @@ github.com/blendle/zapdriver v1.3.1 h1:C3dydBOWYRiOk+B8X9IVZ5IOe+7cl+tGOexN4QqHf
github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
Expand Down Expand Up @@ -335,7 +341,13 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openzipkin/zipkin-go v0.1.6 h1:yXiysv1CSK7Q5yjGy1710zZGnsbMUIjluWBxtLXHPBo=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pinax-network/dmetrics v0.0.0-20221007092947-973c981de09f h1:ca5ZACDx4kZRQk43gxnr5l5v7eVVmRKrKvCHAwW+e5I=
github.com/pinax-network/dmetrics v0.0.0-20221007092947-973c981de09f/go.mod h1:fWoyaD76fE7mXZfkfcAfNeU/Hv/y6yJ/RgEcInQLwSw=
github.com/pinax-network/dtracing v0.0.0-20221007093316-91e3187b1e55 h1:HYk1ueqMDudtAJMXrt3Vky7GwC3A/ius0NiC4IqrD/8=
github.com/pinax-network/dtracing v0.0.0-20221007093316-91e3187b1e55/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -379,12 +391,17 @@ github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc h1:sRhUilbZExb
github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc/go.mod h1:YlFJuFiB9rmglB5UfTfnsOTfKC1rFo+D0sRbTzLcqgc=
github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447 h1:oZwOVjxpWCqLUjgcPgVigVCHYR40JkmXfm1kuMcCOQk=
github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447/go.mod h1:VLdQY/FwczmC/flqWkcsBbqXO4BhU4zQDSK7GMrpcjY=
github.com/streamingfast/dmetrics v0.0.0-20221012032216-6cf8338d4429 h1:ll/nzUOxIt7vmJxI5cNJVS1KEsneOSvwkDHt9Vaqq+Q=
github.com/streamingfast/dmetrics v0.0.0-20221012032216-6cf8338d4429/go.mod h1:fWoyaD76fE7mXZfkfcAfNeU/Hv/y6yJ/RgEcInQLwSw=
github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648 h1:xpy3HNXeUHaZexf42duj7NeOmXcGfDMJXlZaj3CX18Y=
github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648/go.mod h1:SHSEIPowGeE1TfNNmGeAUUnlO3dwevmX5kFOSazU60M=
github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a h1:/7Rw3pYpueJYOQReTJpfAhAPk0uZD4I58LfiUAr4IMc=
github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a/go.mod h1:bqiYZaX6L/MoXNfFQeAdau6g9HLA3yKHkX8KzStt58Q=
github.com/streamingfast/dtracing v0.0.0-20221011173312-3f74543e68eb h1:XPuLw6gwN4k1DRxHEwRGbNYVVBf3Petx+Cv5GUx2vIU=
github.com/streamingfast/dtracing v0.0.0-20221011173312-3f74543e68eb/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=
github.com/streamingfast/logging v0.0.0-20210908162127-bdc5856d5341/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=
github.com/streamingfast/logging v0.0.0-20220304183711-ddba33d79e27/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=
github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424 h1:qKt1W13L7GXL3xqvD6z2ufSkIy/KDm9oGrfurypC78E=
github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU=
github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 h1:xlWSfi1BoPfsHtPb0VEHGUcAdBF208LUiFCwfaVPfLA=
Expand Down Expand Up @@ -549,6 +566,7 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
Expand Down Expand Up @@ -628,6 +646,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -753,6 +773,8 @@ google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6
google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI=
google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E=
google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU=
google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I=
google.golang.org/api v0.63.0/go.mod h1:gs4ij2ffTRXwuzzgJl/56BdwJaA194ijkfn++9tDuPo=
google.golang.org/api v0.64.0/go.mod h1:931CdxA8Rm4t6zqTFGSsgwbAEZ2+GMYurbndwSimebM=
Expand Down Expand Up @@ -834,7 +856,11 @@ google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEc
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211018162055-cf77aa76bad2/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
Expand Down
3 changes: 3 additions & 0 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ func (h *ForkableHub) processBlock(blk *bstream.Block, obj interface{}) error {
zlog.Debug("process_block", zap.Stringer("blk", blk), zap.Any("obj", obj.(*forkable.ForkableObject).Step()))
preprocBlock := &bstream.PreprocessedBlock{Block: blk, Obj: obj}

bstream.BlocksReadLiveSource.Inc()
bstream.BytesReadLiveSource.AddInt(blk.Payload.Size())

subscribers := h.subscribers // we may remove some from the original slice during the loop

for _, sub := range subscribers {
Expand Down
31 changes: 30 additions & 1 deletion joiningsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
package bstream

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/streamingfast/dtracing"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)
Expand All @@ -44,6 +47,7 @@ type JoiningSource struct {

lastBlockProcessed *Block

ctx context.Context
startBlockNum uint64 // overriden by cursor if it exists
cursor *Cursor

Expand All @@ -54,6 +58,7 @@ func NewJoiningSource(
fileSourceFactory,
liveSourceFactory ForkableSourceFactory,
h Handler,
ctx context.Context,
startBlockNum uint64,
cursor *Cursor,
logger *zap.Logger) *JoiningSource {
Expand All @@ -64,6 +69,7 @@ func NewJoiningSource(
fileSourceFactory: fileSourceFactory,
liveSourceFactory: liveSourceFactory,
handler: h,
ctx: ctx,
startBlockNum: startBlockNum,
cursor: cursor,
logger: logger,
Expand Down Expand Up @@ -98,6 +104,8 @@ func (s *JoiningSource) run() error {
s.cursor.String())
}

defer s.deleteBlocksBehindLive()

s.OnTerminating(fileSrc.Shutdown)
fileSrc.Run()

Expand All @@ -108,7 +116,6 @@ func (s *JoiningSource) run() error {
s.OnTerminating(s.liveSource.Shutdown)
s.liveSource.Run()
return s.liveSource.Err()

}

func (s *JoiningSource) tryGetSource(handler Handler, factory ForkableSourceFactory) Source {
Expand All @@ -122,6 +129,7 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error {
if s.liveSource != nil { // we should be already shutdown anyway
return nil
}
s.logBlocksBehindLive(s.lowestLiveBlockNum - blk.Number)

if blk.Number >= s.lowestLiveBlockNum {
if src := s.liveSourceFactory.SourceFromBlockNum(blk.Number, s.handler); src != nil {
Expand All @@ -135,3 +143,24 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error {

return s.handler.ProcessBlock(blk, obj)
}

func (s *JoiningSource) deleteBlocksBehindLive() {
traceId := dtracing.GetTraceIDOrEmpty(s.ctx).String()

go func() {
// allow Prometheus to scrape the current metrics before they are dropped
// 2 min is the maximum recommended scrape interval
time.Sleep(2 * time.Minute)
BlocksBehindLive.DeleteLabelValues(traceId)
}()
}

func (s *JoiningSource) logBlocksBehindLive(blocksBehindLive uint64) {
traceId := dtracing.GetTraceIDOrEmpty(s.ctx).String()

if blocksBehindLive <= 0 { // avoid cluttering the metrics with streams that caught up to live
s.deleteBlocksBehindLive()
} else {
BlocksBehindLive.SetUint64(blocksBehindLive, traceId)
}
}
11 changes: 11 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ import (

var Metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("bstream"))

var BlocksReadFileSource = Metrics.NewCounter("blocks_read_filesource", "Number of blocks read from file source")
var BytesReadFileSource = Metrics.NewCounter("bytes_read_filesource", "Bytes read from file source")

var BlocksSentFileSource = Metrics.NewCounter("blocks_sent_filesource", "Number of blocks sent that came from file source")
var BytesSentFileSource = Metrics.NewCounter("bytes_sent_filesource", "Bytes sent that came from file source")

var BlocksReadLiveSource = Metrics.NewCounter("blocks_read_livesource", "Number of blocks read from live source")
var BytesReadLiveSource = Metrics.NewCounter("bytes_read_livesource", "Bytes read from live source")

var BlocksBehindLive = Metrics.NewGaugeVec("blocks_behind_live", []string{"trace_id"}, "Number of blocks behind live source")

func WithHeadMetrics(h Handler, blkNum *dmetrics.HeadBlockNum, blkDrift *dmetrics.HeadTimeDrift) Handler {
return HandlerFunc(func(blk *Block, obj interface{}) error {
blkDrift.SetBlockTime(blk.Time())
Expand Down
5 changes: 3 additions & 2 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func New(
}

func (s *Stream) Run(ctx context.Context) error {
source, err := s.createSource()
source, err := s.createSource(ctx)
if err != nil {
return err
}
Expand All @@ -97,7 +97,7 @@ func (s *Stream) Run(ctx context.Context) error {
return nil
}

func (s *Stream) createSource() (bstream.Source, error) {
func (s *Stream) createSource(ctx context.Context) (bstream.Source, error) {
s.logger.Debug("setting up firehose source")

absoluteStartBlockNum, err := resolveNegativeStartBlockNum(s.startBlockNum, s.currentHeadGetter)
Expand Down Expand Up @@ -138,6 +138,7 @@ func (s *Stream) createSource() (bstream.Source, error) {
s.fileSourceFactory,
s.liveSourceFactory,
h,
ctx,
absoluteStartBlockNum,
s.cursor,
s.logger,
Expand Down