Skip to content

Commit

Permalink
test: create unit test for deletion API
Browse files Browse the repository at this point in the history
  • Loading branch information
Muhammad Luthfi Fahlevi committed Aug 12, 2024
1 parent a08f729 commit a926792
Show file tree
Hide file tree
Showing 10 changed files with 463 additions and 28 deletions.
38 changes: 19 additions & 19 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ func (s *Service) DeleteAssets(ctx context.Context, queryExpr string, dryRun boo
return uint32(total), nil
}

// Perform the Assets deletion asynchronously.
s.deleteAssetsAsynchronously(&wg, queryExpr, urns, dbErrChan)

return uint32(total), nil
}

func (s *Service) deleteAssetsAsynchronously(wg *sync.WaitGroup, queryExpr string, urns []string, dbErrChan chan error) {
// Perform the Assets deletion asynchronously
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -153,38 +159,32 @@ func (s *Service) DeleteAssets(ctx context.Context, queryExpr string, dryRun boo
close(dbErrChan)
}()

// Perform Elasticsearch and Lineage asynchronously if the Assets deletion is successful.
// Perform Elasticsearch and Lineage deletion asynchronously if the Assets deletion is successful
wg.Add(1)
go func() {
defer wg.Done()
dbErr := <-dbErrChan
if dbErr == nil {
s.deleteAssetsInElasticsearchAsynchronously(queryExpr)
s.deleteAssetsInLineageAsynchronously(urns)
go s.deleteAssetsInElasticsearchByQueryExpr(queryExpr)
go s.deleteAssetsInLineageByQueryExpr(urns)
} else {
log.Printf("Database deletion failed, skipping Elasticsearch and Lineage deletions: %s", dbErr)
}
}()

return uint32(total), nil
}

func (s *Service) deleteAssetsInElasticsearchAsynchronously(queryExpr string) {
go func() {
if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(context.Background(), queryExpr); err != nil {
log.Printf("Error occurred during Elasticsearch deletion: %s", err)
}
}()
func (s *Service) deleteAssetsInElasticsearchByQueryExpr(queryExpr string) {
if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(context.Background(), queryExpr); err != nil {
log.Printf("Error occurred during Elasticsearch deletion: %s", err)
}
}

func (s *Service) deleteAssetsInLineageAsynchronously(urns []string) {
go func() {
for _, urn := range urns {
if err := s.lineageRepository.DeleteByURN(context.Background(), urn); err != nil {
log.Printf("Error occurred during Lineage deletion: %s", err)
}
func (s *Service) deleteAssetsInLineageByQueryExpr(urns []string) {
for _, urn := range urns {
if err := s.lineageRepository.DeleteByURN(context.Background(), urn); err != nil {
log.Printf("Error occurred during Lineage deletion: %s", err)
}
}()
}
}

func (s *Service) GetAssetByID(ctx context.Context, id string) (Asset, error) {
Expand Down
74 changes: 74 additions & 0 deletions core/asset/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,80 @@ func TestService_DeleteAsset(t *testing.T) {
}
}

func TestService_DeleteAssets(t *testing.T) {
emptyQueryExpr := ""
notMeetIdentifierRequirementQueryExpr := `refreshed_at < now()`
successfulQueryExpr := `refreshed_at <= "2023-12-12 23:59:59" && service in ["service-1", "service-2"] && type == "table"`
type testCase struct {
Description string
QueryExpr string
DryRun bool
Setup func(context.Context, *mocks.AssetRepository, *mocks.DiscoveryRepository, *mocks.LineageRepository)
ExpectAffectedRows uint32
ExpectErr error
}

testCases := []testCase{
{
Description: `should return error if query expr is empty`,
QueryExpr: emptyQueryExpr,
DryRun: false,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, _ *mocks.DiscoveryRepository, _ *mocks.LineageRepository) {
ar.EXPECT().GetCountByQueryExpr(ctx, emptyQueryExpr, true).Return(0, errors.New("error"))
},
ExpectAffectedRows: 0,
ExpectErr: errors.New("error"),
},
{
Description: `should return error if query expr does not meet identifier requirement`,
QueryExpr: notMeetIdentifierRequirementQueryExpr,
DryRun: false,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, _ *mocks.DiscoveryRepository, _ *mocks.LineageRepository) {
ar.EXPECT().GetCountByQueryExpr(ctx, notMeetIdentifierRequirementQueryExpr, true).
Return(0, errors.New("must exist these identifiers: refreshed_at, type, and service. Current identifiers: refreshed_at"))
},
ExpectAffectedRows: 0,
ExpectErr: errors.New("must exist these identifiers: refreshed_at, type, and service. Current identifiers: refreshed_at"),
},
{
Description: `should only return the numbers of assets that match the given query if dry run is true`,
QueryExpr: successfulQueryExpr,
DryRun: true,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, _ *mocks.DiscoveryRepository, _ *mocks.LineageRepository) {
ar.EXPECT().GetCountByQueryExpr(ctx, successfulQueryExpr, true).Return(11, nil)
},
ExpectAffectedRows: 11,
ExpectErr: nil,
},
}
for _, tc := range testCases {
t.Run(tc.Description, func(t *testing.T) {
ctx := context.Background()

assetRepo := mocks.NewAssetRepository(t)
discoveryRepo := mocks.NewDiscoveryRepository(t)
lineageRepo := mocks.NewLineageRepository(t)
if tc.Setup != nil {
tc.Setup(ctx, assetRepo, discoveryRepo, lineageRepo)
}

svc := asset.NewService(asset.ServiceDeps{
AssetRepo: assetRepo,
DiscoveryRepo: discoveryRepo,
LineageRepo: lineageRepo,
Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}),
})

affectedRows, err := svc.DeleteAssets(ctx, tc.QueryExpr, tc.DryRun)

if tc.ExpectErr != nil {
assert.ErrorContains(t, err, tc.ExpectErr.Error())
}
assert.Equal(t, tc.ExpectAffectedRows, affectedRows)
})
}
}

