Skip to content

Commit

Permalink
chore: add mocks client
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Feb 12, 2025
1 parent 1ff1da4 commit 526c759
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
2 changes: 2 additions & 0 deletions internal/transformer-client/client.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:generate mockgen -destination=../../mocks/transformer-client/mock_transformer_client.go -package=mocks_transformer_client github.com/rudderlabs/rudder-server/internal/transformer-client Client

package transformerclient

import (
Expand Down
56 changes: 56 additions & 0 deletions mocks/transformer-client/mock_transformer_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions processor/internal/user_transformer/user_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -37,6 +38,7 @@ type UserTransformer struct {
failOnError config.ValueLoader[bool]
maxRetryBackoffInterval config.ValueLoader[time.Duration]
timeoutDuration time.Duration
collectInstanceLevelStats bool
maxConcurrency int
}
conf *config.Config
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewUserTransformer(conf *config.Config, log logger.Logger, stat stats.Stats
handle.config.maxRetry = conf.GetReloadableIntVar(30, 1, "Processor.maxRetry")
handle.config.failOnError = conf.GetReloadableBoolVar(false, "Processor.Transformer.failOnError")
handle.config.maxRetryBackoffInterval = conf.GetReloadableDurationVar(30, time.Second, "Processor.maxRetryBackoffInterval")
handle.config.collectInstanceLevelStats = conf.GetBool("Processor.collectInstanceLevelStats", false)
return handle
}

Expand Down Expand Up @@ -291,10 +294,24 @@ func (u *UserTransformer) doPost(ctx context.Context, rawJSON []byte, url string

resp, reqErr = u.client.Do(req)
defer func() { httputil.CloseResponse(resp) }()
duration := time.Since(requestStartTime)
u.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(time.Since(requestStartTime))
if reqErr != nil {
return reqErr
}
headerResponseTime := resp.Header.Get("X-Response-Time")
instanceWorker := resp.Header.Get("X-Instance-ID")

if u.config.collectInstanceLevelStats && instanceWorker != "" {
newTags := lo.Assign(tags)
newTags["instanceWorker"] = instanceWorker
dur := duration.Milliseconds()
headerTime, err := strconv.ParseFloat(strings.TrimSuffix(headerResponseTime, "ms"), 64)
if err == nil {
diff := float64(dur) - headerTime
u.stat.NewTaggedStat("processor_transform_duration_diff_time", stats.TimerType, newTags).SendTiming(time.Duration(diff) * time.Millisecond)
}

Check warning on line 313 in processor/internal/user_transformer/user_transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/internal/user_transformer/user_transformer.go#L290-L313

Added lines #L290 - L313 were not covered by tests
}

if !transformer_utils.IsJobTerminated(resp.StatusCode) && resp.StatusCode != transformer_utils.StatusCPDown {
return fmt.Errorf("transformer returned status code: %v", resp.StatusCode)
Expand Down

0 comments on commit 526c759

Please sign in to comment.