Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sdk infomation metrics #2208

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/21-simple-mono-vertex.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
name: simple-mono-vertex
spec:
Expand All @@ -12,4 +14,4 @@ spec:
sink:
udsink:
container:
image: quay.io/numaio/numaflow-rs/sink-log:stable
image: quay.io/numaio/numaflow-rs/sink-log:stable
2 changes: 1 addition & 1 deletion pkg/daemon/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
var (
pipeline_info = promauto.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "pipeline",
Name: "build_info",
Name: "daemon_build_info",
Help: "A metric with a constant value '1', labeled by Numaflow binary version and platform, as well as the pipeline name",
}, []string{metrics.LabelVersion, metrics.LabelPlatform, metrics.LabelPipeline})

Expand Down
11 changes: 11 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,20 @@ const (
LabelPartitionName = "partition_name"
LabelMonoVertexName = "mvtx_name"

LabelSDKLanguage = "language"
LabelSDKVersion = "version"
LabelSDKType = "type" // container type, e.g sourcer, sourcetransformer, sinker, etc. see serverinfo.ContainerType

LabelReason = "reason"
)

var (
SDKInfo = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "sdk_info",
Help: "A metric with a constant value '1', labeled by SDK information such as version, language, and type",
}, []string{LabelSDKType, LabelSDKVersion, LabelSDKLanguage})
)

