From 3706bfb7fc6bcd4cc0751c4307aafd0c2ce689cd Mon Sep 17 00:00:00 2001 From: Suhas Karanth Date: Mon, 7 Aug 2023 11:11:17 +0530 Subject: [PATCH] feat: instrument worker with OpenTelemetry (#50) - Add Stats(...) method to JobProcessor to allow tracking counts of active and dead jobs by job type. - Add JobProcessor middleware for OpenTelemetry instrumentation. - Add middlewares to handlers for dead job managers's endpoints. - Add histogram compass.worker.jobs.enqueue.duration with the following attributes: - job.types: ["index-asset", "delete-asset"] - operation.success: true/false - Add gauge compass.worker.active_jobs with the following attributes: - job.type: index-asset/delete-asset - Add histogram compass.worker.job.dequeue.latency with the following attributes: - job.type: index-asset/delete-asset - Add histogram compass.worker.job.process.duration with the following attributes: - job.type: index-asset/delete-asset - job.attempt_number: 1/2 - job.status: done/dead/retry - operation.success: true/false - Add gauge compass.worker.dead_jobs with the following attributes: - job.type: index-asset/delete-asset --- Makefile | 2 +- core/asset/mocks/asset_repository.go | 2 +- core/asset/mocks/discovery_repository.go | 47 ++++++- core/asset/mocks/lineage_repository.go | 2 +- core/asset/mocks/worker_mock.go | 43 +++++- .../discussion/mocks/discussion_repository.go | 2 +- core/star/mocks/star_repository.go | 2 +- core/tag/mocks/tag_repository.go | 2 +- core/tag/mocks/tag_template_repository.go | 2 +- core/user/mocks/user_repository.go | 2 +- go.mod | 2 + go.sum | 4 + .../server/v1beta1/mocks/asset_service.go | 124 ++++++++++++++++-- .../v1beta1/mocks/discussion_service.go | 2 +- internal/server/v1beta1/mocks/star_service.go | 2 +- .../server/v1beta1/mocks/statsd_monitor.go | 2 +- internal/server/v1beta1/mocks/tag_service.go | 2 +- .../v1beta1/mocks/tag_template_service.go | 2 +- internal/server/v1beta1/mocks/user_service.go | 2 +- .../mocks/discovery_repository_mock.go | 6 +- internal/workermanager/mocks/worker_mock.go | 2 +- internal/workermanager/worker_manager.go | 84 ++++++++++-- pkg/grpc_interceptor/mocks/statsd_monitor.go | 2 +- pkg/worker/dead_jobs_handler.go | 31 ++++- pkg/worker/job_processor.go | 12 ++ pkg/worker/mocks/dead_job_manager_mock.go | 2 +- pkg/worker/mocks/job_processor_mock.go | 56 +++++++- pkg/worker/pgq/pgq_processor.go | 33 +++++ pkg/worker/pgq/pgq_processor_test.go | 80 +++++++++++ .../job_processor_instrumentation_mw.go | 116 ++++++++++++++++ 30 files changed, 615 insertions(+), 57 deletions(-) create mode 100644 pkg/worker/workermw/job_processor_instrumentation_mw.go diff --git a/Makefile b/Makefile index b931e86c..35411aa6 100644 --- a/Makefile +++ b/Makefile @@ -65,7 +65,7 @@ proto: $(TOOLS_DIR)/buf ## Generate the protobuf files @echo " > protobuf compilation finished" generate: $(TOOLS_DIR)/mockery ## Run all go generate in the code base - go generate ./... + PATH="$(TOOLS_DIR):${PATH}" go generate ./... clean: ## Clean the build artifacts rm -rf compass dist/ diff --git a/core/asset/mocks/asset_repository.go b/core/asset/mocks/asset_repository.go index 4311ae41..c125b7b6 100644 --- a/core/asset/mocks/asset_repository.go +++ b/core/asset/mocks/asset_repository.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/core/asset/mocks/discovery_repository.go b/core/asset/mocks/discovery_repository.go index 00c396cb..c9c3d6b5 100644 --- a/core/asset/mocks/discovery_repository.go +++ b/core/asset/mocks/discovery_repository.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks @@ -61,6 +61,11 @@ func (_c *DiscoveryRepository_DeleteByID_Call) Return(_a0 error) *DiscoveryRepos return _c } +func (_c *DiscoveryRepository_DeleteByID_Call) RunAndReturn(run func(context.Context, string) error) *DiscoveryRepository_DeleteByID_Call { + _c.Call.Return(run) + return _c +} + // DeleteByURN provides a mock function with given fields: ctx, assetURN func (_m *DiscoveryRepository) DeleteByURN(ctx context.Context, assetURN string) error { ret := _m.Called(ctx, assetURN) @@ -99,11 +104,20 @@ func (_c *DiscoveryRepository_DeleteByURN_Call) Return(_a0 error) *DiscoveryRepo return _c } +func (_c *DiscoveryRepository_DeleteByURN_Call) RunAndReturn(run func(context.Context, string) error) *DiscoveryRepository_DeleteByURN_Call { + _c.Call.Return(run) + return _c +} + // GroupAssets provides a mock function with given fields: ctx, cfg func (_m *DiscoveryRepository) GroupAssets(ctx context.Context, cfg asset.GroupConfig) ([]asset.GroupResult, error) { ret := _m.Called(ctx, cfg) var r0 []asset.GroupResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, asset.GroupConfig) ([]asset.GroupResult, error)); ok { + return rf(ctx, cfg) + } if rf, ok := ret.Get(0).(func(context.Context, asset.GroupConfig) []asset.GroupResult); ok { r0 = rf(ctx, cfg) } else { @@ -112,7 +126,6 @@ func (_m *DiscoveryRepository) GroupAssets(ctx context.Context, cfg asset.GroupC } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, asset.GroupConfig) error); ok { r1 = rf(ctx, cfg) } else { @@ -146,11 +159,20 @@ func (_c *DiscoveryRepository_GroupAssets_Call) Return(results []asset.GroupResu return _c } +func (_c *DiscoveryRepository_GroupAssets_Call) RunAndReturn(run func(context.Context, asset.GroupConfig) ([]asset.GroupResult, error)) *DiscoveryRepository_GroupAssets_Call { + _c.Call.Return(run) + return _c +} + // Search provides a mock function with given fields: ctx, cfg func (_m *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchConfig) ([]asset.SearchResult, error) { ret := _m.Called(ctx, cfg) var r0 []asset.SearchResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, asset.SearchConfig) ([]asset.SearchResult, error)); ok { + return rf(ctx, cfg) + } if rf, ok := ret.Get(0).(func(context.Context, asset.SearchConfig) []asset.SearchResult); ok { r0 = rf(ctx, cfg) } else { @@ -159,7 +181,6 @@ func (_m *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchConfi } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, asset.SearchConfig) error); ok { r1 = rf(ctx, cfg) } else { @@ -193,11 +214,20 @@ func (_c *DiscoveryRepository_Search_Call) Return(results []asset.SearchResult, return _c } +func (_c *DiscoveryRepository_Search_Call) RunAndReturn(run func(context.Context, asset.SearchConfig) ([]asset.SearchResult, error)) *DiscoveryRepository_Search_Call { + _c.Call.Return(run) + return _c +} + // Suggest provides a mock function with given fields: ctx, cfg func (_m *DiscoveryRepository) Suggest(ctx context.Context, cfg asset.SearchConfig) ([]string, error) { ret := _m.Called(ctx, cfg) var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, asset.SearchConfig) ([]string, error)); ok { + return rf(ctx, cfg) + } if rf, ok := ret.Get(0).(func(context.Context, asset.SearchConfig) []string); ok { r0 = rf(ctx, cfg) } else { @@ -206,7 +236,6 @@ func (_m *DiscoveryRepository) Suggest(ctx context.Context, cfg asset.SearchConf } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, asset.SearchConfig) error); ok { r1 = rf(ctx, cfg) } else { @@ -240,6 +269,11 @@ func (_c *DiscoveryRepository_Suggest_Call) Return(suggestions []string, err err return _c } +func (_c *DiscoveryRepository_Suggest_Call) RunAndReturn(run func(context.Context, asset.SearchConfig) ([]string, error)) *DiscoveryRepository_Suggest_Call { + _c.Call.Return(run) + return _c +} + // Upsert provides a mock function with given fields: _a0, _a1 func (_m *DiscoveryRepository) Upsert(_a0 context.Context, _a1 asset.Asset) error { ret := _m.Called(_a0, _a1) @@ -278,6 +312,11 @@ func (_c *DiscoveryRepository_Upsert_Call) Return(_a0 error) *DiscoveryRepositor return _c } +func (_c *DiscoveryRepository_Upsert_Call) RunAndReturn(run func(context.Context, asset.Asset) error) *DiscoveryRepository_Upsert_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewDiscoveryRepository interface { mock.TestingT Cleanup(func()) diff --git a/core/asset/mocks/lineage_repository.go b/core/asset/mocks/lineage_repository.go index 8e08a40d..70028ffd 100644 --- a/core/asset/mocks/lineage_repository.go +++ b/core/asset/mocks/lineage_repository.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/core/asset/mocks/worker_mock.go b/core/asset/mocks/worker_mock.go index fbc53e94..3067275f 100644 --- a/core/asset/mocks/worker_mock.go +++ b/core/asset/mocks/worker_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks @@ -23,6 +23,47 @@ func (_m *Worker) EXPECT() *Worker_Expecter { return &Worker_Expecter{mock: &_m.Mock} } +// Close provides a mock function with given fields: +func (_m *Worker) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Worker_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type Worker_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *Worker_Expecter) Close() *Worker_Close_Call { + return &Worker_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *Worker_Close_Call) Run(run func()) *Worker_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Worker_Close_Call) Return(_a0 error) *Worker_Close_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Worker_Close_Call) RunAndReturn(run func() error) *Worker_Close_Call { + _c.Call.Return(run) + return _c +} + // EnqueueDeleteAssetJob provides a mock function with given fields: ctx, urn func (_m *Worker) EnqueueDeleteAssetJob(ctx context.Context, urn string) error { ret := _m.Called(ctx, urn) diff --git a/core/discussion/mocks/discussion_repository.go b/core/discussion/mocks/discussion_repository.go index e47df0a6..792cc1b4 100644 --- a/core/discussion/mocks/discussion_repository.go +++ b/core/discussion/mocks/discussion_repository.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/core/star/mocks/star_repository.go b/core/star/mocks/star_repository.go index 45616351..0aca8af0 100644 --- a/core/star/mocks/star_repository.go +++ b/core/star/mocks/star_repository.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/core/tag/mocks/tag_repository.go b/core/tag/mocks/tag_repository.go index 26ba4257..c08580e2 100644 --- a/core/tag/mocks/tag_repository.go +++ b/core/tag/mocks/tag_repository.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/core/tag/mocks/tag_template_repository.go b/core/tag/mocks/tag_template_repository.go index 1230093c..9301ea37 100644 --- a/core/tag/mocks/tag_template_repository.go +++ b/core/tag/mocks/tag_template_repository.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/core/user/mocks/user_repository.go b/core/user/mocks/user_repository.go index 1af844f3..e0d78746 100644 --- a/core/user/mocks/user_repository.go +++ b/core/user/mocks/user_repository.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/go.mod b/go.mod index f3f8d82f..972f5298 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( go.nhat.io/otelsql v0.11.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0 go.opentelemetry.io/contrib/instrumentation/host v0.42.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.1-0.20230731182153-525d6c05bd62 go.opentelemetry.io/contrib/instrumentation/runtime v0.42.0 go.opentelemetry.io/contrib/samplers/probability/consistent v0.11.0 go.opentelemetry.io/otel v1.16.0 @@ -75,6 +76,7 @@ require ( github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.4.0 // indirect github.com/fatih/color v1.13.0 // indirect + github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index a860f9f5..920ddb20 100644 --- a/go.sum +++ b/go.sum @@ -475,6 +475,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= +github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -1419,6 +1421,8 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4 go.opentelemetry.io/contrib/instrumentation/host v0.42.0 h1:/GMlvboQJd4LWxNX/oGYLv06J5a/M/flauLruM/3U2g= go.opentelemetry.io/contrib/instrumentation/host v0.42.0/go.mod h1:w6v1mVemRjTTdfejACjf+LgVA6zKtHOWmdAIf3icx7A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.1-0.20230731182153-525d6c05bd62 h1:4Eb2RlvDQ9Gpp+qEsdUHa4cEX1NJdv/g9Y4MYprBqcw= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.1-0.20230731182153-525d6c05bd62/go.mod h1:/Q7SZ3KqNy3qTGotS1MKC8lO5z0CFSeacGbjfgMa4sk= go.opentelemetry.io/contrib/instrumentation/runtime v0.42.0 h1:EbmAUG9hEAMXyfWEasIt2kmh/WmXUznUksChApTgBGc= go.opentelemetry.io/contrib/instrumentation/runtime v0.42.0/go.mod h1:rD9feqRYP24P14t5kmhNMqsqm1jvKmpx2H2rKVw52V8= go.opentelemetry.io/contrib/samplers/probability/consistent v0.11.0 h1:K4HJhe01GFkIo9JZ8iX1hMeI5MrkEMggXQM0YJamtEQ= diff --git a/internal/server/v1beta1/mocks/asset_service.go b/internal/server/v1beta1/mocks/asset_service.go index ac77eecd..7834952e 100644 --- a/internal/server/v1beta1/mocks/asset_service.go +++ b/internal/server/v1beta1/mocks/asset_service.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks @@ -62,6 +62,11 @@ func (_c *AssetService_AddProbe_Call) Return(_a0 error) *AssetService_AddProbe_C return _c } +func (_c *AssetService_AddProbe_Call) RunAndReturn(run func(context.Context, string, *asset.Probe) error) *AssetService_AddProbe_Call { + _c.Call.Return(run) + return _c +} + // DeleteAsset provides a mock function with given fields: ctx, id func (_m *AssetService) DeleteAsset(ctx context.Context, id string) error { ret := _m.Called(ctx, id) @@ -100,11 +105,21 @@ func (_c *AssetService_DeleteAsset_Call) Return(_a0 error) *AssetService_DeleteA return _c } +func (_c *AssetService_DeleteAsset_Call) RunAndReturn(run func(context.Context, string) error) *AssetService_DeleteAsset_Call { + _c.Call.Return(run) + return _c +} + // GetAllAssets provides a mock function with given fields: ctx, flt, withTotal func (_m *AssetService) GetAllAssets(ctx context.Context, flt asset.Filter, withTotal bool) ([]asset.Asset, uint32, error) { ret := _m.Called(ctx, flt, withTotal) var r0 []asset.Asset + var r1 uint32 + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, asset.Filter, bool) ([]asset.Asset, uint32, error)); ok { + return rf(ctx, flt, withTotal) + } if rf, ok := ret.Get(0).(func(context.Context, asset.Filter, bool) []asset.Asset); ok { r0 = rf(ctx, flt, withTotal) } else { @@ -113,14 +128,12 @@ func (_m *AssetService) GetAllAssets(ctx context.Context, flt asset.Filter, with } } - var r1 uint32 if rf, ok := ret.Get(1).(func(context.Context, asset.Filter, bool) uint32); ok { r1 = rf(ctx, flt, withTotal) } else { r1 = ret.Get(1).(uint32) } - var r2 error if rf, ok := ret.Get(2).(func(context.Context, asset.Filter, bool) error); ok { r2 = rf(ctx, flt, withTotal) } else { @@ -155,18 +168,26 @@ func (_c *AssetService_GetAllAssets_Call) Return(_a0 []asset.Asset, _a1 uint32, return _c } +func (_c *AssetService_GetAllAssets_Call) RunAndReturn(run func(context.Context, asset.Filter, bool) ([]asset.Asset, uint32, error)) *AssetService_GetAllAssets_Call { + _c.Call.Return(run) + return _c +} + // GetAssetByID provides a mock function with given fields: ctx, id func (_m *AssetService) GetAssetByID(ctx context.Context, id string) (asset.Asset, error) { ret := _m.Called(ctx, id) var r0 asset.Asset + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (asset.Asset, error)); ok { + return rf(ctx, id) + } if rf, ok := ret.Get(0).(func(context.Context, string) asset.Asset); ok { r0 = rf(ctx, id) } else { r0 = ret.Get(0).(asset.Asset) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, id) } else { @@ -200,18 +221,26 @@ func (_c *AssetService_GetAssetByID_Call) Return(_a0 asset.Asset, _a1 error) *As return _c } +func (_c *AssetService_GetAssetByID_Call) RunAndReturn(run func(context.Context, string) (asset.Asset, error)) *AssetService_GetAssetByID_Call { + _c.Call.Return(run) + return _c +} + // GetAssetByVersion provides a mock function with given fields: ctx, id, version func (_m *AssetService) GetAssetByVersion(ctx context.Context, id string, version string) (asset.Asset, error) { ret := _m.Called(ctx, id, version) var r0 asset.Asset + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (asset.Asset, error)); ok { + return rf(ctx, id, version) + } if rf, ok := ret.Get(0).(func(context.Context, string, string) asset.Asset); ok { r0 = rf(ctx, id, version) } else { r0 = ret.Get(0).(asset.Asset) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { r1 = rf(ctx, id, version) } else { @@ -246,11 +275,20 @@ func (_c *AssetService_GetAssetByVersion_Call) Return(_a0 asset.Asset, _a1 error return _c } +func (_c *AssetService_GetAssetByVersion_Call) RunAndReturn(run func(context.Context, string, string) (asset.Asset, error)) *AssetService_GetAssetByVersion_Call { + _c.Call.Return(run) + return _c +} + // GetAssetVersionHistory provides a mock function with given fields: ctx, flt, id func (_m *AssetService) GetAssetVersionHistory(ctx context.Context, flt asset.Filter, id string) ([]asset.Asset, error) { ret := _m.Called(ctx, flt, id) var r0 []asset.Asset + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, asset.Filter, string) ([]asset.Asset, error)); ok { + return rf(ctx, flt, id) + } if rf, ok := ret.Get(0).(func(context.Context, asset.Filter, string) []asset.Asset); ok { r0 = rf(ctx, flt, id) } else { @@ -259,7 +297,6 @@ func (_m *AssetService) GetAssetVersionHistory(ctx context.Context, flt asset.Fi } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, asset.Filter, string) error); ok { r1 = rf(ctx, flt, id) } else { @@ -294,18 +331,26 @@ func (_c *AssetService_GetAssetVersionHistory_Call) Return(_a0 []asset.Asset, _a return _c } +func (_c *AssetService_GetAssetVersionHistory_Call) RunAndReturn(run func(context.Context, asset.Filter, string) ([]asset.Asset, error)) *AssetService_GetAssetVersionHistory_Call { + _c.Call.Return(run) + return _c +} + // GetLineage provides a mock function with given fields: ctx, urn, query func (_m *AssetService) GetLineage(ctx context.Context, urn string, query asset.LineageQuery) (asset.Lineage, error) { ret := _m.Called(ctx, urn, query) var r0 asset.Lineage + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, asset.LineageQuery) (asset.Lineage, error)); ok { + return rf(ctx, urn, query) + } if rf, ok := ret.Get(0).(func(context.Context, string, asset.LineageQuery) asset.Lineage); ok { r0 = rf(ctx, urn, query) } else { r0 = ret.Get(0).(asset.Lineage) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, asset.LineageQuery) error); ok { r1 = rf(ctx, urn, query) } else { @@ -340,11 +385,20 @@ func (_c *AssetService_GetLineage_Call) Return(_a0 asset.Lineage, _a1 error) *As return _c } +func (_c *AssetService_GetLineage_Call) RunAndReturn(run func(context.Context, string, asset.LineageQuery) (asset.Lineage, error)) *AssetService_GetLineage_Call { + _c.Call.Return(run) + return _c +} + // GetTypes provides a mock function with given fields: ctx, flt func (_m *AssetService) GetTypes(ctx context.Context, flt asset.Filter) (map[asset.Type]int, error) { ret := _m.Called(ctx, flt) var r0 map[asset.Type]int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, asset.Filter) (map[asset.Type]int, error)); ok { + return rf(ctx, flt) + } if rf, ok := ret.Get(0).(func(context.Context, asset.Filter) map[asset.Type]int); ok { r0 = rf(ctx, flt) } else { @@ -353,7 +407,6 @@ func (_m *AssetService) GetTypes(ctx context.Context, flt asset.Filter) (map[ass } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, asset.Filter) error); ok { r1 = rf(ctx, flt) } else { @@ -387,11 +440,20 @@ func (_c *AssetService_GetTypes_Call) Return(_a0 map[asset.Type]int, _a1 error) return _c } +func (_c *AssetService_GetTypes_Call) RunAndReturn(run func(context.Context, asset.Filter) (map[asset.Type]int, error)) *AssetService_GetTypes_Call { + _c.Call.Return(run) + return _c +} + // GroupAssets provides a mock function with given fields: ctx, cfg func (_m *AssetService) GroupAssets(ctx context.Context, cfg asset.GroupConfig) ([]asset.GroupResult, error) { ret := _m.Called(ctx, cfg) var r0 []asset.GroupResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, asset.GroupConfig) ([]asset.GroupResult, error)); ok { + return rf(ctx, cfg) + } if rf, ok := ret.Get(0).(func(context.Context, asset.GroupConfig) []asset.GroupResult); ok { r0 = rf(ctx, cfg) } else { @@ -400,7 +462,6 @@ func (_m *AssetService) GroupAssets(ctx context.Context, cfg asset.GroupConfig) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, asset.GroupConfig) error); ok { r1 = rf(ctx, cfg) } else { @@ -434,11 +495,20 @@ func (_c *AssetService_GroupAssets_Call) Return(results []asset.GroupResult, err return _c } +func (_c *AssetService_GroupAssets_Call) RunAndReturn(run func(context.Context, asset.GroupConfig) ([]asset.GroupResult, error)) *AssetService_GroupAssets_Call { + _c.Call.Return(run) + return _c +} + // SearchAssets provides a mock function with given fields: ctx, cfg func (_m *AssetService) SearchAssets(ctx context.Context, cfg asset.SearchConfig) ([]asset.SearchResult, error) { ret := _m.Called(ctx, cfg) var r0 []asset.SearchResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, asset.SearchConfig) ([]asset.SearchResult, error)); ok { + return rf(ctx, cfg) + } if rf, ok := ret.Get(0).(func(context.Context, asset.SearchConfig) []asset.SearchResult); ok { r0 = rf(ctx, cfg) } else { @@ -447,7 +517,6 @@ func (_m *AssetService) SearchAssets(ctx context.Context, cfg asset.SearchConfig } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, asset.SearchConfig) error); ok { r1 = rf(ctx, cfg) } else { @@ -481,11 +550,20 @@ func (_c *AssetService_SearchAssets_Call) Return(results []asset.SearchResult, e return _c } +func (_c *AssetService_SearchAssets_Call) RunAndReturn(run func(context.Context, asset.SearchConfig) ([]asset.SearchResult, error)) *AssetService_SearchAssets_Call { + _c.Call.Return(run) + return _c +} + // SuggestAssets provides a mock function with given fields: ctx, cfg func (_m *AssetService) SuggestAssets(ctx context.Context, cfg asset.SearchConfig) ([]string, error) { ret := _m.Called(ctx, cfg) var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, asset.SearchConfig) ([]string, error)); ok { + return rf(ctx, cfg) + } if rf, ok := ret.Get(0).(func(context.Context, asset.SearchConfig) []string); ok { r0 = rf(ctx, cfg) } else { @@ -494,7 +572,6 @@ func (_m *AssetService) SuggestAssets(ctx context.Context, cfg asset.SearchConfi } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, asset.SearchConfig) error); ok { r1 = rf(ctx, cfg) } else { @@ -528,18 +605,26 @@ func (_c *AssetService_SuggestAssets_Call) Return(suggestions []string, err erro return _c } +func (_c *AssetService_SuggestAssets_Call) RunAndReturn(run func(context.Context, asset.SearchConfig) ([]string, error)) *AssetService_SuggestAssets_Call { + _c.Call.Return(run) + return _c +} + // UpsertAsset provides a mock function with given fields: ctx, ast, upstreams, downstreams func (_m *AssetService) UpsertAsset(ctx context.Context, ast *asset.Asset, upstreams []string, downstreams []string) (string, error) { ret := _m.Called(ctx, ast, upstreams, downstreams) var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *asset.Asset, []string, []string) (string, error)); ok { + return rf(ctx, ast, upstreams, downstreams) + } if rf, ok := ret.Get(0).(func(context.Context, *asset.Asset, []string, []string) string); ok { r0 = rf(ctx, ast, upstreams, downstreams) } else { r0 = ret.Get(0).(string) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *asset.Asset, []string, []string) error); ok { r1 = rf(ctx, ast, upstreams, downstreams) } else { @@ -575,18 +660,26 @@ func (_c *AssetService_UpsertAsset_Call) Return(_a0 string, _a1 error) *AssetSer return _c } +func (_c *AssetService_UpsertAsset_Call) RunAndReturn(run func(context.Context, *asset.Asset, []string, []string) (string, error)) *AssetService_UpsertAsset_Call { + _c.Call.Return(run) + return _c +} + // UpsertAssetWithoutLineage provides a mock function with given fields: ctx, ast func (_m *AssetService) UpsertAssetWithoutLineage(ctx context.Context, ast *asset.Asset) (string, error) { ret := _m.Called(ctx, ast) var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *asset.Asset) (string, error)); ok { + return rf(ctx, ast) + } if rf, ok := ret.Get(0).(func(context.Context, *asset.Asset) string); ok { r0 = rf(ctx, ast) } else { r0 = ret.Get(0).(string) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *asset.Asset) error); ok { r1 = rf(ctx, ast) } else { @@ -620,6 +713,11 @@ func (_c *AssetService_UpsertAssetWithoutLineage_Call) Return(_a0 string, _a1 er return _c } +func (_c *AssetService_UpsertAssetWithoutLineage_Call) RunAndReturn(run func(context.Context, *asset.Asset) (string, error)) *AssetService_UpsertAssetWithoutLineage_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewAssetService interface { mock.TestingT Cleanup(func()) diff --git a/internal/server/v1beta1/mocks/discussion_service.go b/internal/server/v1beta1/mocks/discussion_service.go index 9e4674ab..f65b9a6a 100644 --- a/internal/server/v1beta1/mocks/discussion_service.go +++ b/internal/server/v1beta1/mocks/discussion_service.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/internal/server/v1beta1/mocks/star_service.go b/internal/server/v1beta1/mocks/star_service.go index e2970046..d8c99f4e 100644 --- a/internal/server/v1beta1/mocks/star_service.go +++ b/internal/server/v1beta1/mocks/star_service.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/internal/server/v1beta1/mocks/statsd_monitor.go b/internal/server/v1beta1/mocks/statsd_monitor.go index 943bc089..bfe5e15e 100644 --- a/internal/server/v1beta1/mocks/statsd_monitor.go +++ b/internal/server/v1beta1/mocks/statsd_monitor.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/internal/server/v1beta1/mocks/tag_service.go b/internal/server/v1beta1/mocks/tag_service.go index 6026dceb..9e22652c 100644 --- a/internal/server/v1beta1/mocks/tag_service.go +++ b/internal/server/v1beta1/mocks/tag_service.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/internal/server/v1beta1/mocks/tag_template_service.go b/internal/server/v1beta1/mocks/tag_template_service.go index 7139fd29..1875751a 100644 --- a/internal/server/v1beta1/mocks/tag_template_service.go +++ b/internal/server/v1beta1/mocks/tag_template_service.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/internal/server/v1beta1/mocks/user_service.go b/internal/server/v1beta1/mocks/user_service.go index d1bf96f0..91c41229 100644 --- a/internal/server/v1beta1/mocks/user_service.go +++ b/internal/server/v1beta1/mocks/user_service.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/internal/workermanager/mocks/discovery_repository_mock.go b/internal/workermanager/mocks/discovery_repository_mock.go index 12b41a90..39b079cf 100644 --- a/internal/workermanager/mocks/discovery_repository_mock.go +++ b/internal/workermanager/mocks/discovery_repository_mock.go @@ -1,12 +1,12 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks import ( - asset "github.com/goto/compass/core/asset" - context "context" + asset "github.com/goto/compass/core/asset" + mock "github.com/stretchr/testify/mock" ) diff --git a/internal/workermanager/mocks/worker_mock.go b/internal/workermanager/mocks/worker_mock.go index ee408d04..7107f9f2 100644 --- a/internal/workermanager/mocks/worker_mock.go +++ b/internal/workermanager/mocks/worker_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/internal/workermanager/worker_manager.go b/internal/workermanager/worker_manager.go index bed3d8cc..5b41a284 100644 --- a/internal/workermanager/worker_manager.go +++ b/internal/workermanager/worker_manager.go @@ -10,12 +10,16 @@ import ( "github.com/goto/compass/pkg/worker" "github.com/goto/compass/pkg/worker/pgq" + "github.com/goto/compass/pkg/worker/workermw" "github.com/goto/salt/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" ) type Manager struct { processor *pgq.Processor - registered atomic.Bool + initDone atomic.Bool worker Worker jobManagerPort int discoveryRepo DiscoveryRepository @@ -52,7 +56,7 @@ func New(ctx context.Context, deps Deps) (*Manager, error) { } w, err := worker.New( - processor, + workermw.WithJobProcessorInstrumentation()(processor), worker.WithRunConfig(cfg.WorkerCount, cfg.PollInterval), worker.WithLogger(deps.Logger), ) @@ -77,8 +81,8 @@ func NewWithWorker(w Worker, deps Deps) *Manager { } func (m *Manager) Run(ctx context.Context) error { - if err := m.register(); err != nil { - return fmt.Errorf("run async worker: %w", err) + if err := m.init(); err != nil { + return fmt.Errorf("run async worker: init: %w", err) } go func() { @@ -97,25 +101,83 @@ func (m *Manager) Run(ctx context.Context) error { return m.worker.Run(ctx) } -func (m *Manager) register() error { - if m.registered.Load() { +func (m *Manager) init() error { + if m.initDone.Load() { return nil } + m.initDone.Store(true) - for typ, h := range map[string]worker.JobHandler{ + jobHandlers := map[string]worker.JobHandler{ jobIndexAsset: m.indexAssetHandler(), jobDeleteAsset: m.deleteAssetHandler(), - } { + } + for typ, h := range jobHandlers { if err := m.worker.Register(typ, h); err != nil { return err } } - m.registered.Store(true) - - return nil + return m.registerStatsCallback(keys(jobHandlers)) } func (m *Manager) Close() error { return m.processor.Close() } + +func (m *Manager) registerStatsCallback(jobTypes []string) error { + const attrJobType = attribute.Key("job.type") + + meter := otel.Meter("github.com/goto/compass/internal/workermanager") + activeJobs, err := meter.Int64ObservableGauge("compass.worker.active_jobs") + handleOtelErr(err) + + deadJobs, err := meter.Int64ObservableGauge("compass.worker.dead_jobs") + handleOtelErr(err) + + _, err = meter.RegisterCallback( + func(ctx context.Context, o metric.Observer) error { + stats, err := m.processor.Stats(ctx) + if err != nil { + return err + } + + seen := make(map[string]struct{}, len(jobTypes)) + for _, st := range stats { + seen[st.Type] = struct{}{} + attr := metric.WithAttributes(attrJobType.String(st.Type)) + o.ObserveInt64(activeJobs, (int64)(st.Active), attr) + o.ObserveInt64(deadJobs, (int64)(st.Dead), attr) + } + + for _, typ := range jobTypes { + if _, ok := seen[typ]; ok { + continue + } + + attr := metric.WithAttributes(attrJobType.String(typ)) + o.ObserveInt64(activeJobs, 0, attr) + o.ObserveInt64(deadJobs, 0, attr) + } + + return nil + }, + activeJobs, + deadJobs, + ) + + return err +} + +func keys(handlers map[string]worker.JobHandler) []string { + types := make([]string, 0, len(handlers)) + for typ := range handlers { + types = append(types, typ) + } + return types +} + +func handleOtelErr(err error) { + if err != nil { + otel.Handle(err) + } +} diff --git a/pkg/grpc_interceptor/mocks/statsd_monitor.go b/pkg/grpc_interceptor/mocks/statsd_monitor.go index 7ed1e439..fbd9b7ab 100644 --- a/pkg/grpc_interceptor/mocks/statsd_monitor.go +++ b/pkg/grpc_interceptor/mocks/statsd_monitor.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/pkg/worker/dead_jobs_handler.go b/pkg/worker/dead_jobs_handler.go index ec6d9699..286f902f 100644 --- a/pkg/worker/dead_jobs_handler.go +++ b/pkg/worker/dead_jobs_handler.go @@ -9,6 +9,8 @@ import ( "net/url" "strconv" "strings" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) //go:embed dead_jobs_page.html @@ -36,13 +38,28 @@ const ( // - /clear-jobs: Remove specified dead jobs from dead_jobs table. func DeadJobManagementHandler(mgr DeadJobManager) http.Handler { mux := http.NewServeMux() - mux.HandleFunc(listDeadJobsPath, deadJobsHandler(mgr)) - mux.HandleFunc(resurrectJobsPath, jobsTransformerHandler(func(ctx context.Context, jobIDs []string) error { - return mgr.Resurrect(ctx, jobIDs) - })) - mux.HandleFunc(clearJobsPath, jobsTransformerHandler(func(ctx context.Context, jobIDs []string) error { - return mgr.ClearDeadJobs(ctx, jobIDs) - })) + mux.Handle( + listDeadJobsPath, + otelhttp.NewMiddleware("list_dead_jobs")( + otelhttp.WithRouteTag(listDeadJobsPath, deadJobsHandler(mgr)), + ), + ) + mux.Handle( + resurrectJobsPath, + otelhttp.NewMiddleware("resurrect_jobs")( + otelhttp.WithRouteTag(resurrectJobsPath, jobsTransformerHandler(func(ctx context.Context, jobIDs []string) error { + return mgr.Resurrect(ctx, jobIDs) + })), + ), + ) + mux.Handle( + clearJobsPath, + otelhttp.NewMiddleware("clear_jobs")( + otelhttp.WithRouteTag(clearJobsPath, jobsTransformerHandler(func(ctx context.Context, jobIDs []string) error { + return mgr.ClearDeadJobs(ctx, jobIDs) + })), + ), + ) return mux } diff --git a/pkg/worker/job_processor.go b/pkg/worker/job_processor.go index 0789efff..3c1dd7fa 100644 --- a/pkg/worker/job_processor.go +++ b/pkg/worker/job_processor.go @@ -17,6 +17,18 @@ type JobProcessor interface { // Process is also responsible for clearing the job or marking the job as // dead or setting up the retry for the job depending on the job result. Process(ctx context.Context, types []string, fn JobExecutorFunc) error + + // Stats returns the job statistics by job type. It includes the number of + // active and dead jobs. + Stats(ctx context.Context) ([]JobTypeStats, error) +} + +// JobTypeStats is the statistics for the job type with number of active and +// dead jobs. +type JobTypeStats struct { + Type string `json:"type"` + Active int `json:"active"` + Dead int `json:"dead"` } // JobExecutorFunc is invoked by JobProcessor for ready jobs. It is responsible diff --git a/pkg/worker/mocks/dead_job_manager_mock.go b/pkg/worker/mocks/dead_job_manager_mock.go index dcda7ba5..5a9eaf98 100644 --- a/pkg/worker/mocks/dead_job_manager_mock.go +++ b/pkg/worker/mocks/dead_job_manager_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks diff --git a/pkg/worker/mocks/job_processor_mock.go b/pkg/worker/mocks/job_processor_mock.go index c6bbb091..763319ad 100644 --- a/pkg/worker/mocks/job_processor_mock.go +++ b/pkg/worker/mocks/job_processor_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.25.1. DO NOT EDIT. +// Code generated by mockery v2.28.1. DO NOT EDIT. package mocks @@ -123,6 +123,60 @@ func (_c *JobProcessor_Process_Call) RunAndReturn(run func(context.Context, []st return _c } +// Stats provides a mock function with given fields: ctx +func (_m *JobProcessor) Stats(ctx context.Context) ([]worker.JobTypeStats, error) { + ret := _m.Called(ctx) + + var r0 []worker.JobTypeStats + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]worker.JobTypeStats, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []worker.JobTypeStats); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]worker.JobTypeStats) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// JobProcessor_Stats_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stats' +type JobProcessor_Stats_Call struct { + *mock.Call +} + +// Stats is a helper method to define mock.On call +// - ctx context.Context +func (_e *JobProcessor_Expecter) Stats(ctx interface{}) *JobProcessor_Stats_Call { + return &JobProcessor_Stats_Call{Call: _e.mock.On("Stats", ctx)} +} + +func (_c *JobProcessor_Stats_Call) Run(run func(ctx context.Context)) *JobProcessor_Stats_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *JobProcessor_Stats_Call) Return(_a0 []worker.JobTypeStats, _a1 error) *JobProcessor_Stats_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *JobProcessor_Stats_Call) RunAndReturn(run func(context.Context) ([]worker.JobTypeStats, error)) *JobProcessor_Stats_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewJobProcessor interface { mock.TestingT Cleanup(func()) diff --git a/pkg/worker/pgq/pgq_processor.go b/pkg/worker/pgq/pgq_processor.go index 5313e087..8de14156 100644 --- a/pkg/worker/pgq/pgq_processor.go +++ b/pkg/worker/pgq/pgq_processor.go @@ -129,6 +129,39 @@ func (p *Processor) Process(ctx context.Context, types []string, fn worker.JobEx return nil } +func (p *Processor) Stats(ctx context.Context) ([]worker.JobTypeStats, error) { + const query = "select COALESCE(actv.type, dead.type) as type, " + + " COALESCE(active_job_count, 0) as active_job_count, " + + " COALESCE(dead_job_count, 0) as dead_job_count " + + "from (select type, count(id) as active_job_count " + + " from jobs_queue group by type) as actv " + + "full join (select type, count(id) as dead_job_count " + + " from dead_jobs group by type) as dead " + + "on (actv.type = dead.type) " + + "order by 1" + + rows, err := p.db.QueryContext(ctx, query) + if err != nil { + return nil, fmt.Errorf("pgq stats: run query: %w", err) + } + + defer rows.Close() + var stats []worker.JobTypeStats + for rows.Next() { + var st worker.JobTypeStats + if err := rows.Scan(&st.Type, &st.Active, &st.Dead); err != nil { + return nil, fmt.Errorf("pgq stats: scan row: %w", err) + } + + stats = append(stats, st) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("pgq stats: scan rows: %w", err) + } + + return stats, nil +} + func (p *Processor) DeadJobs(ctx context.Context, size, offset int) ([]worker.Job, error) { query := sq.Select(). From(deadJobsTable). diff --git a/pkg/worker/pgq/pgq_processor_test.go b/pkg/worker/pgq/pgq_processor_test.go index e9c0690b..fd7dbb25 100644 --- a/pkg/worker/pgq/pgq_processor_test.go +++ b/pkg/worker/pgq/pgq_processor_test.go @@ -380,6 +380,86 @@ func (s *ProcessorTestSuite) TestProcess() { }) } +func (s *ProcessorTestSuite) TestStats() { + s.Run("WithEmptyTables", func() { + err := testutils.RunMigrations(s.T(), s.db) + s.Require().NoError(err) + + p, err := pgq.NewProcessor(s.ctx, s.testDBConfig()) + s.NoError(err) + + stats, err := p.Stats(s.ctx) + s.NoError(err) + s.Empty(stats) + }) + + s.Run("WithActiveAndDeadJobs", func() { + err := testutils.RunMigrations(s.T(), s.db) + s.Require().NoError(err) + + inserts, err := os.ReadFile("testdata/insert_dead_jobs.sql") + s.Require().NoError(err) + + _, err = s.db.Exec((string)(inserts)) + s.Require().NoError(err) + + p, err := pgq.NewProcessor(s.ctx, s.testDBConfig()) + s.NoError(err) + + jobSpecs := []worker.JobSpec{ + {Type: "test", Payload: []byte("job1")}, + {Type: "test", Payload: []byte("job2")}, + {Type: "test", Payload: []byte("job3")}, + } + var jobs []worker.Job + for _, js := range jobSpecs { + job, err := worker.NewJob(js) + s.Require().NoError(err) + + jobs = append(jobs, job) + } + err = p.Enqueue(s.ctx, jobs...) + s.NoError(err) + + stats, err := p.Stats(s.ctx) + s.NoError(err) + + expected := []worker.JobTypeStats{ + { + Type: "index-asset", + Active: 0, + Dead: 12, + }, + { + Type: "test", + Active: 3, + Dead: 0, + }, + } + s.Equal(expected, stats) + + err = p.Resurrect(s.ctx, []string{"01H63BPX1D3S98K5GBADE1Q322", "01H63BPVMH6BBQJH776KRJAMH0"}) + s.NoError(err) + + stats, err = p.Stats(s.ctx) + s.NoError(err) + + expected = []worker.JobTypeStats{ + { + Type: "index-asset", + Active: 2, + Dead: 10, + }, + { + Type: "test", + Active: 3, + Dead: 0, + }, + } + s.Equal(expected, stats) + }) +} + func (s *ProcessorTestSuite) TestDeadJobs() { err := testutils.RunMigrations(s.T(), s.db) s.Require().NoError(err) diff --git a/pkg/worker/workermw/job_processor_instrumentation_mw.go b/pkg/worker/workermw/job_processor_instrumentation_mw.go new file mode 100644 index 00000000..da71c30b --- /dev/null +++ b/pkg/worker/workermw/job_processor_instrumentation_mw.go @@ -0,0 +1,116 @@ +package workermw + +import ( + "context" + "sort" + "time" + + "github.com/goto/compass/pkg/worker" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + enqueueDurnHistogram = "compass.worker.jobs.enqueue.duration" + dequeueLatencyHistogram = "compass.worker.job.dequeue.latency" + processDurnHistogram = "compass.worker.job.process.duration" +) + +const ( + attrJobTypes = attribute.Key("job.types") + attrJobType = attribute.Key("job.type") + attrOpSuccess = attribute.Key("operation.success") + attrJobAttemptNo = attribute.Key("job.attempt_number") + attrJobStatus = attribute.Key("job.status") +) + +type JobProcessorInstrumentation struct { + next worker.JobProcessor + + enqueueDurn metric.Float64Histogram + dequeueLatency metric.Float64Histogram + processDurn metric.Float64Histogram +} + +func WithJobProcessorInstrumentation() func(worker.JobProcessor) worker.JobProcessor { + meter := otel.Meter("github.com/goto/compass/pkg/worker/workermw") + enqueueDurn, err := meter.Float64Histogram(enqueueDurnHistogram) + handleOtelErr(err) + + dequeueLatency, err := meter.Float64Histogram(dequeueLatencyHistogram) + handleOtelErr(err) + + processDurn, err := meter.Float64Histogram(processDurnHistogram) + handleOtelErr(err) + + return func(next worker.JobProcessor) worker.JobProcessor { + return JobProcessorInstrumentation{ + next: next, + enqueueDurn: enqueueDurn, + dequeueLatency: dequeueLatency, + processDurn: processDurn, + } + } +} + +func (mw JobProcessorInstrumentation) Enqueue(ctx context.Context, jobs ...worker.Job) (err error) { + defer func(start time.Time) { + ms := (float64)(time.Since(start)) / (float64)(time.Millisecond) + mw.enqueueDurn.Record(ctx, ms, metric.WithAttributes( + attrJobTypes.StringSlice(jobTypes(jobs)), + attrOpSuccess.Bool(err == nil), + )) + }(time.Now()) + + return mw.next.Enqueue(ctx, jobs...) +} + +func (mw JobProcessorInstrumentation) Process(ctx context.Context, types []string, fn worker.JobExecutorFunc) (err error) { + start := time.Now() + wrappedFn := func(ctx context.Context, job worker.Job) (resultJob worker.Job) { + latency := (float64)(time.Since(job.RunAt)) / (float64)(time.Millisecond) + mw.dequeueLatency.Record(ctx, latency, metric.WithAttributes( + attrJobType.String(job.Type), + )) + defer func() { + ms := (float64)(time.Since(start)) / (float64)(time.Millisecond) + mw.processDurn.Record(ctx, ms, metric.WithAttributes( + attrJobType.String(job.Type), + attrJobAttemptNo.Int(resultJob.AttemptsDone), + attrJobStatus.String(jobStatus(resultJob)), + attrOpSuccess.Bool(resultJob.Status == worker.StatusDone), + )) + }() + return fn(ctx, job) + } + return mw.next.Process(ctx, types, wrappedFn) +} + +func (mw JobProcessorInstrumentation) Stats(ctx context.Context) ([]worker.JobTypeStats, error) { + return mw.next.Stats(ctx) +} + +func jobTypes(jobs []worker.Job) []string { + var types []string + for _, j := range jobs { + types = append(types, j.Type) + } + sort.Strings(types) + + return types +} + +func jobStatus(j worker.Job) string { + if j.Status == "" { + return "retry" + } + + return (string)(j.Status) +} + +func handleOtelErr(err error) { + if err != nil { + otel.Handle(err) + } +}