Skip to content

Commit

Permalink
feat: enable column level search
Browse files Browse the repository at this point in the history
  • Loading branch information
anjaliagg9791 committed Oct 6, 2023
1 parent 868accc commit 809bba9
Show file tree
Hide file tree
Showing 11 changed files with 473 additions and 71 deletions.
3 changes: 3 additions & 0 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type Config struct {

// Client
Client client.Config `mapstructure:"client"`

// Column search excluded keyword list
ColSearchExclusionKeywords string `yaml:"col_search_excluded_keywords" mapstructure:"col_search_excluded_keywords"`
}

func LoadConfig() (*Config, error) {
Expand Down
3 changes: 2 additions & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"strings"

"github.com/MakeNowJust/heredoc"
"github.com/goto/compass/core/asset"
Expand Down Expand Up @@ -131,7 +132,7 @@ func runServer(ctx context.Context, cfg *Config) error {
if err != nil {
return fmt.Errorf("create new asset repository: %w", err)
}
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout)
discoveryRepository := esStore.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout, strings.Split(cfg.ColSearchExclusionKeywords, ","))
lineageRepository, err := postgres.NewLineageRepository(pgClient)
if err != nil {
return fmt.Errorf("create new lineage repository: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/MakeNowJust/heredoc"
"github.com/goto/compass/internal/store/elasticsearch"
Expand Down Expand Up @@ -67,9 +68,10 @@ func runWorker(ctx context.Context, cfg *Config) error {
}

mgr, err := workermanager.New(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout),
Logger: logger,
Config: cfg.Worker,
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout,
strings.Split(cfg.ColSearchExclusionKeywords, ",")),
Logger: logger,
})
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions core/asset/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type SearchFlags struct {

// DisableFuzzy disables fuzziness on search
DisableFuzzy bool

IsColumnSearch bool
}

// SearchConfig represents a search query along
Expand Down
1 change: 1 addition & 0 deletions internal/server/v1beta1/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,6 @@ func getSearchFlagsFromFlags(inputFlags *compassv1beta1.SearchFlags) asset.Searc
return asset.SearchFlags{
EnableHighlight: inputFlags.GetEnableHighlight(),
DisableFuzzy: inputFlags.GetDisableFuzzy(),
IsColumnSearch: inputFlags.GetIsColumnSearch(),
}
}
16 changes: 9 additions & 7 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ import (
// DiscoveryRepository implements discovery.Repository
// with elasticsearch as the backing store.
type DiscoveryRepository struct {
cli *Client
logger log.Logger
requestTimeout time.Duration
cli *Client
logger log.Logger
requestTimeout time.Duration
columnSearchExclusionList []string
}

func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout time.Duration) *DiscoveryRepository {
func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout time.Duration, colSearchExclusionList []string) *DiscoveryRepository {
return &DiscoveryRepository{
cli: cli,
logger: logger,
requestTimeout: requestTimeout,
cli: cli,
logger: logger,
requestTimeout: requestTimeout,
columnSearchExclusionList: colSearchExclusionList,
}
}

Expand Down
23 changes: 10 additions & 13 deletions internal/store/elasticsearch/discovery_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
store.WithClient(cli),
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})
err = repo.Upsert(ctx, asset.Asset{
ID: "",
Type: asset.TypeTable,
Expand All @@ -50,7 +49,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})
err = repo.Upsert(ctx, asset.Asset{
ID: "sample-id",
Type: asset.Type("unknown-type"),
Expand All @@ -68,8 +67,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
store.WithClient(cli),
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

// upsert with create_time as a object
err = repo.Upsert(ctx, asset.Asset{
Expand Down Expand Up @@ -129,7 +127,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})
err = repo.Upsert(ctx, ast)
assert.NoError(t, err)

Expand Down Expand Up @@ -177,8 +175,7 @@ func TestDiscoveryRepositoryUpsert(t *testing.T) {
store.WithClient(cli),
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

err = repo.Upsert(ctx, existingAsset)
assert.NoError(t, err)
Expand Down Expand Up @@ -219,7 +216,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})
err = repo.DeleteByID(ctx, "")
assert.ErrorIs(t, err, asset.ErrEmptyID)
})
Expand All @@ -241,7 +238,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

err = repo.Upsert(ctx, ast)
require.NoError(t, err)
Expand Down Expand Up @@ -288,7 +285,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)
Expand Down Expand Up @@ -319,7 +316,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

t.Run("should return error if the given urn is empty", func(t *testing.T) {
err = repo.DeleteByURN(ctx, "")
Expand Down Expand Up @@ -378,7 +375,7 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10)
repo := store.NewDiscoveryRepository(esClient, log.NewNoop(), time.Second*10, []string{"number", "id"})

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)
Expand Down
112 changes: 100 additions & 12 deletions internal/store/elasticsearch/discovery_search_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (repo *DiscoveryRepository) Search(ctx context.Context, cfg asset.SearchCon
})
}(time.Now())

query, err := buildQuery(cfg)
query, err := repo.buildQuery(cfg)
if err != nil {
return nil, asset.DiscoveryError{Op: "Search", Err: fmt.Errorf("build query: %w", err)}
}
Expand Down Expand Up @@ -197,17 +197,109 @@ func (repo *DiscoveryRepository) Suggest(ctx context.Context, config asset.Searc
return results, nil
}

