Skip to content

Commit

Permalink
Metering update: more detailed metering with addition of new metrics.…
Browse files Browse the repository at this point in the history
…  *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead.
  • Loading branch information
colindickson committed Aug 27, 2024
1 parent e502fe0 commit 5782c51
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s
* Substreams: add `--common-tmp-dir` flag and activate local caching of pre-compiled WASM modules through wazero feature
* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module.
* Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling
* * Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead.

## v1.5.7

Expand Down
9 changes: 7 additions & 2 deletions firehose/block_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/streamingfast/derr"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/dstore"
"github.com/streamingfast/firehose-core/metering"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -59,10 +60,12 @@ func (g *BlockGetter) Get(
mergedBlocksStore := g.mergedBlocksStore
if clonable, ok := mergedBlocksStore.(dstore.Clonable); ok {
var err error
mergedBlocksStore, err = clonable.Clone(ctx)
mergedBlocksStore, err = clonable.Clone(ctx, metering.WithBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...)
if err != nil {
return nil, err
}

//todo: (deprecated) remove this
mergedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx))
}

Expand Down Expand Up @@ -91,10 +94,12 @@ func (g *BlockGetter) Get(
forkedBlocksStore := g.forkedBlocksStore
if clonable, ok := forkedBlocksStore.(dstore.Clonable); ok {
var err error
forkedBlocksStore, err = clonable.Clone(ctx)
forkedBlocksStore, err = clonable.Clone(ctx, metering.WithForkedBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...)
if err != nil {
return nil, err
}

//todo: (deprecated) remove this
forkedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx))
}

Expand Down
70 changes: 40 additions & 30 deletions firehose/server/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/streamingfast/dauth"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/firehose-core/firehose/metrics"
"github.com/streamingfast/firehose-core/metering"
"github.com/streamingfast/logging"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
Expand Down Expand Up @@ -66,29 +67,9 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque
},
}

//////////////////////////////////////////////////////////////////////
meter := dmetering.GetBytesMeter(ctx)
bytesRead := meter.BytesReadDelta()
bytesWritten := meter.BytesWrittenDelta()
size := proto.Size(resp)

auth := dauth.FromContext(ctx)
event := dmetering.Event{
UserID: auth.UserID(),
ApiKeyID: auth.APIKeyID(),
IpAddress: auth.RealIP(),
Meta: auth.Meta(),
Endpoint: "sf.firehose.v2.Firehose/Block",
Metrics: map[string]float64{
"egress_bytes": float64(size),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
"block_count": 1,
},
Timestamp: time.Now(),
}
dmetering.Emit(ctx, event)
//////////////////////////////////////////////////////////////////////
metering.Send(ctx, meter, auth.UserID(), auth.APIKeyID(), auth.RealIP(), auth.Meta(), "sf.firehose.v2.Firehose/Block", resp)

return resp, nil
}
Expand Down Expand Up @@ -127,10 +108,6 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
}
}

isLiveBlock := func(step pbfirehose.ForkStep) bool {
return step == pbfirehose.ForkStep_STEP_NEW
}

var blockCount uint64
handlerFunc := bstream.HandlerFunc(func(block *pbbstream.Block, obj interface{}) error {
blockCount++
Expand Down Expand Up @@ -188,10 +165,6 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
return NewErrSendBlock(err)
}

if isLiveBlock(protoStep) {
dmetering.GetBytesMeter(ctx).AddBytesRead(len(block.Payload.Value))
}

level := zap.DebugLevel
if block.Number%200 == 0 {
level = zap.InfoLevel
Expand All @@ -206,8 +179,45 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
return status.Errorf(codes.Unimplemented, "no transforms registry configured within this instance")
}

liveSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler {
return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
if stepable, ok := obj.(bstream.Stepable); ok {
if stepable.Step().Matches(bstream.StepNew) {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadBytes, len(blk.GetPayload().GetValue()))

// legacy metering
// todo(colin): remove this once we are sure the new metering is working
dmetering.GetBytesMeter(ctx).AddBytesRead(len(blk.GetPayload().GetValue()))
} else {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadForkedBytes, len(blk.GetPayload().GetValue()))
}
}
return next.ProcessBlock(blk, obj)
})
}

fileSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler {
return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
if stepable, ok := obj.(bstream.Stepable); ok {
if stepable.Step().Matches(bstream.StepNew) {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadBytes, len(blk.GetPayload().GetValue()))
} else {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadForkedBytes, len(blk.GetPayload().GetValue()))
}
}
return next.ProcessBlock(blk, obj)
})
}

