Skip to content

Commit

Permalink
remove redundant cancel func storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Sumeet Rai committed Sep 25, 2024
1 parent 077fc6a commit c0186c9
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 49 deletions.
3 changes: 1 addition & 2 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,14 @@ func runServer(ctx context.Context, cfg *Config) error {
}
}()

assetService, cancel := asset.NewService(asset.ServiceDeps{
assetService := asset.NewService(asset.ServiceDeps{
AssetRepo: assetRepository,
DiscoveryRepo: discoveryRepository,
LineageRepo: lineageRepository,
Worker: wrkr,
Logger: logger,
Config: cfg.Asset,
})
defer cancel()

// init discussion
discussionRepository, err := postgres.NewDiscussionRepository(pgClient, 0)
Expand Down
24 changes: 5 additions & 19 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Service struct {
worker Worker
logger log.Logger
config Config
cancelFnList []func()

assetOpCounter metric.Int64Counter
}
Expand All @@ -44,7 +43,7 @@ type ServiceDeps struct {
Config Config
}

func NewService(deps ServiceDeps) (service *Service, cancel func()) {
func NewService(deps ServiceDeps) (service *Service) {
assetOpCounter, err := otel.Meter("github.com/goto/compass/core/asset").
Int64Counter("compass.asset.operation")
if err != nil {
Expand All @@ -58,16 +57,11 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) {
worker: deps.Worker,
logger: deps.Logger,
config: deps.Config,
cancelFnList: make([]func(), 0),

assetOpCounter: assetOpCounter,
}

return newService, func() {
for i := range newService.cancelFnList {
newService.cancelFnList[i]()
}
}
return newService
}

func (s *Service) GetAllAssets(ctx context.Context, flt Filter, withTotal bool) ([]Asset, uint32, error) {
Expand Down Expand Up @@ -154,23 +148,15 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)

if !request.DryRun && total > 0 {
newCtx, cancel := context.WithTimeout(context.Background(), s.config.DeleteAssetsTimeout)
idx := len(s.cancelFnList)
s.cancelFnList = append(s.cancelFnList, cancel)
go func(index int) {
go func() {
defer cancel()
s.executeDeleteAssets(newCtx, deleteSQLExpr)
s.removeCancelFnByIndex(index)
}(idx)
}()
}

return uint32(total), nil
}

func (s *Service) removeCancelFnByIndex(index int) {
if index < len(s.cancelFnList) {
s.cancelFnList = append(s.cancelFnList[:index], s.cancelFnList[index+1:]...)
}
}

func (s *Service) executeDeleteAssets(ctx context.Context, deleteSQLExpr queryexpr.ExprStr) {
deletedURNs, err := s.assetRepository.DeleteByQueryExpr(ctx, deleteSQLExpr)
if err != nil {
Expand Down
42 changes: 14 additions & 28 deletions core/asset/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,11 @@ func TestService_GetAllAssets(t *testing.T) {
tc.Setup(ctx, mockAssetRepo, mockDiscoveryRepo, mockLineageRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: mockAssetRepo,
DiscoveryRepo: mockDiscoveryRepo,
LineageRepo: mockLineageRepo,
})
defer cancel()

got, cnt, err := svc.GetAllAssets(ctx, tc.Filter, tc.WithTotal)
if err != nil && errors.Is(tc.Err, err) {
Expand Down Expand Up @@ -161,8 +160,7 @@ func TestService_GetTypes(t *testing.T) {
tc.Setup(ctx, mockAssetRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})
defer cancel()
svc := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})

got, err := svc.GetTypes(ctx, tc.Filter)
if err != nil && errors.Is(tc.Err, err) {
Expand Down Expand Up @@ -247,13 +245,12 @@ func TestService_UpsertAsset(t *testing.T) {
tc.Setup(ctx, assetRepo, discoveryRepo, lineageRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: assetRepo,
DiscoveryRepo: discoveryRepo,
LineageRepo: lineageRepo,
Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}),
})
defer cancel()

rid, err := svc.UpsertAsset(ctx, tc.Asset, tc.Upstreams, tc.Downstreams)
if tc.Err != nil {
Expand Down Expand Up @@ -312,13 +309,12 @@ func TestService_UpsertAssetWithoutLineage(t *testing.T) {
tc.Setup(ctx, assetRepo, discoveryRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: assetRepo,
DiscoveryRepo: discoveryRepo,
LineageRepo: mocks.NewLineageRepository(t),
Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}),
})
defer cancel()

rid, err := svc.UpsertAssetWithoutLineage(ctx, tc.Asset)
if tc.Err != nil {
Expand Down Expand Up @@ -440,13 +436,12 @@ func TestService_DeleteAsset(t *testing.T) {
tc.Setup(ctx, assetRepo, discoveryRepo, lineageRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: assetRepo,
DiscoveryRepo: discoveryRepo,
LineageRepo: lineageRepo,
Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}),
})
defer cancel()