func TestService_GetAssetByID(t *testing.T) {
assetID := "f742aa61-1100-445c-8d72-355a42e2fb59"
urn := "my-test-urn"
Expand Down
91 changes: 90 additions & 1 deletion internal/server/v1beta1/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
compassv1beta1 "github.com/goto/compass/proto/gotocompany/compass/v1beta1"
"github.com/goto/salt/log"
"github.com/r3labs/diff/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -975,9 +976,97 @@ func TestDeleteAsset(t *testing.T) {
_, err := handler.DeleteAsset(ctx, &compassv1beta1.DeleteAssetRequest{Id: tc.AssetID})
code := status.Code(err)
if code != tc.ExpectStatus {
t.Errorf("expected handler to return Code %s, returned Code %sinstead", tc.ExpectStatus.String(), code.String())
t.Errorf("expected handler to return Code %s, returned Code %s instead", tc.ExpectStatus.String(), code.String())
return
}
})
}
}

func TestDeleteAssets(t *testing.T) {
var (
userID = uuid.NewString()
userUUID = uuid.NewString()
emptyQueryExpr = ""
notMeetIdentifierRequirementQueryExpr = `refreshed_at < now()`
successfulQueryExpr = `refreshed_at <= "2023-12-12 23:59:59" && service in ["service-1", "service-2"] && type == "table"`
)
type TestCase struct {
Description string
QueryExpr string
DryRun bool
ExpectStatus codes.Code
ExpectResult *compassv1beta1.DeleteAssetsResponse
Setup func(ctx context.Context, as *mocks.AssetService, astID string)
}

testCases := []TestCase{
{
Description: "should return error when insert empty query expr",
QueryExpr: emptyQueryExpr,
DryRun: false,
ExpectStatus: codes.InvalidArgument,
ExpectResult: nil,
Setup: func(ctx context.Context, as *mocks.AssetService, astID string) {
as.EXPECT().DeleteAssets(ctx, emptyQueryExpr, false).Return(0, errors.New("error"))
},
},
{
Description: "should return error when query expr does not meet identifier requirement",
QueryExpr: notMeetIdentifierRequirementQueryExpr,
DryRun: false,
ExpectStatus: codes.InvalidArgument,
ExpectResult: nil,
Setup: func(ctx context.Context, as *mocks.AssetService, astID string) {
as.EXPECT().DeleteAssets(ctx, notMeetIdentifierRequirementQueryExpr, false).
Return(0, errors.New("must exist these identifiers: refreshed_at, type, and service. Current identifiers: refreshed_at"))
},
},
{
Description: `should only return the numbers of assets that match the given query if dry run is true`,
QueryExpr: successfulQueryExpr,
DryRun: true,
ExpectStatus: codes.OK,
ExpectResult: &compassv1beta1.DeleteAssetsResponse{AffectedRows: 11},
Setup: func(ctx context.Context, as *mocks.AssetService, astID string) {
as.EXPECT().DeleteAssets(ctx, successfulQueryExpr, true).Return(11, nil)
},
},
{
Description: `should return the affected rows numbers and perform deletion in the background if dry run is false`,
QueryExpr: successfulQueryExpr,
DryRun: false,
ExpectStatus: codes.OK,
ExpectResult: &compassv1beta1.DeleteAssetsResponse{AffectedRows: 2},
Setup: func(ctx context.Context, as *mocks.AssetService, astID string) {
as.EXPECT().DeleteAssets(ctx, successfulQueryExpr, false).Return(2, nil)
},
},
}
for _, tc := range testCases {
t.Run(tc.Description, func(t *testing.T) {
ctx := user.NewContext(context.Background(), user.User{UUID: userUUID})

logger := log.NewNoop()
mockUserSvc := new(mocks.UserService)
mockAssetSvc := new(mocks.AssetService)
if tc.Setup != nil {
tc.Setup(ctx, mockAssetSvc, assetID)
}
defer mockUserSvc.AssertExpectations(t)
defer mockAssetSvc.AssertExpectations(t)

mockUserSvc.EXPECT().ValidateUser(ctx, userUUID, "").Return(userID, nil)

handler := NewAPIServer(APIServerDeps{AssetSvc: mockAssetSvc, UserSvc: mockUserSvc, Logger: logger})

result, err := handler.DeleteAssets(ctx, &compassv1beta1.DeleteAssetsRequest{QueryExpr: tc.QueryExpr, DryRun: tc.DryRun})
code := status.Code(err)
if code != tc.ExpectStatus {
t.Errorf("expected handler to return Code %s, returned Code %s instead", tc.ExpectStatus.String(), code.String())
return
}
assert.Equal(t, tc.ExpectResult, result)
})
}
}
Expand Down
106 changes: 106 additions & 0 deletions internal/store/elasticsearch/discovery_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/goto/compass/core/asset"
store "github.com/goto/compass/internal/store/elasticsearch"
queryexpr "github.com/goto/compass/pkg/query_expr"
"github.com/goto/salt/log"
"github.com/olivere/elastic/v7"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -391,6 +392,111 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
})
}