ctx = s.initFunc(ctx, request)
str, err := s.streamFactory.New(ctx, handlerFunc, request, logger)
str, err := s.streamFactory.New(
ctx,
handlerFunc,
request,
logger,
stream.WithLiveSourceHandlerMiddleware(liveSourceMiddlewareHandler),
stream.WithFileSourceHandlerMiddleware(fileSourceMiddlewareHandler),
)
if err != nil {
return err
}
Expand Down
33 changes: 6 additions & 27 deletions firehose/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/streamingfast/firehose-core/firehose"
"github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/firehose/rate"
"github.com/streamingfast/firehose-core/metering"
pbfirehoseV1 "github.com/streamingfast/pbgo/sf/firehose/v1"
pbfirehoseV2 "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand Down Expand Up @@ -66,41 +67,19 @@ func New(
opts ...Option,
) *Server {
initFunc := func(ctx context.Context, _ *pbfirehoseV2.Request) context.Context {
//////////////////////////////////////////////////////////////////////
ctx = dmetering.WithBytesMeter(ctx)
ctx = withRequestMeter(ctx)
return ctx
//////////////////////////////////////////////////////////////////////
}

postHookFunc := func(ctx context.Context, response *pbfirehoseV2.Response) {
//////////////////////////////////////////////////////////////////////
meter := dmetering.GetBytesMeter(ctx)
bytesRead := meter.BytesReadDelta()
bytesWritten := meter.BytesWrittenDelta()
size := proto.Size(response)

auth := dauth.FromContext(ctx)
event := dmetering.Event{
UserID: auth.UserID(),
ApiKeyID: auth.APIKeyID(),
IpAddress: auth.RealIP(),
Meta: auth.Meta(),
Endpoint: "sf.firehose.v2.Firehose/Blocks",
Metrics: map[string]float64{
"egress_bytes": float64(size),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
"block_count": 1,
},
Timestamp: time.Now(),
}

requestMeter := getRequestMeter(ctx)
requestMeter.blocks++
requestMeter.egressBytes += size
dmetering.Emit(ctx, event)
//////////////////////////////////////////////////////////////////////
requestMeter.egressBytes += proto.Size(response)

meter := dmetering.GetBytesMeter(ctx)
auth := dauth.FromContext(ctx)
metering.Send(ctx, meter, auth.UserID(), auth.APIKeyID(), auth.RealIP(), auth.Meta(), "sf.firehose.v2.Firehose/Block", response)
}

tracerProvider := otel.GetTracerProvider()
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf
github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b
github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4
github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2
github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0
github.com/streamingfast/substreams v1.9.4-0.20240826160128-7d7c7b132e06
github.com/streamingfast/substreams v1.9.4-0.20240827160230-05a454855aaf
github.com/stretchr/testify v1.8.4
github.com/test-go/testify v1.1.4
go.uber.org/multierr v1.10.0
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf h1:LXFIz2pyTlIMNzyifvKsZpFLcLbJkTcRyu7OlABV1S0=
github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b h1:LbT8xpXFY5bsZbQfhQJGcXUBXbl/QZZ7CqfN6nLtpwM=
github.com/streamingfast/bstream v0.0.2-0.20240819202225-ca1b790abf0b/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d h1:9tsEt2tLCp94CW6MyJZY+Rw6+t0WH2kioBR6ucO6P/E=
github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d/go.mod h1:og+6lDBPLZ24lbF/YISmVsSduZUZwXSmJGD3pZ/sW2Y=
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU=
Expand All @@ -551,12 +551,12 @@ github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a h1:JwAGZ7f5vkB
github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a/go.mod h1:EPtUX/vhRphE37Zo6sDcgD/S3sm5YqXHhxAgzS6Ebwo=
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 h1:HKi8AIkLBzxZWmbCRUo1RxoOLK33iXO6gZprfsE9rf4=
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4/go.mod h1:ehPytv7E4rI65iLcrwTes4rNGGqPPiugnH+20nDQyp4=
github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1 h1:zPqUBv2dBJ/N898pZ9W+1qDamQjbtdD7cwtwQB8PWTQ=
github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI=
github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb h1:tmu8wGiSTzdqk2CnPnI7GywKwepGieqNOQDRKKSiVJg=
github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w=
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 h1:BB3VSDl8/OHBSvjqfgufwqr4tD5l7XPjXybDm6uudj4=
github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 h1:g8eEYbFSykyzIyuxNMmHEUGGUvJE0ivmqZagLDK42gw=
Expand All @@ -582,8 +582,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt
github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4=
github.com/streamingfast/substreams v1.9.4-0.20240826160128-7d7c7b132e06 h1:tK+N8JBt7/bwYcaxtCNlPan9qkPjedcWglE/KDPe3dA=
github.com/streamingfast/substreams v1.9.4-0.20240826160128-7d7c7b132e06/go.mod h1:8vkSvR4XodacDXHSBpg+L4Jcq5BQpSBhGZOvM76C+H4=
github.com/streamingfast/substreams v1.9.4-0.20240827160230-05a454855aaf h1:SxYuU+ox5Jow5j+d2xXvfmuxBIoJXEyWFZg+C6T9Kdw=
github.com/streamingfast/substreams v1.9.4-0.20240827160230-05a454855aaf/go.mod h1:htDRslKI5Fj+JUqmKVsNj4Ph1DIzYig/K+VvP6SUIt0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down
90 changes: 90 additions & 0 deletions metering/metering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package metering

import (
"context"
"time"

"go.uber.org/zap"

"github.com/streamingfast/dstore"

"github.com/streamingfast/dmetering"
"github.com/streamingfast/substreams/reqctx"
"google.golang.org/protobuf/proto"
)

const (
MeterLiveUncompressedReadBytes = "live_uncompressed_read_bytes"
MeterLiveUncompressedReadForkedBytes = "live_uncompressed_read_forked_bytes"

MeterFileUncompressedReadBytes = "file_uncompressed_read_bytes"
MeterFileUncompressedReadForkedBytes = "file_uncompressed_read_forked_bytes"
MeterFileCompressedReadForkedBytes = "file_compressed_read_forked_bytes"
MeterFileCompressedReadBytes = "file_compressed_read_bytes"

TotalReadBytes = "total_read_bytes"
)

func WithBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option {
return []dstore.Option{dstore.WithCompressedReadCallback(func(ctx context.Context, n int) {
meter.CountInc(MeterFileCompressedReadBytes, n)
})}
}

func WithForkedBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option {
return []dstore.Option{dstore.WithCompressedReadCallback(func(ctx context.Context, n int) {
meter.CountInc(MeterFileCompressedReadForkedBytes, n)
})}
}

func GetTotalBytesRead(meter dmetering.Meter) uint64 {
total := uint64(meter.GetCount(TotalReadBytes))
return total
}

func Send(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) {
bytesRead := meter.BytesReadDelta()
bytesWritten := meter.BytesWrittenDelta()
egressBytes := proto.Size(resp)

liveUncompressedReadBytes := meter.GetCountAndReset(MeterLiveUncompressedReadBytes)
liveUncompressedReadForkedBytes := meter.GetCountAndReset(MeterLiveUncompressedReadForkedBytes)

fileUncompressedReadBytes := meter.GetCountAndReset(MeterFileUncompressedReadBytes)
fileUncompressedReadForkedBytes := meter.GetCountAndReset(MeterFileUncompressedReadForkedBytes)
fileCompressedReadForkedBytes := meter.GetCountAndReset(MeterFileCompressedReadForkedBytes)
fileCompressedReadBytes := meter.GetCountAndReset(MeterFileCompressedReadBytes)

totalReadBytes := fileCompressedReadBytes + fileCompressedReadForkedBytes + liveUncompressedReadBytes + liveUncompressedReadForkedBytes

meter.CountInc(TotalReadBytes, int(totalReadBytes))

event := dmetering.Event{
UserID: userID,
ApiKeyID: apiKeyID,
IpAddress: ip,
Meta: userMeta,

Endpoint: endpoint,
Metrics: map[string]float64{
"egress_bytes": float64(egressBytes),
"written_bytes": float64(bytesWritten),
"read_bytes": float64(bytesRead),
MeterLiveUncompressedReadBytes: float64(liveUncompressedReadBytes),
MeterLiveUncompressedReadForkedBytes: float64(liveUncompressedReadForkedBytes),
MeterFileUncompressedReadBytes: float64(fileUncompressedReadBytes),
MeterFileUncompressedReadForkedBytes: float64(fileUncompressedReadForkedBytes),
MeterFileCompressedReadForkedBytes: float64(fileCompressedReadForkedBytes),
MeterFileCompressedReadBytes: float64(fileCompressedReadBytes),
"block_count": 1,
},
Timestamp: time.Now(),
}

emitter := reqctx.Emitter(ctx)
if emitter == nil {
dmetering.Emit(context.WithoutCancel(ctx), event)
} else {
emitter.Emit(context.WithoutCancel(ctx), event)
}
}
Loading

0 comments on commit 5782c51

Please sign in to comment.