err := svc.DeleteAsset(ctx, tc.ID)
if err != nil && errors.Is(tc.Err, err) {
Expand Down Expand Up @@ -524,13 +519,12 @@ func TestService_DeleteAssets(t *testing.T) {
tc.Setup(ctx, assetRepo, worker, lineageRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: assetRepo,
DiscoveryRepo: discoveryRepo,
LineageRepo: lineageRepo,
Worker: worker,
})
defer cancel()

affectedRows, err := svc.DeleteAssets(ctx, tc.Request)
time.Sleep(1 * time.Second)
Expand Down Expand Up @@ -652,12 +646,11 @@ func TestService_GetAssetByID(t *testing.T) {
tc.Setup(ctx, mockAssetRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: mockAssetRepo,
DiscoveryRepo: mocks.NewDiscoveryRepository(t),
LineageRepo: mocks.NewLineageRepository(t),
})
defer cancel()

actual, err := svc.GetAssetByID(ctx, tc.ID)
if tc.Expected != nil {
Expand Down Expand Up @@ -762,12 +755,11 @@ func TestService_GetAssetByIDWithoutProbes(t *testing.T) {
tc.Setup(ctx, mockAssetRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: mockAssetRepo,
DiscoveryRepo: mocks.NewDiscoveryRepository(t),
LineageRepo: mocks.NewLineageRepository(t),
})
defer cancel()

actual, err := svc.GetAssetByIDWithoutProbes(ctx, tc.ID)
if tc.Expected != nil {
Expand Down Expand Up @@ -836,12 +828,11 @@ func TestService_GetAssetByVersion(t *testing.T) {
tc.Setup(ctx, mockAssetRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: mockAssetRepo,
DiscoveryRepo: mocks.NewDiscoveryRepository(t),
LineageRepo: mocks.NewLineageRepository(t),
})
defer cancel()

_, err := svc.GetAssetByVersion(ctx, tc.ID, "v0.0.2")
if tc.ExpectedErr != nil {
Expand Down Expand Up @@ -890,12 +881,11 @@ func TestService_GetAssetVersionHistory(t *testing.T) {
tc.Setup(ctx, mockAssetRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: mockAssetRepo,
DiscoveryRepo: mockDiscoveryRepo,
LineageRepo: mockLineageRepo,
})
defer cancel()

_, err := svc.GetAssetVersionHistory(ctx, asset.Filter{}, tc.ID)
if err != nil && errors.Is(tc.Err, err) {
Expand Down Expand Up @@ -1077,12 +1067,11 @@ func TestService_GetLineage(t *testing.T) {
tc.Setup(ctx, mockAssetRepo, mockDiscoveryRepo, mockLineageRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: mockAssetRepo,
DiscoveryRepo: mockDiscoveryRepo,
LineageRepo: mockLineageRepo,
})
defer cancel()

actual, err := svc.GetLineage(ctx, "urn-source-1", tc.Query)
if tc.Err == nil {
Expand Down Expand Up @@ -1152,12 +1141,11 @@ func TestService_SearchSuggestGroupAssets(t *testing.T) {
tc.Setup(ctx, mockDiscoveryRepo)
}

svc, cancel := asset.NewService(asset.ServiceDeps{
svc := asset.NewService(asset.ServiceDeps{
AssetRepo: mockAssetRepo,
DiscoveryRepo: mockDiscoveryRepo,
LineageRepo: mockLineageRepo,
})
defer cancel()

_, err := svc.SearchAssets(ctx, asset.SearchConfig{})
if err != nil && !assert.Equal(t, tc.ErrSearch, err) {
Expand Down Expand Up @@ -1189,8 +1177,7 @@ func TestService_CreateAssetProbe(t *testing.T) {
mockAssetRepo := mocks.NewAssetRepository(t)
mockAssetRepo.EXPECT().AddProbe(ctx, assetURN, &probe).Return(nil)

svc, cancel := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})
defer cancel()
svc := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})

err := svc.AddProbe(ctx, assetURN, &probe)
assert.NoError(t, err)
Expand All @@ -1202,8 +1189,7 @@ func TestService_CreateAssetProbe(t *testing.T) {
mockAssetRepo := mocks.NewAssetRepository(t)
mockAssetRepo.EXPECT().AddProbe(ctx, assetURN, &probe).Return(expectedErr)

svc, cancel := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})
defer cancel()
svc := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})

err := svc.AddProbe(ctx, assetURN, &probe)
assert.Equal(t, expectedErr, err)
Expand Down

0 comments on commit c0186c9

Please sign in to comment.