func buildQuery(cfg asset.SearchConfig) (io.Reader, error) {
func (repo *DiscoveryRepository) buildColumnQuery(query *elastic.BoolQuery, cfg asset.SearchConfig, field string) *elastic.Highlight {
matchString := cfg.Text
for _, exclusionStr := range repo.columnSearchExclusionList {
exclusionStr = strings.TrimSpace(exclusionStr)
if strings.Contains(matchString, exclusionStr) {
matchString = strings.ReplaceAll(matchString, fmt.Sprintf("_%s", exclusionStr), "")
matchString = strings.ReplaceAll(matchString, fmt.Sprintf(" %s", exclusionStr), "")
matchString = strings.ReplaceAll(matchString, fmt.Sprintf("-%s", exclusionStr), "")
}
}

if matchString == "" {
matchString = cfg.Text
}

queries := make([]elastic.Query, 0)
termQuery := elastic.NewTermQuery(
fmt.Sprintf("%s.keyword", field),
cfg.Text,
).Boost(20)

descriptionTermQuery := elastic.NewTermQuery(
fmt.Sprintf("%s.keyword", "data.columns.description"),
cfg.Text,
)
phraseQuery := elastic.NewMultiMatchQuery(
cfg.Text,
[]string{
"data.columns.name^10",
"data.columns.description",
}...,
).Type("phrase")

matchQuery := elastic.NewMultiMatchQuery(
matchString,
[]string{
"data.columns.name^5",
"data.columns.description",
}...,
)

andMatchQuery := elastic.NewMultiMatchQuery(
matchString,
[]string{
"data.columns.name^5",
"data.columns.description",
}...,
).Operator("and")

multiMatchQueries := []*elastic.MultiMatchQuery{matchQuery, andMatchQuery}
queries = append(queries, termQuery, descriptionTermQuery, phraseQuery)
query.Should(queries...)
highlightQuery := make([]elastic.Query, 0)
highlightQuery = append(highlightQuery, queries...)
for _, q := range multiMatchQueries {
if !cfg.Flags.DisableFuzzy {
updatedQuery := q.Fuzziness("AUTO")
highlightQuery = append(highlightQuery, updatedQuery)
}
query.Should(q)
}

if cfg.Flags.EnableHighlight {
return elastic.NewHighlight().
Order("score").
Field("data.columns.name").
Field("data.columns.description").
HighlightQuery(
elastic.NewBoolQuery().
Should(highlightQuery...),
)
}

return nil
}

func (repo *DiscoveryRepository) buildQuery(cfg asset.SearchConfig) (io.Reader, error) {
boolQuery := elastic.NewBoolQuery()
buildTextQuery(boolQuery, cfg)
var highlightQuery *elastic.Highlight
field := ""

// if the search text is empty, do a match all query and return results
if strings.TrimSpace(cfg.Text) == "" {
boolQuery.Should(elastic.NewMatchAllQuery())
highlightQuery = buildHighlightQuery(cfg)
} else {
if cfg.Flags.IsColumnSearch {
field = "data.columns.name"
highlightQuery = repo.buildColumnQuery(boolQuery, cfg, field)
} else {
field = "name"
buildTextQuery(boolQuery, cfg)
highlightQuery = buildHighlightQuery(cfg)
}
}

buildFilterTermQueries(boolQuery, cfg.Filters)
buildMustMatchQueries(boolQuery, cfg)
query := buildFunctionScoreQuery(boolQuery, cfg.RankBy, cfg.Text)
highlight := buildHighlightQuery(cfg)
query := buildFunctionScoreQuery(boolQuery, cfg.RankBy, cfg.Text, field)

body, err := elastic.NewSearchRequest().
Query(query).
Highlight(highlight).
Highlight(highlightQuery).
MinScore(defaultMinScore).
Body()
if err != nil {
Expand Down Expand Up @@ -240,10 +332,6 @@ func buildSuggestQuery(cfg asset.SearchConfig) (io.Reader, error) {
}

func buildTextQuery(q *elastic.BoolQuery, cfg asset.SearchConfig) {
if strings.TrimSpace(cfg.Text) == "" {
q.Should(elastic.NewMatchAllQuery())
}

boostedFields := []string{"urn^10", "name^5"}
q.Should(
// Phrase query cannot have `FUZZINESS`
Expand Down Expand Up @@ -314,12 +402,12 @@ func buildFilterExistsQueries(q *elastic.BoolQuery, fields []string) {
}
}

func buildFunctionScoreQuery(query elastic.Query, rankBy, text string) elastic.Query {
func buildFunctionScoreQuery(query elastic.Query, rankBy, text, field string) elastic.Query {
// Added exact match term query here so that exact match gets higher priority.
fsQuery := elastic.NewFunctionScoreQuery()
if text != "" {
fsQuery.Add(
elastic.NewTermQuery("name.keyword", text),
elastic.NewTermQuery(fmt.Sprintf("%s.keyword", field), text),
elastic.NewWeightFactorFunction(2),
)
}
Expand Down
Loading

0 comments on commit 809bba9

Please sign in to comment.