diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b41d8d..770cc7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Added +- Added metrics to expose blocks/bytes read/sent from sources +- Added a metric to track blocks behind live on a joiningsource for a trace_id - Added FileSourceWithSecondaryBlocksStores Option to allow a fallback location - `.SetNearBlocksCount(count)` and `.Clone()` on `Tracker` object. - `Tracker` object to streamline queries about different targets (like network head, database lib, relayer blockstream head, whatever other BlockRef tags), ask the question about them being near one another (to select between live mode or catch-up mode). Also streamlines the requests of a start block, with a bunch of different backend implementations that can answer to the questions regarding where to start. diff --git a/block_payload.go b/block_payload.go index caa5830..383ce97 100644 --- a/block_payload.go +++ b/block_payload.go @@ -17,6 +17,7 @@ type BlockPayloadSetter func(block *Block, data []byte) (*Block, error) type BlockPayload interface { Get() (data []byte, err error) + Size() int } type MemoryBlockPayload struct { @@ -35,6 +36,10 @@ func (p *MemoryBlockPayload) Get() (data []byte, err error) { return p.data, err } +func (p *MemoryBlockPayload) Size() int { + return len(p.data) +} + var atmCache *atm.Cache var store dstore.Store @@ -109,6 +114,10 @@ func (p *ATMCachedBlockPayload) Get() (data []byte, err error) { return } +func (p *ATMCachedBlockPayload) Size() int { + return p.dataSize +} + func ATMCachedPayloadSetter(block *Block, data []byte) (*Block, error) { _, err := getCache().Write(block.Id, block.Timestamp, time.Now(), data) if err != nil { diff --git a/filesource.go b/filesource.go index a42be1a..124e139 100644 --- a/filesource.go +++ b/filesource.go @@ -449,6 +449,9 @@ func (s *FileSource) streamReader(blockReader BlockReader, prevLastBlockRead Blo break } + BlocksReadFileSource.Inc() + BytesReadFileSource.AddInt(blk.Payload.Size()) + blockNum := blk.Num() if blockNum < s.startBlockNum { continue @@ -509,6 +512,8 @@ func (s *FileSource) preprocess(block *Block, out chan *PreprocessedBlock) { }} zlog.Debug("block pre processed", zap.Stringer("block_ref", block)) + BlocksSentFileSource.Inc() + BytesSentFileSource.AddInt(block.Payload.Size()) select { case <-s.Terminating(): return diff --git a/go.mod b/go.mod index 5a296ce..2c91523 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,9 @@ require ( github.com/streamingfast/atm v0.0.0-20220131151839-18c87005e680 github.com/streamingfast/dbin v0.0.0-20210809205249-73d5eca35dc5 github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc - github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447 + github.com/streamingfast/dmetrics v0.0.0-20221012032216-6cf8338d4429 github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648 + github.com/streamingfast/dtracing v0.0.0-20221011173312-3f74543e68eb github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424 github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 github.com/streamingfast/pbgo v0.0.6-0.20220629184423-cfd0608e0cf4 @@ -29,7 +30,7 @@ require ( cloud.google.com/go/monitoring v1.6.0 // indirect cloud.google.com/go/storage v1.22.1 // indirect cloud.google.com/go/trace v1.2.0 // indirect - contrib.go.opencensus.io/exporter/stackdriver v0.12.6 // indirect + contrib.go.opencensus.io/exporter/stackdriver v0.13.10 // indirect contrib.go.opencensus.io/exporter/zipkin v0.1.1 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect github.com/Azure/azure-storage-blob-go v0.14.0 // indirect @@ -37,7 +38,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.0 // indirect github.com/blendle/zapdriver v1.3.1 // indirect - github.com/census-instrumentation/opencensus-proto v0.2.1 // indirect + github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect @@ -58,12 +59,12 @@ require ( github.com/mitchellh/go-testing-interface v1.14.1 // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/openzipkin/zipkin-go v0.1.6 // indirect + github.com/paulbellamy/ratecounter v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.12.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect - github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect @@ -79,4 +80,4 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220808131553-a91ffa7f803e // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect -) +) \ No newline at end of file diff --git a/go.sum b/go.sum index abafae5..bd9aa1e 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,7 @@ cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1 cloud.google.com/go/iam v0.1.1/go.mod h1:CKqrcnI/suGpybEHxZ7BMehL0oA4LpdyJdUlTl9jVMw= cloud.google.com/go/iam v0.3.0 h1:exkAomrVUuzx9kWFI1wm3KI0uoDeUFPB4kKGzx6x+Gc= cloud.google.com/go/iam v0.3.0/go.mod h1:XzJPvDayI+9zsASAFO68Hk07u3z+f+JrT2xXNdp4bnY= +cloud.google.com/go/monitoring v1.1.0/go.mod h1:L81pzz7HKn14QCMaCs6NTQkdBnE87TElyanS95vIcl4= cloud.google.com/go/monitoring v1.6.0 h1:+x5AA2mFkiHK/ySN6NWKbeKBV+Z/DN+h51kBzcW08zU= cloud.google.com/go/monitoring v1.6.0/go.mod h1:w+OY1TYCk4MtvY7WfEHlIp5mP8SV/gDSqOsvGhVa2KM= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -67,10 +68,13 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKuqnZI01LAA= cloud.google.com/go/storage v1.22.1 h1:F6IlQJZrZM++apn9V5/VfS3gbTUYg98PS3EMQAzqtfg= cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= +cloud.google.com/go/trace v1.0.0/go.mod h1:4iErSByzxkyHWzzlAj63/Gmjz0NH1ASqhJguHpGcr6A= cloud.google.com/go/trace v1.2.0 h1:oIaB4KahkIUOpLSAAjEJ8y2desbjY/x/RfP4O3KAtTI= cloud.google.com/go/trace v1.2.0/go.mod h1:Wc8y/uYyOhPy12KEnXG9XGrvfMz5F5SrYecQlbW1rwM= contrib.go.opencensus.io/exporter/stackdriver v0.12.6 h1:Y2FTyj0HgOhfjEW6D6ytZNoz1YcPDXmkKr1I478CWKs= contrib.go.opencensus.io/exporter/stackdriver v0.12.6/go.mod h1:8x999/OcIPy5ivx/wDiV7Gx4D+VUPODf0mWRGRc5kSk= +contrib.go.opencensus.io/exporter/stackdriver v0.13.10 h1:a9+GZPUe+ONKUwULjlEOucMMG0qfSCCenlji0Nhqbys= +contrib.go.opencensus.io/exporter/stackdriver v0.13.10/go.mod h1:I5htMbyta491eUxufwwZPQdcKvvgzMB4O9ni41YnIM8= contrib.go.opencensus.io/exporter/zipkin v0.1.1 h1:PR+1zWqY8ceXs1qDQQIlgXe+sdiwCf0n32bH4+Epk8g= contrib.go.opencensus.io/exporter/zipkin v0.1.1/go.mod h1:GMvdSl3eJ2gapOaLKzTKE3qDgUkJ86k9k3yY2eqwkzc= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= @@ -119,6 +123,8 @@ github.com/blendle/zapdriver v1.3.1 h1:C3dydBOWYRiOk+B8X9IVZ5IOe+7cl+tGOexN4QqHf github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc= github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk= +github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= @@ -335,7 +341,13 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.1.6 h1:yXiysv1CSK7Q5yjGy1710zZGnsbMUIjluWBxtLXHPBo= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= +github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs= +github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pinax-network/dmetrics v0.0.0-20221007092947-973c981de09f h1:ca5ZACDx4kZRQk43gxnr5l5v7eVVmRKrKvCHAwW+e5I= +github.com/pinax-network/dmetrics v0.0.0-20221007092947-973c981de09f/go.mod h1:fWoyaD76fE7mXZfkfcAfNeU/Hv/y6yJ/RgEcInQLwSw= +github.com/pinax-network/dtracing v0.0.0-20221007093316-91e3187b1e55 h1:HYk1ueqMDudtAJMXrt3Vky7GwC3A/ius0NiC4IqrD/8= +github.com/pinax-network/dtracing v0.0.0-20221007093316-91e3187b1e55/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -379,12 +391,17 @@ github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc h1:sRhUilbZExb github.com/streamingfast/dgrpc v0.0.0-20220909121013-162e9305bbfc/go.mod h1:YlFJuFiB9rmglB5UfTfnsOTfKC1rFo+D0sRbTzLcqgc= github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447 h1:oZwOVjxpWCqLUjgcPgVigVCHYR40JkmXfm1kuMcCOQk= github.com/streamingfast/dmetrics v0.0.0-20210811180524-8494aeb34447/go.mod h1:VLdQY/FwczmC/flqWkcsBbqXO4BhU4zQDSK7GMrpcjY= +github.com/streamingfast/dmetrics v0.0.0-20221012032216-6cf8338d4429 h1:ll/nzUOxIt7vmJxI5cNJVS1KEsneOSvwkDHt9Vaqq+Q= +github.com/streamingfast/dmetrics v0.0.0-20221012032216-6cf8338d4429/go.mod h1:fWoyaD76fE7mXZfkfcAfNeU/Hv/y6yJ/RgEcInQLwSw= github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648 h1:xpy3HNXeUHaZexf42duj7NeOmXcGfDMJXlZaj3CX18Y= github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648/go.mod h1:SHSEIPowGeE1TfNNmGeAUUnlO3dwevmX5kFOSazU60M= github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a h1:/7Rw3pYpueJYOQReTJpfAhAPk0uZD4I58LfiUAr4IMc= github.com/streamingfast/dtracing v0.0.0-20210811175635-d55665d3622a/go.mod h1:bqiYZaX6L/MoXNfFQeAdau6g9HLA3yKHkX8KzStt58Q= +github.com/streamingfast/dtracing v0.0.0-20221011173312-3f74543e68eb h1:XPuLw6gwN4k1DRxHEwRGbNYVVBf3Petx+Cv5GUx2vIU= +github.com/streamingfast/dtracing v0.0.0-20221011173312-3f74543e68eb/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns= github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20210908162127-bdc5856d5341/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= +github.com/streamingfast/logging v0.0.0-20220304183711-ddba33d79e27/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424 h1:qKt1W13L7GXL3xqvD6z2ufSkIy/KDm9oGrfurypC78E= github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU= github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 h1:xlWSfi1BoPfsHtPb0VEHGUcAdBF208LUiFCwfaVPfLA= @@ -549,6 +566,7 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= @@ -628,6 +646,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -753,6 +773,8 @@ google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6 google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE= google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI= +google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E= +google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU= google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= google.golang.org/api v0.63.0/go.mod h1:gs4ij2ffTRXwuzzgJl/56BdwJaA194ijkfn++9tDuPo= google.golang.org/api v0.64.0/go.mod h1:931CdxA8Rm4t6zqTFGSsgwbAEZ2+GMYurbndwSimebM= @@ -834,7 +856,11 @@ google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEc google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= +google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= +google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211018162055-cf77aa76bad2/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= diff --git a/hub/hub.go b/hub/hub.go index 5275bc7..73b6549 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -300,6 +300,9 @@ func (h *ForkableHub) processBlock(blk *bstream.Block, obj interface{}) error { zlog.Debug("process_block", zap.Stringer("blk", blk), zap.Any("obj", obj.(*forkable.ForkableObject).Step())) preprocBlock := &bstream.PreprocessedBlock{Block: blk, Obj: obj} + bstream.BlocksReadLiveSource.Inc() + bstream.BytesReadLiveSource.AddInt(blk.Payload.Size()) + subscribers := h.subscribers // we may remove some from the original slice during the loop for _, sub := range subscribers { diff --git a/joiningsource.go b/joiningsource.go index 84fbd53..0f7a14e 100644 --- a/joiningsource.go +++ b/joiningsource.go @@ -15,10 +15,13 @@ package bstream import ( + "context" "errors" "fmt" "sync" + "time" + "github.com/streamingfast/dtracing" "github.com/streamingfast/shutter" "go.uber.org/zap" ) @@ -44,6 +47,7 @@ type JoiningSource struct { lastBlockProcessed *Block + ctx context.Context startBlockNum uint64 // overriden by cursor if it exists cursor *Cursor @@ -54,6 +58,7 @@ func NewJoiningSource( fileSourceFactory, liveSourceFactory ForkableSourceFactory, h Handler, + ctx context.Context, startBlockNum uint64, cursor *Cursor, logger *zap.Logger) *JoiningSource { @@ -64,6 +69,7 @@ func NewJoiningSource( fileSourceFactory: fileSourceFactory, liveSourceFactory: liveSourceFactory, handler: h, + ctx: ctx, startBlockNum: startBlockNum, cursor: cursor, logger: logger, @@ -98,6 +104,8 @@ func (s *JoiningSource) run() error { s.cursor.String()) } + defer s.deleteBlocksBehindLive() + s.OnTerminating(fileSrc.Shutdown) fileSrc.Run() @@ -108,7 +116,6 @@ func (s *JoiningSource) run() error { s.OnTerminating(s.liveSource.Shutdown) s.liveSource.Run() return s.liveSource.Err() - } func (s *JoiningSource) tryGetSource(handler Handler, factory ForkableSourceFactory) Source { @@ -122,6 +129,7 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { if s.liveSource != nil { // we should be already shutdown anyway return nil } + s.logBlocksBehindLive(s.lowestLiveBlockNum - blk.Number) if blk.Number >= s.lowestLiveBlockNum { if src := s.liveSourceFactory.SourceFromBlockNum(blk.Number, s.handler); src != nil { @@ -135,3 +143,24 @@ func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { return s.handler.ProcessBlock(blk, obj) } + +func (s *JoiningSource) deleteBlocksBehindLive() { + traceId := dtracing.GetTraceIDOrEmpty(s.ctx).String() + + go func() { + // allow Prometheus to scrape the current metrics before they are dropped + // 2 min is the maximum recommended scrape interval + time.Sleep(2 * time.Minute) + BlocksBehindLive.DeleteLabelValues(traceId) + }() +} + +func (s *JoiningSource) logBlocksBehindLive(blocksBehindLive uint64) { + traceId := dtracing.GetTraceIDOrEmpty(s.ctx).String() + + if blocksBehindLive <= 0 { // avoid cluttering the metrics with streams that caught up to live + s.deleteBlocksBehindLive() + } else { + BlocksBehindLive.SetUint64(blocksBehindLive, traceId) + } +} diff --git a/metrics.go b/metrics.go index 51f94d4..a81cd96 100644 --- a/metrics.go +++ b/metrics.go @@ -20,6 +20,17 @@ import ( var Metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("bstream")) +var BlocksReadFileSource = Metrics.NewCounter("blocks_read_filesource", "Number of blocks read from file source") +var BytesReadFileSource = Metrics.NewCounter("bytes_read_filesource", "Bytes read from file source") + +var BlocksSentFileSource = Metrics.NewCounter("blocks_sent_filesource", "Number of blocks sent that came from file source") +var BytesSentFileSource = Metrics.NewCounter("bytes_sent_filesource", "Bytes sent that came from file source") + +var BlocksReadLiveSource = Metrics.NewCounter("blocks_read_livesource", "Number of blocks read from live source") +var BytesReadLiveSource = Metrics.NewCounter("bytes_read_livesource", "Bytes read from live source") + +var BlocksBehindLive = Metrics.NewGaugeVec("blocks_behind_live", []string{"trace_id"}, "Number of blocks behind live source") + func WithHeadMetrics(h Handler, blkNum *dmetrics.HeadBlockNum, blkDrift *dmetrics.HeadTimeDrift) Handler { return HandlerFunc(func(blk *Block, obj interface{}) error { blkDrift.SetBlockTime(blk.Time()) diff --git a/stream/stream.go b/stream/stream.go index 014eafa..bad89bb 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -75,7 +75,7 @@ func New( } func (s *Stream) Run(ctx context.Context) error { - source, err := s.createSource() + source, err := s.createSource(ctx) if err != nil { return err } @@ -97,7 +97,7 @@ func (s *Stream) Run(ctx context.Context) error { return nil } -func (s *Stream) createSource() (bstream.Source, error) { +func (s *Stream) createSource(ctx context.Context) (bstream.Source, error) { s.logger.Debug("setting up firehose source") absoluteStartBlockNum, err := resolveNegativeStartBlockNum(s.startBlockNum, s.currentHeadGetter) @@ -138,6 +138,7 @@ func (s *Stream) createSource() (bstream.Source, error) { s.fileSourceFactory, s.liveSourceFactory, h, + ctx, absoluteStartBlockNum, s.cursor, s.logger,