diff --git a/cli/server.go b/cli/server.go index 446328a9..59c420fd 100644 --- a/cli/server.go +++ b/cli/server.go @@ -152,7 +152,7 @@ func runServer(ctx context.Context, cfg *Config) error { } }() - assetService, cancel := asset.NewService(asset.ServiceDeps{ + assetService := asset.NewService(asset.ServiceDeps{ AssetRepo: assetRepository, DiscoveryRepo: discoveryRepository, LineageRepo: lineageRepository, @@ -160,7 +160,6 @@ func runServer(ctx context.Context, cfg *Config) error { Logger: logger, Config: cfg.Asset, }) - defer cancel() // init discussion discussionRepository, err := postgres.NewDiscussionRepository(pgClient, 0) diff --git a/core/asset/service.go b/core/asset/service.go index 5645ec93..96c54598 100644 --- a/core/asset/service.go +++ b/core/asset/service.go @@ -20,7 +20,6 @@ type Service struct { worker Worker logger log.Logger config Config - cancelFnList []func() assetOpCounter metric.Int64Counter } @@ -44,7 +43,7 @@ type ServiceDeps struct { Config Config } -func NewService(deps ServiceDeps) (service *Service, cancel func()) { +func NewService(deps ServiceDeps) (service *Service) { assetOpCounter, err := otel.Meter("github.com/goto/compass/core/asset"). Int64Counter("compass.asset.operation") if err != nil { @@ -58,16 +57,11 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) { worker: deps.Worker, logger: deps.Logger, config: deps.Config, - cancelFnList: make([]func(), 0), assetOpCounter: assetOpCounter, } - return newService, func() { - for i := range newService.cancelFnList { - newService.cancelFnList[i]() - } - } + return newService } func (s *Service) GetAllAssets(ctx context.Context, flt Filter, withTotal bool) ([]Asset, uint32, error) { @@ -154,23 +148,15 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest) if !request.DryRun && total > 0 { newCtx, cancel := context.WithTimeout(context.Background(), s.config.DeleteAssetsTimeout) - idx := len(s.cancelFnList) - s.cancelFnList = append(s.cancelFnList, cancel) - go func(index int) { + go func() { + defer cancel() s.executeDeleteAssets(newCtx, deleteSQLExpr) - s.removeCancelFnByIndex(index) - }(idx) + }() } return uint32(total), nil } -func (s *Service) removeCancelFnByIndex(index int) { - if index < len(s.cancelFnList) { - s.cancelFnList = append(s.cancelFnList[:index], s.cancelFnList[index+1:]...) - } -} - func (s *Service) executeDeleteAssets(ctx context.Context, deleteSQLExpr queryexpr.ExprStr) { deletedURNs, err := s.assetRepository.DeleteByQueryExpr(ctx, deleteSQLExpr) if err != nil { diff --git a/core/asset/service_test.go b/core/asset/service_test.go index 7145bb7f..d930ff9c 100644 --- a/core/asset/service_test.go +++ b/core/asset/service_test.go @@ -90,12 +90,11 @@ func TestService_GetAllAssets(t *testing.T) { tc.Setup(ctx, mockAssetRepo, mockDiscoveryRepo, mockLineageRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: mockAssetRepo, DiscoveryRepo: mockDiscoveryRepo, LineageRepo: mockLineageRepo, }) - defer cancel() got, cnt, err := svc.GetAllAssets(ctx, tc.Filter, tc.WithTotal) if err != nil && errors.Is(tc.Err, err) { @@ -161,8 +160,7 @@ func TestService_GetTypes(t *testing.T) { tc.Setup(ctx, mockAssetRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo}) - defer cancel() + svc := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo}) got, err := svc.GetTypes(ctx, tc.Filter) if err != nil && errors.Is(tc.Err, err) { @@ -247,13 +245,12 @@ func TestService_UpsertAsset(t *testing.T) { tc.Setup(ctx, assetRepo, discoveryRepo, lineageRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: assetRepo, DiscoveryRepo: discoveryRepo, LineageRepo: lineageRepo, Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}), }) - defer cancel() rid, err := svc.UpsertAsset(ctx, tc.Asset, tc.Upstreams, tc.Downstreams) if tc.Err != nil { @@ -312,13 +309,12 @@ func TestService_UpsertAssetWithoutLineage(t *testing.T) { tc.Setup(ctx, assetRepo, discoveryRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: assetRepo, DiscoveryRepo: discoveryRepo, LineageRepo: mocks.NewLineageRepository(t), Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}), }) - defer cancel() rid, err := svc.UpsertAssetWithoutLineage(ctx, tc.Asset) if tc.Err != nil { @@ -440,13 +436,12 @@ func TestService_DeleteAsset(t *testing.T) { tc.Setup(ctx, assetRepo, discoveryRepo, lineageRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: assetRepo, DiscoveryRepo: discoveryRepo, LineageRepo: lineageRepo, Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}), }) - defer cancel() err := svc.DeleteAsset(ctx, tc.ID) if err != nil && errors.Is(tc.Err, err) { @@ -524,13 +519,12 @@ func TestService_DeleteAssets(t *testing.T) { tc.Setup(ctx, assetRepo, worker, lineageRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: assetRepo, DiscoveryRepo: discoveryRepo, LineageRepo: lineageRepo, Worker: worker, }) - defer cancel() affectedRows, err := svc.DeleteAssets(ctx, tc.Request) time.Sleep(1 * time.Second) @@ -652,12 +646,11 @@ func TestService_GetAssetByID(t *testing.T) { tc.Setup(ctx, mockAssetRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: mockAssetRepo, DiscoveryRepo: mocks.NewDiscoveryRepository(t), LineageRepo: mocks.NewLineageRepository(t), }) - defer cancel() actual, err := svc.GetAssetByID(ctx, tc.ID) if tc.Expected != nil { @@ -762,12 +755,11 @@ func TestService_GetAssetByIDWithoutProbes(t *testing.T) { tc.Setup(ctx, mockAssetRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: mockAssetRepo, DiscoveryRepo: mocks.NewDiscoveryRepository(t), LineageRepo: mocks.NewLineageRepository(t), }) - defer cancel() actual, err := svc.GetAssetByIDWithoutProbes(ctx, tc.ID) if tc.Expected != nil { @@ -836,12 +828,11 @@ func TestService_GetAssetByVersion(t *testing.T) { tc.Setup(ctx, mockAssetRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: mockAssetRepo, DiscoveryRepo: mocks.NewDiscoveryRepository(t), LineageRepo: mocks.NewLineageRepository(t), }) - defer cancel() _, err := svc.GetAssetByVersion(ctx, tc.ID, "v0.0.2") if tc.ExpectedErr != nil { @@ -890,12 +881,11 @@ func TestService_GetAssetVersionHistory(t *testing.T) { tc.Setup(ctx, mockAssetRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: mockAssetRepo, DiscoveryRepo: mockDiscoveryRepo, LineageRepo: mockLineageRepo, }) - defer cancel() _, err := svc.GetAssetVersionHistory(ctx, asset.Filter{}, tc.ID) if err != nil && errors.Is(tc.Err, err) { @@ -1077,12 +1067,11 @@ func TestService_GetLineage(t *testing.T) { tc.Setup(ctx, mockAssetRepo, mockDiscoveryRepo, mockLineageRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: mockAssetRepo, DiscoveryRepo: mockDiscoveryRepo, LineageRepo: mockLineageRepo, }) - defer cancel() actual, err := svc.GetLineage(ctx, "urn-source-1", tc.Query) if tc.Err == nil { @@ -1152,12 +1141,11 @@ func TestService_SearchSuggestGroupAssets(t *testing.T) { tc.Setup(ctx, mockDiscoveryRepo) } - svc, cancel := asset.NewService(asset.ServiceDeps{ + svc := asset.NewService(asset.ServiceDeps{ AssetRepo: mockAssetRepo, DiscoveryRepo: mockDiscoveryRepo, LineageRepo: mockLineageRepo, }) - defer cancel() _, err := svc.SearchAssets(ctx, asset.SearchConfig{}) if err != nil && !assert.Equal(t, tc.ErrSearch, err) { @@ -1189,8 +1177,7 @@ func TestService_CreateAssetProbe(t *testing.T) { mockAssetRepo := mocks.NewAssetRepository(t) mockAssetRepo.EXPECT().AddProbe(ctx, assetURN, &probe).Return(nil) - svc, cancel := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo}) - defer cancel() + svc := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo}) err := svc.AddProbe(ctx, assetURN, &probe) assert.NoError(t, err) @@ -1202,8 +1189,7 @@ func TestService_CreateAssetProbe(t *testing.T) { mockAssetRepo := mocks.NewAssetRepository(t) mockAssetRepo.EXPECT().AddProbe(ctx, assetURN, &probe).Return(expectedErr) - svc, cancel := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo}) - defer cancel() + svc := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo}) err := svc.AddProbe(ctx, assetURN, &probe) assert.Equal(t, expectedErr, err)