Skip to content

Commit

Permalink
feat: support resource spec v2 (#274)
Browse files Browse the repository at this point in the history
* feat: add 1st version of resource spec v2 parser

* feat: restructure

* feat: add ReadByURN method for resource, update GetByURN to use it

* feat: restructure function, add unit tests

* feat: use urncomponent for dataset and resource name

* feat: add unit test for update & create

* feat: restore empty name checking

* fix
  • Loading branch information
ahmadnaufal authored Oct 14, 2024
1 parent b88fa7f commit 51a43a2
Show file tree
Hide file tree
Showing 12 changed files with 617 additions and 29 deletions.
6 changes: 3 additions & 3 deletions core/resource/handler/v1beta1/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,9 @@ func (rh ResourceHandler) GetResourceChangelogs(ctx context.Context, req *pb.Get
return nil, errors.GRPCErr(err, "invalid project name")
}

resourceName, err := resource.NameFrom(req.GetResourceName())
if err != nil {
return nil, errors.GRPCErr(err, "invalid resource name")
resourceName := resource.Name(req.GetResourceName())
if resourceName == "" {
return nil, errors.GRPCErr(errors.InvalidArgument(resource.EntityResource, "resource name is empty"), "invalid parameter")
}

changelogs, err := rh.changelogService.GetChangelogs(ctx, projectName, resourceName)
Expand Down
13 changes: 13 additions & 0 deletions core/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ const (

UnspecifiedImpactChange UpdateImpact = "unspecified_impact"
ResourceDataPipeLineImpact UpdateImpact = "data_impact"

ResourceSpecV1 = 1
ResourceSpecV2 = 2

DefaultResourceSpecVersion = ResourceSpecV1
)

type UpdateImpact string
Expand Down Expand Up @@ -183,6 +188,14 @@ func (r *Resource) Spec() map[string]any {
return r.spec
}

func (r *Resource) Version() int32 {
if r.metadata == nil || r.metadata.Version == 0 {
return DefaultResourceSpecVersion
}

return r.metadata.Version
}

func (r *Resource) Equal(incoming *Resource) bool {
if r == nil || incoming == nil {
return r == nil && incoming == nil
Expand Down
37 changes: 37 additions & 0 deletions core/resource/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,4 +604,41 @@ func TestResource(t *testing.T) {
})
})
})

t.Run("Version", func(t *testing.T) {
t.Run("returns default version if no version is provided", func(t *testing.T) {
tnnt, tnntErr := tenant.NewTenant("proj", "ns")
assert.Nil(t, tnntErr)

metadata := &resource.Metadata{
Description: "description",
}
spec := map[string]any{
"description": "spec for unit test",
}
res, err := resource.NewResource("proj.set.res_name", "table", resource.Bigquery, tnnt, metadata, spec)
assert.Nil(t, err)

version := res.Version()
assert.Equal(t, int32(resource.DefaultResourceSpecVersion), version)
})

t.Run("returns version from metadata if metadata is not nil", func(t *testing.T) {
tnnt, tnntErr := tenant.NewTenant("proj", "ns")
assert.Nil(t, tnntErr)

meta := &resource.Metadata{
Version: resource.ResourceSpecV2,
Description: "description",
}
spec := map[string]any{
"description": "spec for unit test",
}
res, err := resource.NewResource("proj.set.res_name", "table", resource.Bigquery, tnnt, meta, spec)
assert.Nil(t, err)

version := res.Version()
assert.Equal(t, int32(resource.ResourceSpecV2), version)
})
})
}
13 changes: 2 additions & 11 deletions core/resource/service/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ResourceRepository interface {
ReadByFullName(ctx context.Context, tnnt tenant.Tenant, store resource.Store, fullName string, onlyActive bool) (*resource.Resource, error)
ReadAll(ctx context.Context, tnnt tenant.Tenant, store resource.Store, onlyActive bool) ([]*resource.Resource, error)
GetResources(ctx context.Context, tnnt tenant.Tenant, store resource.Store, names []string) ([]*resource.Resource, error)
ReadByURN(ctx context.Context, tnnt tenant.Tenant, urn resource.URN) (*resource.Resource, error)
}

type ResourceManager interface {
Expand Down Expand Up @@ -348,17 +349,7 @@ func (rs ResourceService) GetByURN(ctx context.Context, tnnt tenant.Tenant, urn
return nil, errors.InvalidArgument(resource.EntityResource, "urn is zero value")
}

store, err := resource.FromStringToStore(urn.GetStore())
if err != nil {
return nil, err
}

name, err := resource.NameFrom(urn.GetName())
if err != nil {
return nil, err
}

return rs.repo.ReadByFullName(ctx, tnnt, store, name.String(), false)
return rs.repo.ReadByURN(ctx, tnnt, urn)
}

func (rs ResourceService) ExistInStore(ctx context.Context, tnnt tenant.Tenant, urn resource.URN) (bool, error) {
Expand Down
59 changes: 59 additions & 0 deletions core/resource/service/resource_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,57 @@ func TestResourceService(t *testing.T) {
assert.Nil(t, actual)
})
})

