Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create delete assets API #77

Merged
merged 46 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
498abdd
feat(proto): generate proto regarding delete assets
Aug 6, 2024
573ecac
feat(asset): add refreshed_at field
Aug 8, 2024
1bc3990
feat: create translator from query expr to SQL query and ES query
Aug 8, 2024
17918ce
feat(module): add expr-lang/expr module
Aug 8, 2024
cd81a69
feat(docs): add comments in QueryExprTranslator
Aug 8, 2024
ebb14c7
feat(asset): change migration update
Aug 8, 2024
94e9e7b
feat(asset): add down migration
Aug 8, 2024
b2094e7
feat(translator): mark as private func for func that only used in tra…
Aug 8, 2024
8e4782f
feat: create delete assets API by query expr
Aug 8, 2024
f068a05
refactor: make interface for query expr and implement to postgresql a…
Aug 9, 2024
49bdac9
refactor: resolve all lint issues
Aug 9, 2024
3d412eb
fix: add refreshed_at in insert asset query
Aug 9, 2024
c629b87
feat: add refreshed_at field in get all assets
Aug 9, 2024
41b32cb
fix: update return error in delete assets
Aug 9, 2024
3ef8559
fix: update asynchronous process in deletion assets
Aug 9, 2024
04d17a5
refactor: fix lint issues
Aug 9, 2024
ac922dc
feat: fix ConditionalNode logic and toString format
Aug 10, 2024
f2d5c8a
refactor: remove redundant code, and make return error as soon as pos…
Aug 10, 2024
13f53c9
feat: update mock using mockery
Aug 11, 2024
5411c6e
feat: equalize current time at created_at and updated_at based on ref…
Aug 11, 2024
7092fef
refactor: rename var
Aug 11, 2024
22d6d9e
refactor: remove unused go module
Aug 11, 2024
5994448
test: fix existing unit tests
Aug 11, 2024
2eee272
test: fix existing unit tests
Aug 11, 2024
a08f729
refactor: makes codes to more readable
Aug 12, 2024
a926792
test: create unit test for deletion API
Aug 12, 2024
57897ea
test: create unit test for converter
Aug 12, 2024
e4903d0
feat: add validation in deletion query and remove redundant code
Aug 12, 2024
b7afadc
feat: make can equals (==) or IN for type and service in delete asset…
Aug 13, 2024
4abd20d
test: create unit test for delete asset expr
Aug 13, 2024
a77cf72
feat: change flow of deletion assets and refactor codes based on feed…
Aug 20, 2024
6dc3c6c
test: fix unit test
Aug 20, 2024
600572e
test: fix unit test
Aug 20, 2024
1724260
refactor: remove unused comment
Aug 20, 2024
b1b5710
feat: make maxAttemptRetry to be configurable and improve code perfor…
Aug 21, 2024
3a01e1b
feat: make cancel for context, and remove pointer in ESExpr and SQLExpr
Aug 21, 2024
ed71b28
refactor: make clean as linter suggestion
Aug 21, 2024
184041f
test: fix unit test due to cancel func when create new asset service
Aug 21, 2024
f2ab1c3
refactor: change argument of DeleteByQueryExpr in discovery repositor…
Aug 21, 2024
a550154
feat: add condition when the complex query result is time and refacto…
Aug 21, 2024
c3fb143
refactor: revise some codes as feedbacks and add test case in es expr…
Aug 21, 2024
ed7b8ef
refactor: clean code as feedbacks
Aug 21, 2024
8c655b2
refactor: add comment so more clear
Aug 21, 2024
7b4254a
refactor: remove redundant code
Aug 22, 2024
0355599
Merge branch 'main' of github.com:goto/compass into feat-create-delet…
Aug 22, 2024
a1bf848
feat: add case for MemberNode which handle nested query
Aug 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ help: ##@help show this help
NAME="github.com/goto/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "fe99cc96e060085d6052096e9ba59b4038c691c6"
PROTON_COMMIT := "8375eddcb23d38f601f6036c676493d8feb84a7e"