func TestDiscoveryRepositoryDeleteByQueryExpr(t *testing.T) {
var (
ctx = context.Background()
bigqueryService = "bigquery-test"
kafkaService = "kafka-test"
)

cli, err := esTestServer.NewClient()
require.NoError(t, err)

esClient, err := store.NewClient(
log.NewNoop(), store.Config{}, store.WithClient(cli),
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

t.Run("should return error if the given query expr is empty", func(t *testing.T) {
err = repo.DeleteByQueryExpr(ctx, "")
assert.ErrorIs(t, err, asset.ErrEmptyQuery)
})

t.Run("should not return error on success", func(t *testing.T) {
ast := asset.Asset{
ID: "delete-id",
Type: asset.TypeTable,
Service: bigqueryService,
URN: "some-urn",
RefreshedAt: time.Now(),
}

err = repo.Upsert(ctx, ast)
require.NoError(t, err)

queryExpr := "refreshed_at <= '" + time.Now().Format("2006-01-02 15:04:05") +
"' && service == '" + bigqueryService +
"' && type == '" + asset.TypeTable.String() + "'"
err = repo.DeleteByQueryExpr(ctx, queryExpr)
assert.NoError(t, err)

deleteAssetESExpr := &store.DeleteAssetESExpr{
ESExpr: queryexpr.ESExpr(queryExpr),
}
esQuery, _ := queryexpr.ValidateAndGetQueryFromExpr(deleteAssetESExpr)

res, err := cli.Search(
cli.Search.WithBody(strings.NewReader(esQuery)),
cli.Search.WithIndex("_all"),
)
require.NoError(t, err)
assert.False(t, res.IsError())

var body struct {
Hits struct {
Total elastic.TotalHits `json:"total"`
} `json:"hits"`
}
require.NoError(t, json.NewDecoder(res.Body).Decode(&body))
assert.Equal(t, int64(0), body.Hits.Total.Value)
})

t.Run("should ignore unavailable indices", func(t *testing.T) {
currentTime := time.Now()
ast1 := asset.Asset{
ID: "id1",
Type: asset.TypeTable,
Service: bigqueryService,
URN: "urn1",
RefreshedAt: currentTime,
}
ast2 := asset.Asset{
ID: "id2",
Type: asset.TypeTopic,
Service: kafkaService,
URN: "urn2",
RefreshedAt: currentTime,
}
cli, err := esTestServer.NewClient()
require.NoError(t, err)
esClient, err := store.NewClient(
log.NewNoop(),
store.Config{},
store.WithClient(cli),
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)

err = repo.Upsert(ctx, ast2)
require.NoError(t, err)

_, err = cli.Indices.Close([]string{kafkaService})
require.NoError(t, err)

queryExpr := "refreshed_at <= '" + time.Now().Format("2006-01-02 15:04:05") +
"' && service == '" + kafkaService +
"' && type == '" + asset.TypeTopic.String() + "'"
err = repo.DeleteByQueryExpr(ctx, queryExpr)
assert.NoError(t, err)
})
}

func TestDiscoveryRepository_SyncAssets(t *testing.T) {
t.Run("should return success", func(t *testing.T) {
var (
Expand Down
2 changes: 1 addition & 1 deletion internal/store/postgres/asset_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (d *DeleteAssetSQLExpr) Validate() error {
generichelper.Contains(identifiers, "type") &&
generichelper.Contains(identifiers, "service")
if !mustExist {
return fmt.Errorf("must exists these identifiers: refreshed_at, type, and service. "+
return fmt.Errorf("must exist these identifiers: refreshed_at, type, and service. "+
"Current identifiers: %v", identifiers)
}

Expand Down
Loading

0 comments on commit a926792

Please sign in to comment.