Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Code changes for supporting INT32 entities + tests #125

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,627 changes: 4,627 additions & 0 deletions coverage.html

Large diffs are not rendered by default.

950 changes: 950 additions & 0 deletions coverage.out

Large diffs are not rendered by default.

48 changes: 46 additions & 2 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ import (
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
)

type FeatureStoreInterface interface {
GetOnlineFeatures(
ctx context.Context,
featureRefs []string,
featureService *model.FeatureService,
joinKeyToEntityValues map[string]*prototypes.RepeatedValue,
requestData map[string]*prototypes.RepeatedValue,
fullFeatureNames bool) ([]*onlineserving.FeatureVector, error)
}
type FeatureStore struct {
config *registry.RepoConfig
registry *registry.Registry
Expand Down Expand Up @@ -85,7 +94,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
joinKeyToEntityValues map[string]*prototypes.RepeatedValue,
requestData map[string]*prototypes.RepeatedValue,
fullFeatureNames bool) ([]*onlineserving.FeatureVector, error) {
fvs, odFvs, err := fs.listAllViews()
fvs, odFvs, err := fs.ListAllViews()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -230,7 +239,7 @@ func (fs *FeatureStore) GetFeatureService(name string) (*model.FeatureService, e
return fs.registry.GetFeatureService(fs.config.Project, name)
}

func (fs *FeatureStore) listAllViews() (map[string]*model.FeatureView, map[string]*model.OnDemandFeatureView, error) {
func (fs *FeatureStore) ListAllViews() (map[string]*model.FeatureView, map[string]*model.OnDemandFeatureView, error) {
fvs := make(map[string]*model.FeatureView)
odFvs := make(map[string]*model.OnDemandFeatureView)

Expand Down Expand Up @@ -291,6 +300,33 @@ func (fs *FeatureStore) ListEntities(hideDummyEntity bool) ([]*model.Entity, err
return entities, nil
}

func (fs *FeatureStore) GetEntityByKey(entityKey string) (*model.Entity, error) {

entities, err := fs.ListEntities(false)
if err != nil {
return nil, err
}
for _, entity := range entities {
if entity.JoinKey == entityKey {
return entity, nil
}
}
return nil, fmt.Errorf("Entity with key %s not found", entityKey)
}
func (fs *FeatureStore) GetRequestSources(odfvList []*model.OnDemandFeatureView) (map[string]prototypes.ValueType_Enum, error) {

requestSources := make(map[string]prototypes.ValueType_Enum, 0)
if len(odfvList) > 0 {
for _, odfv := range odfvList {
schema := odfv.GetRequestDataSchema()
for name, dtype := range schema {
requestSources[name] = dtype
}
}
}
return requestSources, nil
}

func (fs *FeatureStore) ListOnDemandFeatureViews() ([]*model.OnDemandFeatureView, error) {
return fs.registry.ListOnDemandFeatureViews(fs.config.Project)
}
Expand All @@ -311,6 +347,14 @@ func (fs *FeatureStore) GetFeatureView(featureViewName string, hideDummyEntity b
return fv, nil
}

func (fs *FeatureStore) GetOnDemandFeatureView(featureViewName string) (*model.OnDemandFeatureView, error) {
fv, err := fs.registry.GetOnDemandFeatureView(fs.config.Project, featureViewName)
if err != nil {
return nil, err
}
return fv, nil
}

func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*prototypes.EntityKey,
requestedFeatureViewNames []string,
requestedFeatureNames []string,
Expand Down
84 changes: 68 additions & 16 deletions go/internal/feast/featurestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,17 @@ package feast

import (
"context"
"path/filepath"
"runtime"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/protos/feast/core"
"testing"

"github.com/stretchr/testify/assert"

"github.com/feast-dev/feast/go/internal/feast/onlinestore"
"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/protos/feast/types"
types "github.com/feast-dev/feast/go/protos/feast/types"
)

// Return absolute path to the test_repo registry regardless of the working directory
func getRegistryPath() map[string]interface{} {
// Get the file path of this source file, regardless of the working directory
_, filename, _, ok := runtime.Caller(0)
if !ok {
panic("couldn't find file path of the test file")
}
registry := map[string]interface{}{
"path": filepath.Join(filename, "..", "..", "..", "feature_repo/data/registry.db"),
}
return registry
}

func TestNewFeatureStore(t *testing.T) {
t.Skip("@todo(achals): feature_repo isn't checked in yet")
config := registry.RepoConfig{
Expand Down Expand Up @@ -70,3 +57,68 @@ func TestGetOnlineFeaturesRedis(t *testing.T) {
assert.Nil(t, err)
assert.Len(t, response, 4) // 3 Features + 1 entity = 4 columns (feature vectors) in response
}
func TestGetRequestSources(t *testing.T) {
config := GetRepoConfig()
fs, _ := NewFeatureStore(&config, nil)

odfv := &core.OnDemandFeatureView{
Spec: &core.OnDemandFeatureViewSpec{
Name: "odfv1",
Project: "feature_repo",
Sources: map[string]*core.OnDemandSource{
"odfv1": {
Source: &core.OnDemandSource_RequestDataSource{
RequestDataSource: &core.DataSource{
Name: "request_source_1",
Type: core.DataSource_REQUEST_SOURCE,
Options: &core.DataSource_RequestDataOptions_{
RequestDataOptions: &core.DataSource_RequestDataOptions{
DeprecatedSchema: map[string]types.ValueType_Enum{
"feature1": types.ValueType_INT64,
},
Schema: []*core.FeatureSpecV2{
{
Name: "feat1",
ValueType: types.ValueType_INT64,
},
},
},
},
},
},
},
},
},
}

cached_odfv := &model.OnDemandFeatureView{
Base: model.NewBaseFeatureView("odfv1", []*core.FeatureSpecV2{
{
Name: "feat1",
ValueType: types.ValueType_INT64,
},
}),
SourceFeatureViewProjections: make(map[string]*model.FeatureViewProjection),
SourceRequestDataSources: map[string]*core.DataSource_RequestDataOptions{
"request_source_1": {
Schema: []*core.FeatureSpecV2{
{
Name: "feat1",
ValueType: types.ValueType_INT64,
},
},
},
},
}
fVList := make([]*model.OnDemandFeatureView, 0)
fVList = append(fVList, cached_odfv)
cachedOnDemandFVs := make(map[string]map[string]*core.OnDemandFeatureView)
cachedOnDemandFVs["feature_repo"] = make(map[string]*core.OnDemandFeatureView)
cachedOnDemandFVs["feature_repo"]["odfv1"] = odfv
fs.registry.CachedOnDemandFeatureViews = cachedOnDemandFVs
requestSources, err := fs.GetRequestSources(fVList)

assert.Nil(t, err)
assert.Equal(t, 1, len(requestSources))
assert.Equal(t, types.ValueType_INT64.Enum(), requestSources["feat1"].Enum())
}
11 changes: 7 additions & 4 deletions go/internal/feast/model/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package model

import (
"github.com/feast-dev/feast/go/protos/feast/core"
"github.com/feast-dev/feast/go/protos/feast/types"
)

type Entity struct {
Name string
JoinKey string
Name string
JoinKey string
ValueType types.ValueType_Enum
}

func NewEntityFromProto(proto *core.Entity) *Entity {
return &Entity{
Name: proto.Spec.Name,
JoinKey: proto.Spec.JoinKey,
Name: proto.Spec.Name,
JoinKey: proto.Spec.JoinKey,
ValueType: proto.Spec.ValueType,
}
}
56 changes: 28 additions & 28 deletions go/internal/feast/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ var REGISTRY_STORE_CLASS_FOR_SCHEME map[string]string = map[string]string{
type Registry struct {
project string
registryStore RegistryStore
cachedFeatureServices map[string]map[string]*core.FeatureService
cachedEntities map[string]map[string]*core.Entity
cachedFeatureViews map[string]map[string]*core.FeatureView
CachedFeatureServices map[string]map[string]*core.FeatureService
CachedEntities map[string]map[string]*core.Entity
CachedFeatureViews map[string]map[string]*core.FeatureView
cachedStreamFeatureViews map[string]map[string]*core.StreamFeatureView
cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView
CachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView
cachedRegistry *core.Registry
cachedRegistryProtoLastUpdated time.Time
cachedRegistryProtoTtl time.Duration
Expand Down Expand Up @@ -113,11 +113,11 @@ func (r *Registry) load(registry *core.Registry) {
r.mu.Lock()
defer r.mu.Unlock()
r.cachedRegistry = registry
r.cachedFeatureServices = make(map[string]map[string]*core.FeatureService)
r.cachedEntities = make(map[string]map[string]*core.Entity)
r.cachedFeatureViews = make(map[string]map[string]*core.FeatureView)
r.CachedFeatureServices = make(map[string]map[string]*core.FeatureService)
r.CachedEntities = make(map[string]map[string]*core.Entity)
r.CachedFeatureViews = make(map[string]map[string]*core.FeatureView)
r.cachedStreamFeatureViews = make(map[string]map[string]*core.StreamFeatureView)
r.cachedOnDemandFeatureViews = make(map[string]map[string]*core.OnDemandFeatureView)
r.CachedOnDemandFeatureViews = make(map[string]map[string]*core.OnDemandFeatureView)
r.loadEntities(registry)
r.loadFeatureServices(registry)
r.loadFeatureViews(registry)
Expand All @@ -129,30 +129,30 @@ func (r *Registry) load(registry *core.Registry) {
func (r *Registry) loadEntities(registry *core.Registry) {
entities := registry.Entities
for _, entity := range entities {
if _, ok := r.cachedEntities[r.project]; !ok {
r.cachedEntities[r.project] = make(map[string]*core.Entity)
if _, ok := r.CachedEntities[r.project]; !ok {
r.CachedEntities[r.project] = make(map[string]*core.Entity)
}
r.cachedEntities[r.project][entity.Spec.Name] = entity
r.CachedEntities[r.project][entity.Spec.Name] = entity
}
}

func (r *Registry) loadFeatureServices(registry *core.Registry) {
featureServices := registry.FeatureServices
for _, featureService := range featureServices {
if _, ok := r.cachedFeatureServices[r.project]; !ok {
r.cachedFeatureServices[r.project] = make(map[string]*core.FeatureService)
if _, ok := r.CachedFeatureServices[r.project]; !ok {
r.CachedFeatureServices[r.project] = make(map[string]*core.FeatureService)
}
r.cachedFeatureServices[r.project][featureService.Spec.Name] = featureService
r.CachedFeatureServices[r.project][featureService.Spec.Name] = featureService
}
}

func (r *Registry) loadFeatureViews(registry *core.Registry) {
featureViews := registry.FeatureViews
for _, featureView := range featureViews {
if _, ok := r.cachedFeatureViews[r.project]; !ok {
r.cachedFeatureViews[r.project] = make(map[string]*core.FeatureView)
if _, ok := r.CachedFeatureViews[r.project]; !ok {
r.CachedFeatureViews[r.project] = make(map[string]*core.FeatureView)
}
r.cachedFeatureViews[r.project][featureView.Spec.Name] = featureView
r.CachedFeatureViews[r.project][featureView.Spec.Name] = featureView
}
}

Expand All @@ -169,10 +169,10 @@ func (r *Registry) loadStreamFeatureViews(registry *core.Registry) {
func (r *Registry) loadOnDemandFeatureViews(registry *core.Registry) {
onDemandFeatureViews := registry.OnDemandFeatureViews
for _, onDemandFeatureView := range onDemandFeatureViews {
if _, ok := r.cachedOnDemandFeatureViews[r.project]; !ok {
r.cachedOnDemandFeatureViews[r.project] = make(map[string]*core.OnDemandFeatureView)
if _, ok := r.CachedOnDemandFeatureViews[r.project]; !ok {
r.CachedOnDemandFeatureViews[r.project] = make(map[string]*core.OnDemandFeatureView)
}
r.cachedOnDemandFeatureViews[r.project][onDemandFeatureView.Spec.Name] = onDemandFeatureView
r.CachedOnDemandFeatureViews[r.project][onDemandFeatureView.Spec.Name] = onDemandFeatureView
}
}

Expand All @@ -184,7 +184,7 @@ func (r *Registry) loadOnDemandFeatureViews(registry *core.Registry) {
func (r *Registry) ListEntities(project string) ([]*model.Entity, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedEntities, ok := r.cachedEntities[project]; !ok {
if cachedEntities, ok := r.CachedEntities[project]; !ok {
return []*model.Entity{}, nil
} else {
entities := make([]*model.Entity, len(cachedEntities))
Expand All @@ -205,7 +205,7 @@ func (r *Registry) ListEntities(project string) ([]*model.Entity, error) {
func (r *Registry) ListFeatureViews(project string) ([]*model.FeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok {
if cachedFeatureViews, ok := r.CachedFeatureViews[project]; !ok {
return []*model.FeatureView{}, nil
} else {
featureViews := make([]*model.FeatureView, len(cachedFeatureViews))
Expand Down Expand Up @@ -247,7 +247,7 @@ func (r *Registry) ListStreamFeatureViews(project string) ([]*model.FeatureView,
func (r *Registry) ListFeatureServices(project string) ([]*model.FeatureService, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
if cachedFeatureServices, ok := r.CachedFeatureServices[project]; !ok {
return []*model.FeatureService{}, nil
} else {
featureServices := make([]*model.FeatureService, len(cachedFeatureServices))
Expand All @@ -268,7 +268,7 @@ func (r *Registry) ListFeatureServices(project string) ([]*model.FeatureService,
func (r *Registry) ListOnDemandFeatureViews(project string) ([]*model.OnDemandFeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok {
if cachedOnDemandFeatureViews, ok := r.CachedOnDemandFeatureViews[project]; !ok {
return []*model.OnDemandFeatureView{}, nil
} else {
onDemandFeatureViews := make([]*model.OnDemandFeatureView, len(cachedOnDemandFeatureViews))
Expand All @@ -284,7 +284,7 @@ func (r *Registry) ListOnDemandFeatureViews(project string) ([]*model.OnDemandFe
func (r *Registry) GetEntity(project, entityName string) (*model.Entity, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedEntities, ok := r.cachedEntities[project]; !ok {
if cachedEntities, ok := r.CachedEntities[project]; !ok {
return nil, fmt.Errorf("no cached entities found for project %s", project)
} else {
if entity, ok := cachedEntities[entityName]; !ok {
Expand All @@ -298,7 +298,7 @@ func (r *Registry) GetEntity(project, entityName string) (*model.Entity, error)
func (r *Registry) GetFeatureView(project, featureViewName string) (*model.FeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok {
if cachedFeatureViews, ok := r.CachedFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached feature views found for project %s", project)
} else {
if featureViewProto, ok := cachedFeatureViews[featureViewName]; !ok {
Expand Down Expand Up @@ -326,7 +326,7 @@ func (r *Registry) GetStreamFeatureView(project, streamFeatureViewName string) (
func (r *Registry) GetFeatureService(project, featureServiceName string) (*model.FeatureService, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
if cachedFeatureServices, ok := r.CachedFeatureServices[project]; !ok {
return nil, fmt.Errorf("no cached feature services found for project %s", project)
} else {
if featureServiceProto, ok := cachedFeatureServices[featureServiceName]; !ok {
Expand All @@ -340,7 +340,7 @@ func (r *Registry) GetFeatureService(project, featureServiceName string) (*model
func (r *Registry) GetOnDemandFeatureView(project, onDemandFeatureViewName string) (*model.OnDemandFeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok {
if cachedOnDemandFeatureViews, ok := r.CachedOnDemandFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached on demand feature views found for project %s", project)
} else {
if onDemandFeatureViewProto, ok := cachedOnDemandFeatureViews[onDemandFeatureViewName]; !ok {
Expand Down
Loading
Loading