t.Run("GetByURN", func(t *testing.T) {
t.Run("returns error if urn is zero value", func(t *testing.T) {
mgr := NewResourceManager(t)
repo := newResourceRepository(t)
logger := log.NewLogrus()

rscService := service.NewResourceService(logger, repo, nil, mgr, nil, nil, nil)

_, err := rscService.GetByURN(ctx, tnnt, resource.ZeroURN())
assert.Error(t, err)
assert.ErrorContains(t, err, "urn is zero value")
})

t.Run("returns error if repo returns error", func(t *testing.T) {
mgr := NewResourceManager(t)
repo := newResourceRepository(t)
logger := log.NewLogrus()

urn, err := resource.ParseURN("bigquery://project:dataset.table")
assert.NoError(t, err)

repo.On("ReadByURN", ctx, tnnt, urn).Return(nil, errors.New("unknown error"))

rscService := service.NewResourceService(logger, repo, nil, mgr, nil, nil, nil)

_, err = rscService.GetByURN(ctx, tnnt, urn)
assert.Error(t, err)
assert.ErrorContains(t, err, "unknown error")
})

t.Run("returns resource if no error is encountered", func(t *testing.T) {
mgr := NewResourceManager(t)
repo := newResourceRepository(t)
logger := log.NewLogrus()

urn, err := resource.ParseURN("bigquery://project:dataset.table")
assert.NoError(t, err)

expectedResource, err := resource.NewResource("project.dataset", "dataset", resource.Bigquery, tnnt, meta, spec)
assert.NoError(t, err)

repo.On("ReadByURN", ctx, tnnt, urn).Return(expectedResource, nil)

rscService := service.NewResourceService(logger, repo, nil, mgr, nil, nil, nil)

actualResource, err := rscService.GetByURN(ctx, tnnt, urn)
assert.NoError(t, err)
assert.Equal(t, expectedResource, actualResource)
})
})
}

type mockResourceRepository struct {
Expand Down Expand Up @@ -1465,6 +1516,14 @@ func (m *mockResourceRepository) Delete(ctx context.Context, res *resource.Resou
return m.Called(ctx, res).Error(0)
}

func (m *mockResourceRepository) ReadByURN(ctx context.Context, tnnt tenant.Tenant, urn resource.URN) (*resource.Resource, error) {
args := m.Called(ctx, tnnt, urn)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*resource.Resource), args.Error(1)
}

type mockConstructorTestingTNewResourceRepository interface {
mock.TestingT
Cleanup(func())
Expand Down
6 changes: 1 addition & 5 deletions ext/store/bigquery/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ func CreateIfDatasetDoesNotExist(ctx context.Context, client Client, dataset Dat
}

func BackupTable(ctx context.Context, backup *resource.Backup, source *resource.Resource, client Client) (string, error) {
sourceDataset, err := DataSetFor(source.Name())
if err != nil {
return "", err
}
sourceName, err := ResourceNameFor(source.Name(), source.Kind())
sourceDataset, sourceName, err := getDatasetAndResourceName(source)
if err != nil {
return "", err
}
Expand Down
40 changes: 30 additions & 10 deletions ext/store/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,7 @@ func (s Store) Create(ctx context.Context, res *resource.Resource) error {
}
defer client.Close()

dataset, err := DataSetFor(res.Name())
if err != nil {
return err
}
resourceName, err := ResourceNameFor(res.Name(), res.Kind())
dataset, resourceName, err := getDatasetAndResourceName(res)
if err != nil {
return err
}
Expand Down Expand Up @@ -123,11 +119,7 @@ func (s Store) Update(ctx context.Context, res *resource.Resource) error {
}
defer client.Close()

dataset, err := DataSetFor(res.Name())
if err != nil {
return err
}
resourceName, err := ResourceNameFor(res.Name(), res.Kind())
dataset, resourceName, err := getDatasetAndResourceName(res)
if err != nil {
return err
}
Expand All @@ -154,6 +146,34 @@ func (s Store) Update(ctx context.Context, res *resource.Resource) error {
}
}

func getDatasetAndResourceName(res *resource.Resource) (Dataset, string, error) {
if res.Version() == resource.ResourceSpecV2 {
bqURN, err := getURNComponent(res)
if err != nil {
return Dataset{}, "", err
}

dataset, err := DataSetFrom(bqURN.Project, bqURN.Dataset)
if err != nil {
return Dataset{}, "", err
}

return dataset, bqURN.Name, nil
}

dataset, err := DataSetFor(res.Name())
if err != nil {
return Dataset{}, "", err
}

resourceName, err := ResourceNameFor(res.Name(), res.Kind())
if err != nil {
return Dataset{}, "", err
}

return dataset, resourceName, nil
}

func (s Store) BatchUpdate(ctx context.Context, resources []*resource.Resource) error {
spanCtx, span := startChildSpan(ctx, "bigquery/BatchUpdate")
defer span.End()
Expand Down
Loading

0 comments on commit 51a43a2

Please sign in to comment.