diff --git a/CHANGELOG.md b/CHANGELOG.md index c69d1f7..180ddd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s * Substreams: add `--common-tmp-dir` flag and activate local caching of pre-compiled WASM modules through wazero feature * Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module. * Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling +* * Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead. ## v1.5.7 diff --git a/firehose/block_getter.go b/firehose/block_getter.go index 6ac564b..11c67e9 100644 --- a/firehose/block_getter.go +++ b/firehose/block_getter.go @@ -11,6 +11,7 @@ import ( "github.com/streamingfast/derr" "github.com/streamingfast/dmetering" "github.com/streamingfast/dstore" + "github.com/streamingfast/firehose-core/metering" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -59,10 +60,12 @@ func (g *BlockGetter) Get( mergedBlocksStore := g.mergedBlocksStore if clonable, ok := mergedBlocksStore.(dstore.Clonable); ok { var err error - mergedBlocksStore, err = clonable.Clone(ctx) + mergedBlocksStore, err = clonable.Clone(ctx, metering.WithBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this mergedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } @@ -91,10 +94,12 @@ func (g *BlockGetter) Get( forkedBlocksStore := g.forkedBlocksStore if clonable, ok := forkedBlocksStore.(dstore.Clonable); ok { var err error - forkedBlocksStore, err = clonable.Clone(ctx) + forkedBlocksStore, err = clonable.Clone(ctx, metering.WithForkedBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this forkedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } diff --git a/firehose/server/blocks.go b/firehose/server/blocks.go index b73da6c..7e57fe9 100644 --- a/firehose/server/blocks.go +++ b/firehose/server/blocks.go @@ -14,6 +14,7 @@ import ( "github.com/streamingfast/dauth" "github.com/streamingfast/dmetering" "github.com/streamingfast/firehose-core/firehose/metrics" + "github.com/streamingfast/firehose-core/metering" "github.com/streamingfast/logging" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" @@ -66,29 +67,9 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque }, } - ////////////////////////////////////////////////////////////////////// meter := dmetering.GetBytesMeter(ctx) - bytesRead := meter.BytesReadDelta() - bytesWritten := meter.BytesWrittenDelta() - size := proto.Size(resp) - auth := dauth.FromContext(ctx) - event := dmetering.Event{ - UserID: auth.UserID(), - ApiKeyID: auth.APIKeyID(), - IpAddress: auth.RealIP(), - Meta: auth.Meta(), - Endpoint: "sf.firehose.v2.Firehose/Block", - Metrics: map[string]float64{ - "egress_bytes": float64(size), - "written_bytes": float64(bytesWritten), - "read_bytes": float64(bytesRead), - "block_count": 1, - }, - Timestamp: time.Now(), - } - dmetering.Emit(ctx, event) - ////////////////////////////////////////////////////////////////////// + metering.Send(ctx, meter, auth.UserID(), auth.APIKeyID(), auth.RealIP(), auth.Meta(), "sf.firehose.v2.Firehose/Block", resp) return resp, nil } @@ -127,10 +108,6 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream } } - isLiveBlock := func(step pbfirehose.ForkStep) bool { - return step == pbfirehose.ForkStep_STEP_NEW - } - var blockCount uint64 handlerFunc := bstream.HandlerFunc(func(block *pbbstream.Block, obj interface{}) error { blockCount++ @@ -188,10 +165,6 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream return NewErrSendBlock(err) } - if isLiveBlock(protoStep) { - dmetering.GetBytesMeter(ctx).AddBytesRead(len(block.Payload.Value)) - } - level := zap.DebugLevel if block.Number%200 == 0 { level = zap.InfoLevel @@ -206,8 +179,45 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream return status.Errorf(codes.Unimplemented, "no transforms registry configured within this instance") } + liveSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler { + return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + if stepable, ok := obj.(bstream.Stepable); ok { + if stepable.Step().Matches(bstream.StepNew) { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadBytes, len(blk.GetPayload().GetValue())) + + // legacy metering + // todo(colin): remove this once we are sure the new metering is working + dmetering.GetBytesMeter(ctx).AddBytesRead(len(blk.GetPayload().GetValue())) + } else { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadForkedBytes, len(blk.GetPayload().GetValue())) + } + } + return next.ProcessBlock(blk, obj) + }) + } + + fileSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler { + return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + if stepable, ok := obj.(bstream.Stepable); ok { + if stepable.Step().Matches(bstream.StepNew) { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadBytes, len(blk.GetPayload().GetValue())) + } else { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadForkedBytes, len(blk.GetPayload().GetValue())) + } + } + return next.ProcessBlock(blk, obj) + }) + } + ctx = s.initFunc(ctx, request) - str, err := s.streamFactory.New(ctx, handlerFunc, request, logger) + str, err := s.streamFactory.New( + ctx, + handlerFunc, + request, + logger, + stream.WithLiveSourceHandlerMiddleware(liveSourceMiddlewareHandler), + stream.WithFileSourceHandlerMiddleware(fileSourceMiddlewareHandler), + ) if err != nil { return err } diff --git a/firehose/server/server.go b/firehose/server/server.go index f4b52c4..1b494a4 100644 --- a/firehose/server/server.go +++ b/firehose/server/server.go @@ -18,6 +18,7 @@ import ( "github.com/streamingfast/firehose-core/firehose" "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/firehose/rate" + "github.com/streamingfast/firehose-core/metering" pbfirehoseV1 "github.com/streamingfast/pbgo/sf/firehose/v1" pbfirehoseV2 "github.com/streamingfast/pbgo/sf/firehose/v2" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -66,41 +67,19 @@ func New( opts ...Option, ) *Server { initFunc := func(ctx context.Context, _ *pbfirehoseV2.Request) context.Context { - ////////////////////////////////////////////////////////////////////// ctx = dmetering.WithBytesMeter(ctx) ctx = withRequestMeter(ctx) return ctx - ////////////////////////////////////////////////////////////////////// } postHookFunc := func(ctx context.Context, response *pbfirehoseV2.Response) { - ////////////////////////////////////////////////////////////////////// - meter := dmetering.GetBytesMeter(ctx) - bytesRead := meter.BytesReadDelta() - bytesWritten := meter.BytesWrittenDelta() - size := proto.Size(response) - - auth := dauth.FromContext(ctx) - event := dmetering.Event{ - UserID: auth.UserID(), - ApiKeyID: auth.APIKeyID(), - IpAddress: auth.RealIP(), - Meta: auth.Meta(), - Endpoint: "sf.firehose.v2.Firehose/Blocks", - Metrics: map[string]float64{ - "egress_bytes": float64(size), - "written_bytes": float64(bytesWritten), - "read_bytes": float64(bytesRead), - "block_count": 1, - }, - Timestamp: time.Now(), - } - requestMeter := getRequestMeter(ctx) requestMeter.blocks++ - requestMeter.egressBytes += size - dmetering.Emit(ctx, event) - ////////////////////////////////////////////////////////////////////// + requestMeter.egressBytes += proto.Size(response) + + meter := dmetering.GetBytesMeter(ctx) + auth := dauth.FromContext(ctx) + metering.Send(ctx, meter, auth.UserID(), auth.APIKeyID(), auth.RealIP(), auth.Meta(), "sf.firehose.v2.Firehose/Block", response) } tracerProvider := otel.GetTracerProvider() diff --git a/go.mod b/go.mod index 96687a0..bad8288 100644 --- a/go.mod +++ b/go.mod @@ -17,22 +17,22 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 - github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf + github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 - github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1 + github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 - github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb + github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 - github.com/streamingfast/substreams v1.9.4-0.20240826160128-7d7c7b132e06 + github.com/streamingfast/substreams v1.9.4-0.20240827160230-05a454855aaf github.com/stretchr/testify v1.8.4 github.com/test-go/testify v1.1.4 go.uber.org/multierr v1.10.0 diff --git a/go.sum b/go.sum index e7c34f8..164175f 100644 --- a/go.sum +++ b/go.sum @@ -537,8 +537,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= -github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf h1:LXFIz2pyTlIMNzyifvKsZpFLcLbJkTcRyu7OlABV1S0= -github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= +github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b h1:LbT8xpXFY5bsZbQfhQJGcXUBXbl/QZZ7CqfN6nLtpwM= +github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d h1:9tsEt2tLCp94CW6MyJZY+Rw6+t0WH2kioBR6ucO6P/E= github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d/go.mod h1:og+6lDBPLZ24lbF/YISmVsSduZUZwXSmJGD3pZ/sW2Y= github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU= @@ -551,12 +551,12 @@ github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a h1:JwAGZ7f5vkB github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a/go.mod h1:EPtUX/vhRphE37Zo6sDcgD/S3sm5YqXHhxAgzS6Ebwo= github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 h1:HKi8AIkLBzxZWmbCRUo1RxoOLK33iXO6gZprfsE9rf4= github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4/go.mod h1:ehPytv7E4rI65iLcrwTes4rNGGqPPiugnH+20nDQyp4= -github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1 h1:zPqUBv2dBJ/N898pZ9W+1qDamQjbtdD7cwtwQB8PWTQ= -github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= +github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI= +github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw= -github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb h1:tmu8wGiSTzdqk2CnPnI7GywKwepGieqNOQDRKKSiVJg= -github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w= +github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 h1:BB3VSDl8/OHBSvjqfgufwqr4tD5l7XPjXybDm6uudj4= +github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w= github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ= github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns= github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 h1:g8eEYbFSykyzIyuxNMmHEUGGUvJE0ivmqZagLDK42gw= @@ -582,8 +582,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= -github.com/streamingfast/substreams v1.9.4-0.20240826160128-7d7c7b132e06 h1:tK+N8JBt7/bwYcaxtCNlPan9qkPjedcWglE/KDPe3dA= -github.com/streamingfast/substreams v1.9.4-0.20240826160128-7d7c7b132e06/go.mod h1:8vkSvR4XodacDXHSBpg+L4Jcq5BQpSBhGZOvM76C+H4= +github.com/streamingfast/substreams v1.9.4-0.20240827160230-05a454855aaf h1:SxYuU+ox5Jow5j+d2xXvfmuxBIoJXEyWFZg+C6T9Kdw= +github.com/streamingfast/substreams v1.9.4-0.20240827160230-05a454855aaf/go.mod h1:htDRslKI5Fj+JUqmKVsNj4Ph1DIzYig/K+VvP6SUIt0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= diff --git a/metering/metering.go b/metering/metering.go new file mode 100644 index 0000000..6155c85 --- /dev/null +++ b/metering/metering.go @@ -0,0 +1,90 @@ +package metering + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/streamingfast/dstore" + + "github.com/streamingfast/dmetering" + "github.com/streamingfast/substreams/reqctx" + "google.golang.org/protobuf/proto" +) + +const ( + MeterLiveUncompressedReadBytes = "live_uncompressed_read_bytes" + MeterLiveUncompressedReadForkedBytes = "live_uncompressed_read_forked_bytes" + + MeterFileUncompressedReadBytes = "file_uncompressed_read_bytes" + MeterFileUncompressedReadForkedBytes = "file_uncompressed_read_forked_bytes" + MeterFileCompressedReadForkedBytes = "file_compressed_read_forked_bytes" + MeterFileCompressedReadBytes = "file_compressed_read_bytes" + + TotalReadBytes = "total_read_bytes" +) + +func WithBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option { + return []dstore.Option{dstore.WithCompressedReadCallback(func(ctx context.Context, n int) { + meter.CountInc(MeterFileCompressedReadBytes, n) + })} +} + +func WithForkedBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option { + return []dstore.Option{dstore.WithCompressedReadCallback(func(ctx context.Context, n int) { + meter.CountInc(MeterFileCompressedReadForkedBytes, n) + })} +} + +func GetTotalBytesRead(meter dmetering.Meter) uint64 { + total := uint64(meter.GetCount(TotalReadBytes)) + return total +} + +func Send(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) { + bytesRead := meter.BytesReadDelta() + bytesWritten := meter.BytesWrittenDelta() + egressBytes := proto.Size(resp) + + liveUncompressedReadBytes := meter.GetCountAndReset(MeterLiveUncompressedReadBytes) + liveUncompressedReadForkedBytes := meter.GetCountAndReset(MeterLiveUncompressedReadForkedBytes) + + fileUncompressedReadBytes := meter.GetCountAndReset(MeterFileUncompressedReadBytes) + fileUncompressedReadForkedBytes := meter.GetCountAndReset(MeterFileUncompressedReadForkedBytes) + fileCompressedReadForkedBytes := meter.GetCountAndReset(MeterFileCompressedReadForkedBytes) + fileCompressedReadBytes := meter.GetCountAndReset(MeterFileCompressedReadBytes) + + totalReadBytes := fileCompressedReadBytes + fileCompressedReadForkedBytes + liveUncompressedReadBytes + liveUncompressedReadForkedBytes + + meter.CountInc(TotalReadBytes, int(totalReadBytes)) + + event := dmetering.Event{ + UserID: userID, + ApiKeyID: apiKeyID, + IpAddress: ip, + Meta: userMeta, + + Endpoint: endpoint, + Metrics: map[string]float64{ + "egress_bytes": float64(egressBytes), + "written_bytes": float64(bytesWritten), + "read_bytes": float64(bytesRead), + MeterLiveUncompressedReadBytes: float64(liveUncompressedReadBytes), + MeterLiveUncompressedReadForkedBytes: float64(liveUncompressedReadForkedBytes), + MeterFileUncompressedReadBytes: float64(fileUncompressedReadBytes), + MeterFileUncompressedReadForkedBytes: float64(fileUncompressedReadForkedBytes), + MeterFileCompressedReadForkedBytes: float64(fileCompressedReadForkedBytes), + MeterFileCompressedReadBytes: float64(fileCompressedReadBytes), + "block_count": 1, + }, + Timestamp: time.Now(), + } + + emitter := reqctx.Emitter(ctx) + if emitter == nil { + dmetering.Emit(context.WithoutCancel(ctx), event) + } else { + emitter.Emit(context.WithoutCancel(ctx), event) + } +} diff --git a/stream_factory.go b/stream_factory.go index 25f3d31..31346be 100644 --- a/stream_factory.go +++ b/stream_factory.go @@ -4,12 +4,15 @@ import ( "context" "fmt" + "github.com/streamingfast/dmetering" + + "github.com/streamingfast/firehose-core/metering" + "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/hub" "github.com/streamingfast/bstream/stream" "github.com/streamingfast/bstream/transform" "github.com/streamingfast/dauth" - "github.com/streamingfast/dmetering" "github.com/streamingfast/dstore" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" @@ -44,7 +47,8 @@ func (sf *StreamFactory) New( ctx context.Context, handler bstream.Handler, request *pbfirehose.Request, - logger *zap.Logger) (*stream.Stream, error) { + logger *zap.Logger, + extraOpts ...stream.Option) (*stream.Stream, error) { reqLogger := logger.With( zap.Int64("req_start_block", request.StartBlockNum), @@ -108,23 +112,31 @@ func (sf *StreamFactory) New( forkedBlocksStore := sf.forkedBlocksStore if clonable, ok := forkedBlocksStore.(dstore.Clonable); ok { var err error - forkedBlocksStore, err = clonable.Clone(ctx) + forkedBlocksStore, err = clonable.Clone(ctx, metering.WithForkedBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this forkedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } mergedBlocksStore := sf.mergedBlocksStore if clonable, ok := mergedBlocksStore.(dstore.Clonable); ok { var err error - mergedBlocksStore, err = clonable.Clone(ctx) + mergedBlocksStore, err = clonable.Clone(ctx, metering.WithBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this mergedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } + for _, opt := range extraOpts { + options = append(options, opt) + } + str := stream.New( forkedBlocksStore, mergedBlocksStore,