// Generic forwarder metrics
var (
// ReadMessagesCount is used to indicate the number of total messages read
Expand Down
2 changes: 1 addition & 1 deletion pkg/mvtxdaemon/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
var (
monoVertexInfo = promauto.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "monovtx",
Name: "build_info",
Name: "daemon_build_info",
Help: "A metric with a constant value '1', labeled by Numaflow binary version and platform, as well as the mono vertex name",
}, []string{metrics.LabelVersion, metrics.LabelPlatform, metrics.LabelMonoVertexName})
)
20 changes: 12 additions & 8 deletions pkg/sdkclient/serverinfo/serverinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, err
minNumaflowVersion := serverInfo.MinimumNumaflowVersion
sdkLanguage := serverInfo.Language
numaflowVersion := numaflow.GetVersion().Version
containerType, err := getContainerType(filePath)
if err != nil {
return nil, fmt.Errorf("failed to get container type: %w", err)
containerType := getContainerType(filePath)
if containerType == ContainerTypeUnknown {
return nil, fmt.Errorf("unknown container type")
}

// If MinimumNumaflowVersion is empty, skip the numaflow compatibility check as there was an
Expand Down Expand Up @@ -221,11 +221,15 @@ func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, containerTyp

// getContainerType returns the container type from the server info file path
// serverInfoFilePath is in the format of "/var/run/numaflow/{ContainerType}-server-info"
func getContainerType(serverInfoFilePath string) (ContainerType, error) {
func getContainerType(serverInfoFilePath string) ContainerType {
splits := strings.Split(serverInfoFilePath, "/")
if containerType := strings.TrimSuffix(splits[len(splits)-1], "-server-info"); containerType == "" {
return "", fmt.Errorf("failed to get container type from server info file path: %s", serverInfoFilePath)
} else {
return ContainerType(containerType), nil
containerType := ContainerType(strings.TrimSuffix(splits[len(splits)-1], "-server-info"))
switch containerType {
case ContainerTypeSourcer, ContainerTypeSourcetransformer, ContainerTypeSinker, ContainerTypeMapper,
ContainerTypeReducer, ContainerTypeReducestreamer, ContainerTypeSessionreducer,
ContainerTypeSideinput, ContainerTypeFbsinker:
return containerType
default:
return ContainerTypeUnknown
}
}
20 changes: 10 additions & 10 deletions pkg/sdkclient/serverinfo/serverinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,16 @@ func Test_CheckNumaflowCompatibility(t *testing.T) {
func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
var testMinimumSupportedSDKVersions = sdkConstraints{
Python: map[ContainerType]string{
sourcer: "0.6.0rc100",
ContainerTypeSourcer: "0.6.0rc100",
},
Go: map[ContainerType]string{
sourcer: "0.6.0-z",
ContainerTypeSourcer: "0.6.0-z",
},
Java: map[ContainerType]string{
sourcer: "0.6.0-z",
ContainerTypeSourcer: "0.6.0-z",
},
Rust: map[ContainerType]string{
sourcer: "0.1.0-z",
ContainerTypeSourcer: "0.1.0-z",
},
}
tests := []struct {
Expand Down Expand Up @@ -283,7 +283,7 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions)
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, ContainerTypeSourcer, tt.minimumSupportedSDKVersions)
if tt.shouldErr {
assert.Error(t, err, "Expected error")
assert.Contains(t, err.Error(), tt.errMessage)
Expand All @@ -298,16 +298,16 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) {
var testMinimumSupportedSDKVersions = sdkConstraints{
Python: map[ContainerType]string{
sourcer: "0.6.0b1",
ContainerTypeSourcer: "0.6.0b1",
},
Go: map[ContainerType]string{
sourcer: "0.6.0-rc2",
ContainerTypeSourcer: "0.6.0-rc2",
},
Java: map[ContainerType]string{
sourcer: "0.6.0-rc2",
ContainerTypeSourcer: "0.6.0-rc2",
},
Rust: map[ContainerType]string{
sourcer: "0.1.0-rc3",
ContainerTypeSourcer: "0.1.0-rc3",
},
}
tests := []struct {
Expand Down Expand Up @@ -395,7 +395,7 @@ func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions)
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, ContainerTypeSourcer, tt.minimumSupportedSDKVersions)
if tt.shouldErr {
assert.Error(t, err, "Expected error")
assert.Contains(t, err.Error(), tt.errMessage)
Expand Down
91 changes: 46 additions & 45 deletions pkg/sdkclient/serverinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ type ContainerType string
// the string content matches the corresponding server info file name.
// DO NOT change it unless the server info file name is changed.
const (
sourcer ContainerType = "sourcer"
sourcetransformer ContainerType = "sourcetransformer"
sinker ContainerType = "sinker"
mapper ContainerType = "mapper"
reducer ContainerType = "reducer"
reducestreamer ContainerType = "reducestreamer"
sessionreducer ContainerType = "sessionreducer"
sideinput ContainerType = "sideinput"
fbsinker ContainerType = "fb-sinker"
ContainerTypeSourcer ContainerType = "sourcer"
ContainerTypeSourcetransformer ContainerType = "sourcetransformer"
ContainerTypeSinker ContainerType = "sinker"
ContainerTypeMapper ContainerType = "mapper"
ContainerTypeReducer ContainerType = "reducer"
ContainerTypeReducestreamer ContainerType = "reducestreamer"
ContainerTypeSessionreducer ContainerType = "sessionreducer"
ContainerTypeSideinput ContainerType = "sideinput"
ContainerTypeFbsinker ContainerType = "fb-sinker"
ContainerTypeUnknown ContainerType = "unknown"
)

type sdkConstraints map[Language]map[ContainerType]string
Expand Down Expand Up @@ -88,51 +89,51 @@ More details about version comparison can be found in the PEP 440 and semver doc
var minimumSupportedSDKVersions = sdkConstraints{
Python: map[ContainerType]string{
// meaning the minimum supported python SDK version is 0.8.0
sourcer: "0.8.0rc100",
sourcetransformer: "0.8.0rc100",
sinker: "0.8.0rc100",
mapper: "0.8.0rc100",
reducer: "0.8.0rc100",
reducestreamer: "0.8.0rc100",
sessionreducer: "0.8.0rc100",
sideinput: "0.8.0rc100",
fbsinker: "0.8.0rc100",
ContainerTypeSourcer: "0.8.0rc100",
ContainerTypeSourcetransformer: "0.8.0rc100",
ContainerTypeSinker: "0.8.0rc100",
ContainerTypeMapper: "0.8.0rc100",
ContainerTypeReducer: "0.8.0rc100",
ContainerTypeReducestreamer: "0.8.0rc100",
ContainerTypeSessionreducer: "0.8.0rc100",
ContainerTypeSideinput: "0.8.0rc100",
ContainerTypeFbsinker: "0.8.0rc100",
},
Go: map[ContainerType]string{
// meaning the minimum supported go SDK version is 0.8.0
sourcer: "0.8.0-z",
sourcetransformer: "0.8.0-z",
sinker: "0.8.0-z",
mapper: "0.8.0-z",
reducer: "0.8.0-z",
reducestreamer: "0.8.0-z",
sessionreducer: "0.8.0-z",
sideinput: "0.8.0-z",
fbsinker: "0.8.0-z",
ContainerTypeSourcer: "0.8.0-z",
ContainerTypeSourcetransformer: "0.8.0-z",
ContainerTypeSinker: "0.8.0-z",
ContainerTypeMapper: "0.8.0-z",
ContainerTypeReducer: "0.8.0-z",
ContainerTypeReducestreamer: "0.8.0-z",
ContainerTypeSessionreducer: "0.8.0-z",
ContainerTypeSideinput: "0.8.0-z",
ContainerTypeFbsinker: "0.8.0-z",
},
Java: map[ContainerType]string{
// meaning the minimum supported go SDK version is 0.8.0
sourcer: "0.8.0-z",
sourcetransformer: "0.8.0-z",
sinker: "0.8.0-z",
mapper: "0.8.0-z",
reducer: "0.8.0-z",
reducestreamer: "0.8.0-z",
sessionreducer: "0.8.0-z",
sideinput: "0.8.0-z",
fbsinker: "0.8.0-z",
ContainerTypeSourcer: "0.8.0-z",
ContainerTypeSourcetransformer: "0.8.0-z",
ContainerTypeSinker: "0.8.0-z",
ContainerTypeMapper: "0.8.0-z",
ContainerTypeReducer: "0.8.0-z",
ContainerTypeReducestreamer: "0.8.0-z",
ContainerTypeSessionreducer: "0.8.0-z",
ContainerTypeSideinput: "0.8.0-z",
ContainerTypeFbsinker: "0.8.0-z",
},
Rust: map[ContainerType]string{
// meaning the minimum supported go SDK version is 0.1.0
sourcer: "0.1.0-z",
sourcetransformer: "0.1.0-z",
sinker: "0.1.0-z",
mapper: "0.1.0-z",
reducer: "0.1.0-z",
reducestreamer: "0.1.0-z",
sessionreducer: "0.1.0-z",
sideinput: "0.1.0-z",
fbsinker: "0.1.0-z",
ContainerTypeSourcer: "0.1.0-z",
ContainerTypeSourcetransformer: "0.1.0-z",
ContainerTypeSinker: "0.1.0-z",
ContainerTypeMapper: "0.1.0-z",
ContainerTypeReducer: "0.1.0-z",
ContainerTypeReducestreamer: "0.1.0-z",
ContainerTypeSessionreducer: "0.1.0-z",
ContainerTypeSideinput: "0.1.0-z",
ContainerTypeFbsinker: "0.1.0-z",
},
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sideinputs/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/sdkclient"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
"github.com/numaproj/numaflow/pkg/sdkclient/sideinput"
Expand Down Expand Up @@ -87,6 +88,7 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(string(serverinfo.ContainerTypeSideinput), serverInfo.Version, string(serverInfo.Language)).Set(1)

// Create a new gRPC client for Side Input
sideInputClient, err := sideinput.New(serverInfo)
Expand Down
2 changes: 2 additions & 0 deletions pkg/sinks/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(string(serverinfo.ContainerTypeSinker), serverInfo.Version, string(serverInfo.Language)).Set(1)

sdkClient, err := sinkclient.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
Expand Down Expand Up @@ -183,6 +184,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(string(serverinfo.ContainerTypeFbsinker), serverInfo.Version, string(serverInfo.Language)).Set(1)

sdkClient, err := sinkclient.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize), sdkclient.WithUdsSockAddr(sdkclient.FbSinkAddr))
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sources/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(string(serverinfo.ContainerTypeSourcer), serverInfo.Version, string(serverInfo.Language)).Set(1)

srcClient, err := sourceclient.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
Expand Down Expand Up @@ -238,6 +239,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(string(serverinfo.ContainerTypeSourcetransformer), serverInfo.Version, string(serverInfo.Language)).Set(1)

srcTransformerClient, err := sourcetransformer.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/udf/map_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(string(serverinfo.ContainerTypeMapper), serverInfo.Version, string(serverInfo.Language)).Set(1)

// track all the resources that need to be closed
var resourcesToClose []io.Closer
Expand Down
3 changes: 3 additions & 0 deletions pkg/udf/reduce_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(string(serverinfo.ContainerTypeReducestreamer), serverInfo.Version, string(serverInfo.Language)).Set(1)
client, err = reducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize), sdkclient.WithUdsSockAddr(sdkclient.ReduceStreamAddr))
} else {
// Wait for server info to be ready
serverInfo, err = serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.ReduceServerInfoFile))
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(string(serverinfo.ContainerTypeReducer), serverInfo.Version, string(serverInfo.Language)).Set(1)
client, err = reducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
}
if err != nil {
Expand All @@ -134,6 +136,7 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(string(serverinfo.ContainerTypeSessionreducer), serverInfo.Version, string(serverInfo.Language)).Set(1)

client, err := sessionreducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
Expand Down
26 changes: 2 additions & 24 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading