Skip to content

Commit

Permalink
chore: fix merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhar-rudder committed Feb 10, 2025
1 parent ec54f5d commit 672a444
Showing 1 changed file with 10 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ func (m *Manager) retryableClient() *retryablehttp.Client {
return client
}

func (m *Manager) validateConfig(ctx context.Context, dest *backendconfig.DestinationT) string {
func (m *Manager) validateConfig(ctx context.Context, dest *backendconfig.DestinationT) error {
dest.Config["useKeyPairAuth"] = true // Since we are currently only supporting key pair auth
response := m.validator.Validate(ctx, dest)
if response.Success {
return ""
return nil
}
return response.Error
return errors.New(response.Error)
}

func (m *Manager) Now() time.Time {
Expand Down Expand Up @@ -188,8 +188,8 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
case errors.Is(err, errAuthz):
m.setBackOff(err)
validationError := m.validateConfig(ctx, asyncDest.Destination)
if validationError != "" {
err = fmt.Errorf("failed to validate snowpipe credentials: %s", validationError)
if validationError != nil {
err = fmt.Errorf("failed to validate snowpipe credentials: %s", validationError.Error())
}
return m.failedJobs(asyncDest, err.Error())
case errors.Is(err, errBackoff):
Expand Down Expand Up @@ -231,7 +231,6 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
)
shouldResetBackoff := true // backoff should be reset if authz error is not encountered for any of the tables
isBackoffSet := false // should not be set again if already set
var failedReason error
for _, info := range uploadInfos {
imInfo, discardImInfo, err := m.sendEventsToSnowpipe(ctx, asyncDest.Destination.ID, &destConf, info)
if err != nil {
Expand All @@ -242,8 +241,8 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
isBackoffSet = true
m.setBackOff(err)
validationError := m.validateConfig(ctx, asyncDest.Destination)
if validationError != "" && failedReason == nil {
failedReason = fmt.Errorf("failed to validate snowpipe credentials: %s", validationError)
if validationError != nil && failedReason == "" {
failedReason = fmt.Sprintf("failed to validate snowpipe credentials: %s", validationError.Error())
}
}
case errors.Is(err, errBackoff):
Expand All @@ -253,10 +252,9 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
logger.NewStringField("table", info.tableName),
obskit.Error(err),
)
if failedReason == nil {
failedReason = err
if failedReason == "" {
failedReason = err.Error()
}

failedJobIDs = append(failedJobIDs, info.jobIDs...)
continue
}
Expand Down Expand Up @@ -292,7 +290,7 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
m.stats.jobs.importing.Count(len(importingJobIDs))
m.stats.jobs.failed.Count(len(failedJobIDs))

asyncUploadOutput := common.AsyncUploadOutput{
return common.AsyncUploadOutput{
ImportingJobIDs: importingJobIDs,
ImportingCount: len(importingJobIDs),
ImportingParameters: importParameters,
Expand All @@ -301,10 +299,6 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
FailedCount: len(failedJobIDs),
DestinationID: asyncDest.Destination.ID,
}
if failedReason != nil {
asyncUploadOutput.FailedReason = failedReason.Error()
}
return asyncUploadOutput
}

func (m *Manager) eventsFromFile(fileName string, eventsCount int) ([]*event, error) {
Expand Down

0 comments on commit 672a444

Please sign in to comment.