Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace min/max helpers with built-in min/max #6674

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions bench/lib/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/uber-go/tally"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"

"github.com/uber/cadence/common"
)

// counters go here
Expand Down Expand Up @@ -86,7 +84,7 @@ func RecordActivityStart(
scheduledTimeNanos int64,
) (tally.Scope, tally.Stopwatch) {
scope = scope.Tagged(map[string]string{"operation": name})
elapsed := common.MaxInt64(0, time.Now().UnixNano()-scheduledTimeNanos)
elapsed := max(0, time.Now().UnixNano()-scheduledTimeNanos)
scope.Timer(startLatency).Record(time.Duration(elapsed))
scope.Counter(startedCount).Inc(1)
sw := scope.Timer(latency).Start()
Expand Down Expand Up @@ -125,7 +123,7 @@ func recordWorkflowStart(
) *WorkflowMetricsProfile {
now := workflow.Now(ctx).UnixNano()
scope := workflowMetricScope(ctx, wfType)
elapsed := common.MaxInt64(0, now-scheduledTimeNanos)
elapsed := max(0, now-scheduledTimeNanos)
scope.Timer(startLatency).Record(time.Duration(elapsed))
scope.Counter(startedCount).Inc(1)
return &WorkflowMetricsProfile{
Expand Down
7 changes: 0 additions & 7 deletions canary/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ import (
"go.uber.org/zap"
)

// maxInt64 returns the larger of the two given int64 values.
func maxInt64(a, b int64) int64 {
	if b > a {
		return b
	}
	return a
}

func absDurationDiff(d1, d2 time.Duration) time.Duration {
if d1 > d2 {
return d1 - d2
Expand Down
4 changes: 2 additions & 2 deletions canary/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func workflowMetricScope(ctx workflow.Context, wfType string) tally.Scope {
func recordWorkflowStart(ctx workflow.Context, wfType string, scheduledTimeNanos int64) *workflowMetricsProfile {
now := workflow.Now(ctx).UnixNano()
scope := workflowMetricScope(ctx, wfType)
elapsed := maxInt64(0, now-scheduledTimeNanos)
elapsed := max(0, now-scheduledTimeNanos)
scope.Timer(startLatency).Record(time.Duration(elapsed))
scope.Counter(startedCount).Inc(1)
return &workflowMetricsProfile{
Expand Down Expand Up @@ -117,7 +117,7 @@ func (profile *workflowMetricsProfile) end(err error) error {
func recordActivityStart(
scope tally.Scope, activityType string, scheduledTimeNanos int64) (tally.Scope, tally.Stopwatch) {
scope = scope.Tagged(map[string]string{"operation": activityType})
elapsed := maxInt64(0, time.Now().UnixNano()-scheduledTimeNanos)
elapsed := max(0, time.Now().UnixNano()-scheduledTimeNanos)
scope.Timer(startLatency).Record(time.Duration(elapsed))
scope.Counter(startedCount).Inc(1)
sw := scope.Timer(latency).Start()
Expand Down
8 changes: 4 additions & 4 deletions common/archiver/filestore/queryParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ func (p *queryParser) convertCloseTime(timestamp int64, op string, parsedQuery *
return err
}
case "<":
parsedQuery.latestCloseTime = common.MinInt64(parsedQuery.latestCloseTime, timestamp-1)
parsedQuery.latestCloseTime = min(parsedQuery.latestCloseTime, timestamp-1)
case "<=":
parsedQuery.latestCloseTime = common.MinInt64(parsedQuery.latestCloseTime, timestamp)
parsedQuery.latestCloseTime = min(parsedQuery.latestCloseTime, timestamp)
case ">":
parsedQuery.earliestCloseTime = common.MaxInt64(parsedQuery.earliestCloseTime, timestamp+1)
parsedQuery.earliestCloseTime = max(parsedQuery.earliestCloseTime, timestamp+1)
case ">=":
parsedQuery.earliestCloseTime = common.MaxInt64(parsedQuery.earliestCloseTime, timestamp)
parsedQuery.earliestCloseTime = max(parsedQuery.earliestCloseTime, timestamp)
default:
return fmt.Errorf("operator %s is not supported for close time", op)
}
Expand Down
2 changes: 1 addition & 1 deletion common/collection/concurrent_tx_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type (
func NewShardedConcurrentTxMap(initialCap int, hashfn HashFunc) ConcurrentTxMap {
cmap := new(ShardedConcurrentTxMap)
cmap.hashfn = hashfn
cmap.initialCap = MaxInt(nShards, initialCap/nShards)
cmap.initialCap = max(nShards, initialCap/nShards)
return cmap
}

Expand Down
32 changes: 0 additions & 32 deletions common/collection/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,3 @@ func UUIDHashCode(input interface{}) uint32 {
}
return binary.BigEndian.Uint32(b)
}

// MinInt returns the min of given two integers
func MinInt(a, b int) int {
	if b < a {
		return b
	}
	return a
}

// MaxInt returns the max of given two integers
func MaxInt(a, b int) int {
	if b > a {
		return b
	}
	return a
}

// MinInt64 returns the min of given two integers
func MinInt64(a, b int64) int64 {
	if b < a {
		return b
	}
	return a
}

// MaxInt64 returns the max of given two integers
func MaxInt64(a, b int64) int64 {
	if b > a {
		return b
	}
	return a
}
2 changes: 1 addition & 1 deletion common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,7 @@ func updateFailoverHistory(
failoverHistory = append([]FailoverEvent{newFailoverEvent}, failoverHistory...)

// Truncate the history to the max size
failoverHistoryJSON, err := json.Marshal(failoverHistory[:common.MinInt(config.FailoverHistoryMaxSize(info.Name), len(failoverHistory))])
failoverHistoryJSON, err := json.Marshal(failoverHistory[:min(config.FailoverHistoryMaxSize(info.Name), len(failoverHistory))])
if err != nil {
return err
}
Expand Down
7 changes: 0 additions & 7 deletions common/persistence/data_store_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,3 @@ func TestDataBlob(t *testing.T) {
})
})
}

// max returns the larger of a and b for any type whose underlying type is int32.
func max[T ~int32](a, b T) T {
	if b > a {
		return b
	}
	return a
}
3 changes: 1 addition & 2 deletions common/persistence/sql/sql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/log"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/serialization"
Expand Down Expand Up @@ -1053,7 +1052,7 @@ func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxR
}
}

maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
maxReadLevelInclusive = max(readLevel+int64(request.BatchSize), request.MaxReadLevel)
return readLevel, maxReadLevelInclusive, nil
}

Expand Down
8 changes: 0 additions & 8 deletions common/quotas/global/collection/internal/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,3 @@ func (b *FallbackLimiter) both() quotas.Limiter {
}
return NewShadowedLimiter(b.primary, b.fallback)
}

// intentionally shadows builtin max, so it can simply be deleted when 1.21 is adopted
func max(a, b int) int {
	if b > a {
		return b
	}
	return a
}
67 changes: 1 addition & 66 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -468,70 +467,6 @@ func CreateMatchingPollForDecisionTaskResponse(historyResponse *types.RecordDeci
return matchingResp
}

// MinInt64 returns the smaller of two given int64
func MinInt64(a, b int64) int64 {
	if b < a {
		return b
	}
	return a
}

// MaxInt64 returns the greater of two given int64
func MaxInt64(a, b int64) int64 {
	if b < a {
		return a
	}
	return b
}

// MinInt32 return smaller one of two inputs int32
func MinInt32(a, b int32) int32 {
	if b < a {
		return b
	}
	return a
}

// MinInt returns the smaller of two given integers
func MinInt(a, b int) int {
	if b < a {
		return b
	}
	return a
}

// MaxInt returns the greater one of two given integers
func MaxInt(a, b int) int {
	if b < a {
		return a
	}
	return b
}

// MinDuration returns the smaller of two given time duration
func MinDuration(a, b time.Duration) time.Duration {
	if b < a {
		return b
	}
	return a
}

// MaxDuration returns the greater of two given time durations
func MaxDuration(a, b time.Duration) time.Duration {
	if b > a {
		return b
	}
	return a
}

// SortInt64Slice sorts the given int64 slice.
// Sort is not guaranteed to be stable.
func SortInt64Slice(slice []int64) {
	less := func(a int, b int) bool { return slice[a] < slice[b] }
	sort.Slice(slice, less)
}

// ValidateRetryPolicy validates a retry policy
func ValidateRetryPolicy(policy *types.RetryPolicy) error {
if policy == nil {
Expand Down Expand Up @@ -1027,7 +962,7 @@ func SecondsToDuration(d int64) time.Duration {
// SleepWithMinDuration sleeps for the minimum of desired and available duration
// returns the remaining available time duration
func SleepWithMinDuration(desired time.Duration, available time.Duration) time.Duration {
d := MinDuration(desired, available)
d := min(desired, available)
if d > 0 {
time.Sleep(d)
}
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ func (s *Service) Stop() {
// 4. Wait for a second
// 5. Stop everything forcefully and return

requestDrainTime := common.MinDuration(time.Second, s.config.ShutdownDrainDuration())
failureDetectionTime := common.MaxDuration(0, s.config.ShutdownDrainDuration()-requestDrainTime)
requestDrainTime := min(time.Second, s.config.ShutdownDrainDuration())
failureDetectionTime := max(0, s.config.ShutdownDrainDuration()-requestDrainTime)

s.GetLogger().Info("ShutdownHandler: Updating rpc health status to ShuttingDown")
s.handler.UpdateHealthStatus(api.HealthStatusShuttingDown)
Expand Down
2 changes: 1 addition & 1 deletion service/history/decision/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (v *attrValidator) validateActivityScheduleAttributes(

domainName, _ := v.domainCache.GetDomainName(domainID) // if this call returns an error, we will just used the default value for max timeout
maximumScheduleToStartTimeoutForRetryInSeconds := int32(v.config.ActivityMaxScheduleToStartTimeoutForRetry(domainName).Seconds())
scheduleToStartExpiration := common.MinInt32(expiration, maximumScheduleToStartTimeoutForRetryInSeconds)
scheduleToStartExpiration := min(expiration, maximumScheduleToStartTimeoutForRetryInSeconds)
if attributes.GetScheduleToStartTimeoutSeconds() < scheduleToStartExpiration {
attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(scheduleToStartExpiration)
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/engine/engineimpl/poll_mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (e *historyEngineImpl) longPollForEventID(
return nil, context.DeadlineExceeded
}
// longPollCompletionBuffer is here to leave some room to finish current request without its timeout.
expirationInterval = common.MinDuration(
expirationInterval = min(
expirationInterval,
remainingTime-longPollCompletionBuffer,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,8 @@ func (e *historyEngineImpl) overrideStartWorkflowExecutionRequest(
maxDecisionStartToCloseTimeoutSeconds := int32(e.config.MaxDecisionStartToCloseSeconds(domainName))

taskStartToCloseTimeoutSecs := request.GetTaskStartToCloseTimeoutSeconds()
taskStartToCloseTimeoutSecs = common.MinInt32(taskStartToCloseTimeoutSecs, maxDecisionStartToCloseTimeoutSeconds)
taskStartToCloseTimeoutSecs = common.MinInt32(taskStartToCloseTimeoutSecs, request.GetExecutionStartToCloseTimeoutSeconds())
taskStartToCloseTimeoutSecs = min(taskStartToCloseTimeoutSecs, maxDecisionStartToCloseTimeoutSeconds)
taskStartToCloseTimeoutSecs = min(taskStartToCloseTimeoutSecs, request.GetExecutionStartToCloseTimeoutSeconds())

if taskStartToCloseTimeoutSecs != request.GetTaskStartToCloseTimeoutSeconds() {
request.TaskStartToCloseTimeoutSeconds = &taskStartToCloseTimeoutSecs
Expand Down
11 changes: 6 additions & 5 deletions service/history/execution/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package execution

import (
"fmt"
"slices"

checksumgen "github.com/uber/cadence/.gen/go/checksum"
"github.com/uber/cadence/common"
Expand Down Expand Up @@ -80,35 +81,35 @@ func newMutableStateChecksumPayload(ms MutableState) *checksumgen.MutableStateCh
for _, ti := range ms.GetPendingTimerInfos() {
pendingTimerIDs = append(pendingTimerIDs, ti.StartedID)
}
common.SortInt64Slice(pendingTimerIDs)
slices.Sort(pendingTimerIDs)
payload.PendingTimerStartedIDs = pendingTimerIDs

pendingActivityIDs := make([]int64, 0, len(ms.GetPendingActivityInfos()))
for id := range ms.GetPendingActivityInfos() {
pendingActivityIDs = append(pendingActivityIDs, id)
}
common.SortInt64Slice(pendingActivityIDs)
slices.Sort(pendingActivityIDs)
payload.PendingActivityScheduledIDs = pendingActivityIDs

pendingChildIDs := make([]int64, 0, len(ms.GetPendingChildExecutionInfos()))
for id := range ms.GetPendingChildExecutionInfos() {
pendingChildIDs = append(pendingChildIDs, id)
}
common.SortInt64Slice(pendingChildIDs)
slices.Sort(pendingChildIDs)
payload.PendingChildInitiatedIDs = pendingChildIDs

signalIDs := make([]int64, 0, len(ms.GetPendingSignalExternalInfos()))
for id := range ms.GetPendingSignalExternalInfos() {
signalIDs = append(signalIDs, id)
}
common.SortInt64Slice(signalIDs)
slices.Sort(signalIDs)
payload.PendingSignalInitiatedIDs = signalIDs

requestCancelIDs := make([]int64, 0, len(ms.GetPendingRequestCancelExternalInfos()))
for id := range ms.GetPendingRequestCancelExternalInfos() {
requestCancelIDs = append(requestCancelIDs, id)
}
common.SortInt64Slice(requestCancelIDs)
slices.Sort(requestCancelIDs)
payload.PendingReqCancelInitiatedIDs = requestCancelIDs
return payload
}
7 changes: 3 additions & 4 deletions service/history/execution/context_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package execution
import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand All @@ -35,15 +34,15 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo
if c.shard.GetConfig().EnableShardIDMetrics() {
shardID := c.shard.GetShardID()

blobSizeWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold()), int64(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName())))
blobSizeWarn := min(int64(c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold()), int64(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName())))
// check if blob size is larger than threshold in Dynamic config if so alert on it every time
if blobSize > blobSizeWarn {
c.logger.SampleInfo("Workflow writing a large blob", c.shard.GetConfig().SampleLoggingRate(), tag.WorkflowDomainName(c.GetDomainName()),
tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardID), metrics.DomainTag(c.GetDomainName())).IncCounter(metrics.LargeHistoryBlobCount)
}

historyCountWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()), int64(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())))
historyCountWarn := min(int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()), int64(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())))
// check if the new history count is greater than our threshold and only count/log it once when it passes it
// this seems to double count and I can't figure out why but should be ok to get a rough idea and identify bad actors
if oldHistoryCount < historyCountWarn && newHistoryCount >= historyCountWarn {
Expand All @@ -52,7 +51,7 @@ func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCo
c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardID), metrics.DomainTag(c.GetDomainName())).IncCounter(metrics.LargeHistoryEventCount)
}

historySizeWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()), int64(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())))
historySizeWarn := min(int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()), int64(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())))
// check if the new history size is greater than our threshold and only count/log it once when it passes it
if oldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn {
c.logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()),
Expand Down
2 changes: 1 addition & 1 deletion service/history/queue/queue_processor_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func validateProcessingQueueStates(pStates []*types.ProcessingQueueState, ackLev

minAckLevel := pStates[0].GetAckLevel()
for _, pState := range pStates {
minAckLevel = common.MinInt64(minAckLevel, pState.GetAckLevel())
minAckLevel = min(minAckLevel, pState.GetAckLevel())
}

switch ackLevel := ackLevel.(type) {
Expand Down
Loading
Loading