TOOLS_MOD_DIR = ./tools
TOOLS_DIR = $(abspath ./.tools)
Expand Down
4 changes: 3 additions & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ func runServer(ctx context.Context, cfg *Config) error {
}
}()

assetService := asset.NewService(asset.ServiceDeps{
assetService, cancel := asset.NewService(asset.ServiceDeps{
AssetRepo: assetRepository,
DiscoveryRepo: discoveryRepository,
LineageRepo: lineageRepository,
Worker: wrkr,
Logger: logger,
})
defer cancel()

// init discussion
discussionRepository, err := postgres.NewDiscussionRepository(pgClient, 0)
Expand Down
1 change: 1 addition & 0 deletions compass.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ worker:
sync_job_timeout: 15m
index_job_timeout: 5s
delete_job_timeout: 5s
max_attempt_retry: 3

client:
host: localhost:8081
Expand Down
4 changes: 4 additions & 0 deletions core/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"time"

"github.com/goto/compass/core/user"
"github.com/goto/compass/pkg/queryexpr"
"github.com/r3labs/diff/v2"
)

type Repository interface {
GetAll(context.Context, Filter) ([]Asset, error)
GetCount(context.Context, Filter) (int, error)
GetCountByQueryExpr(ctx context.Context, queryExpr queryexpr.ExprStr) (int, error)
GetByID(ctx context.Context, id string) (Asset, error)
GetByURN(ctx context.Context, urn string) (Asset, error)
GetVersionHistory(ctx context.Context, flt Filter, id string) ([]Asset, error)
Expand All @@ -21,6 +23,7 @@ type Repository interface {
Upsert(ctx context.Context, ast *Asset) (string, error)
DeleteByID(ctx context.Context, id string) error
DeleteByURN(ctx context.Context, urn string) error
DeleteByQueryExpr(ctx context.Context, queryExpr queryexpr.ExprStr) ([]string, error)
AddProbe(ctx context.Context, assetURN string, probe *Probe) error
GetProbes(ctx context.Context, assetURN string) ([]Probe, error)
GetProbesWithFilter(ctx context.Context, flt ProbesFilter) (map[string][]Probe, error)
Expand All @@ -40,6 +43,7 @@ type Asset struct {
Owners []user.User `json:"owners,omitempty" diff:"owners"`
CreatedAt time.Time `json:"created_at" diff:"-"`
UpdatedAt time.Time `json:"updated_at" diff:"-"`
RefreshedAt *time.Time `json:"refreshed_at" diff:"-"`
Version string `json:"version" diff:"-"`
UpdatedBy user.User `json:"updated_by" diff:"-"`
Changelog diff.Changelog `json:"changelog,omitempty" diff:"-"`
Expand Down
73 changes: 73 additions & 0 deletions core/asset/delete_asset_expr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package asset

import (
"errors"
"fmt"
"strings"

"github.com/goto/compass/pkg/generichelper"
"github.com/goto/compass/pkg/queryexpr"
)

var (
assetJSONTagsSchema = generichelper.GetJSONTags(Asset{})
errTypeOrServiceHasWrongOperator = errors.New("identifier type and service must be equals (==) or IN operator")
errMissRequiredIdentifier = errors.New("must exists these identifiers: refreshed_at, type, and service")
)

type DeleteAssetExpr struct {
queryexpr.ExprStr
}

func (d DeleteAssetExpr) ToQuery() (string, error) {
return d.ExprStr.ToQuery()
}

func (d DeleteAssetExpr) Validate() error {
identifiersWithOperator, err := queryexpr.GetIdentifiersMap(d.ExprStr.String())
if err != nil {
return err
}

if err := d.isRequiredIdentifiersExist(identifiersWithOperator); err != nil {
return err
}

if err := d.isUsingRightOperator(identifiersWithOperator); err != nil {
return err
}

return d.isAllIdentifiersExistInStruct(identifiersWithOperator)
}

func (DeleteAssetExpr) isRequiredIdentifiersExist(identifiersWithOperator map[string]string) error {
isExist := func(jsonTag string) bool {
return identifiersWithOperator[jsonTag] != ""
}
mustExist := isExist("refreshed_at") && isExist("type") && isExist("service")
if !mustExist {
return errMissRequiredIdentifier
}
return nil
}

func (DeleteAssetExpr) isUsingRightOperator(identifiersWithOperator map[string]string) error {
isOperatorEqualsOrIn := func(jsonTag string) bool {
return identifiersWithOperator[jsonTag] == "==" || strings.ToUpper(identifiersWithOperator[jsonTag]) == "IN"
}
if !isOperatorEqualsOrIn("type") || !isOperatorEqualsOrIn("service") {
return errTypeOrServiceHasWrongOperator
}
return nil
}

func (DeleteAssetExpr) isAllIdentifiersExistInStruct(identifiersWithOperator map[string]string) error {
identifiers := generichelper.GetMapKeys(identifiersWithOperator)
for _, identifier := range identifiers {
isFieldValid := generichelper.Contains(assetJSONTagsSchema, identifier)
if !isFieldValid {
return fmt.Errorf("%s is not a valid identifier", identifier)
}
}
return nil
}
150 changes: 150 additions & 0 deletions core/asset/delete_asset_expr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package asset_test

import (
"errors"
"testing"

"github.com/goto/compass/core/asset"
"github.com/goto/compass/pkg/queryexpr"
"github.com/stretchr/testify/assert"
)

func TestDeleteAssetExpr_ToQuery(t *testing.T) {
queryExp := `name == "John" || service not in ["test1","test2","test3"]`
sqlExpr := queryexpr.SQLExpr(queryExp)
esExpr := queryexpr.ESExpr(queryExp)
wrongExpr := queryexpr.SQLExpr("findLast(")
tests := []struct {
name string
exprStr queryexpr.ExprStr
want string
wantErr bool
}{
{
name: "convert to SQL query",
exprStr: asset.DeleteAssetExpr{
ExprStr: sqlExpr,
},
want: "((name = 'John') OR (service NOT IN ('test1', 'test2', 'test3')))",
wantErr: false,
},
{
name: "convert to ES query",
exprStr: asset.DeleteAssetExpr{
ExprStr: esExpr,
},
want: `{"query":{"bool":{"should":[{"term":{"name":"John"}},{"bool":{"must_not":[{"terms":{"service.keyword":["test1","test2","test3"]}}]}}]}}}`,
wantErr: false,
},
{
name: "got error due to wrong syntax",
exprStr: asset.DeleteAssetExpr{
ExprStr: wrongExpr,
},
want: "",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := asset.DeleteAssetExpr{
ExprStr: tt.exprStr,
}
got, err := d.ToQuery()
if (err != nil) != tt.wantErr {
t.Errorf("ToQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("ToQuery() got = %v, want %v", got, tt.want)
}
})
}
}

func TestDeleteAssetExpr_Validate(t *testing.T) {
tests := []struct {
name string
exprStrFn func() queryexpr.ExprStr
expectErr error
wantErr bool
}{
{
name: "error get identifiers map",
exprStrFn: func() queryexpr.ExprStr {
wrongExpr := queryexpr.SQLExpr("findLast(")
return asset.DeleteAssetExpr{
ExprStr: wrongExpr,
}
},
expectErr: errors.New("error parsing expression"),
wantErr: true,
},
{
name: "error miss refreshed_at not exist",
exprStrFn: func() queryexpr.ExprStr {
missRefreshedAt := queryexpr.SQLExpr(`updated_at < "2023-12-12 23:59:59" && type == "table" && service in ["test1","test2","test3"]`)
return asset.DeleteAssetExpr{
ExprStr: missRefreshedAt,
}
},
expectErr: errors.New("must exists these identifiers: refreshed_at, type, and service"),
wantErr: true,
},
{
name: "error miss type not exist",
exprStrFn: func() queryexpr.ExprStr {
missType := queryexpr.SQLExpr(`refreshed_at < "2023-12-12 23:59:59" && service in ["test1","test2","test3"]`)
return asset.DeleteAssetExpr{
ExprStr: missType,
}
},
expectErr: errors.New("must exists these identifiers: refreshed_at, type, and service"),
wantErr: true,
},
{
name: "error miss service not exist",
exprStrFn: func() queryexpr.ExprStr {
missService := queryexpr.SQLExpr(`refreshed_at < "2023-12-12 23:59:59" && type == "table"`)
return asset.DeleteAssetExpr{
ExprStr: missService,
}
},
expectErr: errors.New("must exists these identifiers: refreshed_at, type, and service"),
wantErr: true,
},
{
name: "error wrong operator for type identifier",
exprStrFn: func() queryexpr.ExprStr {
wrongTypeOperator := queryexpr.SQLExpr(`refreshed_at < "2023-12-12 23:59:59" && type != "table" && service in ["test1","test2","test3"]`)
return asset.DeleteAssetExpr{
ExprStr: wrongTypeOperator,
}
},
expectErr: errors.New("identifier type and service must be equals (==) or IN operator"),
wantErr: true,
},
{
name: "error wrong operator for service identifier",
exprStrFn: func() queryexpr.ExprStr {
wrongServiceOperator := queryexpr.SQLExpr(`refreshed_at < "2023-12-12 23:59:59" && type != "table" && service not in ["test1","test2","test3"]`)
return asset.DeleteAssetExpr{
ExprStr: wrongServiceOperator,
}
},
expectErr: errors.New("identifier type and service must be equals (==) or IN operator"),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.exprStrFn().Validate()
if (err != nil) != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
if err != nil {
assert.ErrorContains(t, err, tt.expectErr.Error())
}
})
}
}
6 changes: 6 additions & 0 deletions core/asset/delete_assets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package asset

type DeleteAssetsRequest struct {
QueryExpr string
DryRun bool
}
2 changes: 2 additions & 0 deletions core/asset/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package asset
//go:generate mockery --name=DiscoveryRepository -r --case underscore --with-expecter --structname DiscoveryRepository --filename discovery_repository.go --output=./mocks
import (
"context"
"github.com/goto/compass/pkg/queryexpr"
)

type DiscoveryRepository interface {
Upsert(context.Context, Asset) error
DeleteByID(ctx context.Context, assetID string) error
DeleteByURN(ctx context.Context, assetURN string) error
DeleteByQueryExpr(ctx context.Context, queryExpr queryexpr.ExprStr) error
Search(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error)
Suggest(ctx context.Context, cfg SearchConfig) (suggestions []string, err error)
GroupAssets(ctx context.Context, cfg GroupConfig) (results []GroupResult, err error)
Expand Down
1 change: 1 addition & 0 deletions core/asset/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var (
ErrEmptyID = errors.New("asset does not have ID")
ErrProbeExists = errors.New("asset probe already exists")
ErrEmptyURN = errors.New("asset does not have URN")
ErrEmptyQuery = errors.New("query is empty")
ErrUnknownType = errors.New("unknown type")
ErrNilAsset = errors.New("nil asset")
)
Expand Down
1 change: 1 addition & 0 deletions core/asset/lineage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type LineageRepository interface {
GetGraph(ctx context.Context, urn string, query LineageQuery) (LineageGraph, error)
Upsert(ctx context.Context, urn string, upstreams, downstreams []string) error
DeleteByURN(ctx context.Context, urn string) error
DeleteByURNs(ctx context.Context, urns []string) error
}

type LineageGraph []LineageEdge
Expand Down
Loading
Loading