Skip to content

Commit

Permalink
exp: cli bq rs validation for loadByFolderPath
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Feb 10, 2025
1 parent d89b8cc commit 84656e1
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 28 deletions.
19 changes: 14 additions & 5 deletions cmd/rudder-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,23 @@ func main() {
Usage: "Test underlying warehouse",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "dest",
Usage: `Specify destination ID to test underlying warehouse`,
Aliases: []string{"d"},
Name: "revisionID",
Usage: `Specify destination revision ID to test underlying warehouse`,
Aliases: []string{"revID"},
},
&cli.StringSliceFlag{
Name: "revisionIDs",
Usage: `Specify destination revision IDs to test underlying warehouse`,
Aliases: []string{"revIDs"},
},
&cli.StringFlag{
Name: "file",
Usage: `Specify comma separated destination revision IDs to test underlying warehouse`,
Aliases: []string{"f"},
},
},
Action: func(c *cli.Context) error {
err := warehouse.ConfigurationTest(c)
return err
return warehouse.ConfigurationTest(c)
},
},
{
Expand Down
111 changes: 90 additions & 21 deletions cmd/rudder-cli/warehouse/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,21 @@ package warehouse
import (
"fmt"
"os"
"strings"
"time"

"github.com/olekukonko/tablewriter"
"github.com/samber/lo"
"github.com/urfave/cli/v2"

"github.com/rudderlabs/rudder-go-kit/config"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/cmd/rudder-cli/client"
"github.com/rudderlabs/rudder-server/services/controlplane"
"github.com/rudderlabs/rudder-server/utils/misc"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)

type QueryResult struct {
Expand All @@ -21,15 +31,6 @@ type QueryInput struct {
SQLStatement string
}

type ConfigurationTestInput struct {
DestID string
}

type ConfigurationTestOutput struct {
Valid bool
Error string
}

func Query(c *cli.Context) (err error) {
reply := QueryResult{}

Expand Down Expand Up @@ -69,22 +70,90 @@ func Query(c *cli.Context) (err error) {
return
}

func ConfigurationTest(c *cli.Context) (err error) {
reply := ConfigurationTestOutput{}
func ConfigurationTest(c *cli.Context) error {
misc.Init()
backendconfig.Init()
warehouseutils.Init()
validations.Init()

input := ConfigurationTestInput{
DestID: c.String("dest"),
}
revisionID := c.String("revisionID")
revisionIDs := c.StringSlice("revisionIDs")
revisionFile := c.String("file")

err = client.GetUDSClient().Call("Warehouse.ConfigurationTest", input, &reply)
if err != nil {
return
fmt.Println("revisionID: ", revisionID)
fmt.Println("revisionIDs: ", revisionIDs)
fmt.Println("revisionFile: ", revisionFile)

if strings.TrimSpace(revisionID) == "" && len(revisionIDs) == 0 && strings.TrimSpace(revisionFile) == "" {
return fmt.Errorf("revisionID or revisionIDs is required")
}
if err := backendconfig.Setup(nil); err != nil {
return fmt.Errorf("setting up backend config: %w", err)
}

cpClient := controlplane.NewClient(
config.GetString("CONFIG_BACKEND_URL", "https://api.rudderstack.com"),
backendconfig.DefaultBackendConfig.Identity(),
)

var revisionIDsToValidate []string

if reply.Valid {
fmt.Printf("Successfully validated destID: %s \n", input.DestID)
if len(revisionFile) > 0 {
data, err := os.ReadFile(revisionFile)
if err != nil {
return fmt.Errorf("reading revision file failed with error: %w", err)
}

revisionIDsToValidate = strings.Split(string(data), ",")
} else if revisionID != "" {
revisionIDsToValidate = append(revisionIDsToValidate, revisionID)
} else {
fmt.Printf("Failed validation for destID: %s with err: %s \n", input.DestID, reply.Error)
revisionIDsToValidate = revisionIDs
}
return
if len(revisionIDsToValidate) == 0 {
return fmt.Errorf("revisions are required")
}

for i := range revisionIDsToValidate {
revisionIDsToValidate[i] = strings.TrimSpace(revisionIDsToValidate[i])
}

output := map[string]string{}

for i, revisionID := range revisionIDsToValidate {
fmt.Println(fmt.Sprintf("Validating destination %d of %d with revision ID: %s", i+1, len(revisionIDsToValidate), revisionID))

Check failure on line 124 in cmd/rudder-cli/warehouse/warehouse.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)

destination, err := cpClient.DestinationHistory(c.Context, revisionID)
if err != nil {
return fmt.Errorf("getting destination history failed for revision ID: %s with error: %w", revisionID, err)
}
fmt.Println("Destination history fetched successfully")

res := validations.NewDestinationValidator().Validate(c.Context, &destination)
if res.Success {
output[revisionID] = fmt.Sprintf("Successfully validated destination with revision ID: %s", revisionID)
} else {
output[revisionID] = fmt.Sprintf("Failed to validate destination with revision ID: %s with error: %s", revisionID, res.Error)
}
fmt.Println(output[revisionID])

time.Sleep(1 * time.Second)
}

fmt.Println("Failed destinations revision IDs:")
fmt.Println(lo.Keys(output))

columns := []string{"Revision ID", "Status"}

table := tablewriter.NewWriter(os.Stdout)
table.SetHeader(columns)
table.SetAutoFormatHeaders(false)
table.SetHeaderColor(lo.Map(columns, func(item string, index int) tablewriter.Colors {
return tablewriter.Colors{tablewriter.Bold, tablewriter.BgCyanColor}
})...)
for revisionID, status := range output {
table.Append([]string{revisionID, status})
}
table.Render()
return nil
}
2 changes: 1 addition & 1 deletion warehouse/integrations/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func New(conf *config.Config, log logger.Logger) *BigQuery {
bq.config.enableDeleteByJobs = conf.GetBool("Warehouse.bigquery.enableDeleteByJobs", false)
bq.config.customPartitionsEnabledWorkspaceIDs = conf.GetStringSlice("Warehouse.bigquery.customPartitionsEnabledWorkspaceIDs", nil)
bq.config.slowQueryThreshold = conf.GetDuration("Warehouse.bigquery.slowQueryThreshold", 5, time.Minute)
bq.config.loadByFolderPath = conf.GetBool("Warehouse.bigquery.loadByFolderPath", false)
bq.config.loadByFolderPath = true

return bq
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/integrations/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats) *Redshift {
rs.config.skipComputingUserLatestTraits = conf.GetBool("Warehouse.redshift.skipComputingUserLatestTraits", false)
rs.config.enableDeleteByJobs = conf.GetBool("Warehouse.redshift.enableDeleteByJobs", false)
rs.config.slowQueryThreshold = conf.GetDuration("Warehouse.redshift.slowQueryThreshold", 5, time.Minute)
rs.config.loadByFolderPath = conf.GetBool("Warehouse.redshift.loadByFolderPath", false)
rs.config.loadByFolderPath = true

return rs
}
Expand Down

0 comments on commit 84656e1

Please sign in to comment.