Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahabT committed Nov 26, 2024
1 parent 024b1ac commit 64b51d2
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 25 deletions.
4 changes: 3 additions & 1 deletion service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2126,7 +2126,9 @@ func (e *matchingEngineImpl) unloadTaskQueuePartitionByKey(
func (e *matchingEngineImpl) updatePhysicalTaskQueueGauge(pqm *physicalTaskQueueManagerImpl, delta int) {
// calculating versioned to be one of: “unversioned” or "buildId” or “versionSet”
versioned := "unversioned"
if buildID := pqm.queue.Version().BuildId(); buildID != "" {
if deployment := pqm.queue.Version().Deployment(); deployment != nil {

Check failure on line 2129 in service/matching/matching_engine.go

View workflow job for this annotation

GitHub Actions / lint

import-shadowing: The name 'deployment' shadows an import name (revive)
versioned = "deployment"
} else if buildID := pqm.queue.Version().BuildId(); buildID != "" {
versioned = "buildId"
} else if versionSet := pqm.queue.Version().VersionSet(); versionSet != "" {
versioned = "versionSet"
Expand Down
6 changes: 3 additions & 3 deletions service/matching/matching_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2970,7 +2970,7 @@ func (s *matchingEngineSuite) TestUpdatePhysicalTaskQueueGauge_VersionSet() {
// the size of the map to 1 and it's counter to 1.
s.PhysicalQueueMetricValidator(capture, 1, 1)

Vqtpm, err := tqm.(*taskQueuePartitionManagerImpl).getVersionedQueueNoWait(
vqtpm, err := tqm.(*taskQueuePartitionManagerImpl).getVersionedQueueNoWait(
versionSet,
"",
nil,
Expand All @@ -2981,7 +2981,7 @@ func (s *matchingEngineSuite) TestUpdatePhysicalTaskQueueGauge_VersionSet() {
// Creating a VersionedQueue results in increasing the size of the map to 2, due to 2 entries now,
// with it's counter to 1.
s.PhysicalQueueMetricValidator(capture, 2, 1)
s.matchingEngine.updatePhysicalTaskQueueGauge(Vqtpm.(*physicalTaskQueueManagerImpl), 1)
s.matchingEngine.updatePhysicalTaskQueueGauge(vqtpm.(*physicalTaskQueueManagerImpl), 1)
s.PhysicalQueueMetricValidator(capture, 3, 2)

// Validating if versioned has been set right for the specific parameters
Expand Down Expand Up @@ -3487,7 +3487,7 @@ type dbTaskQueueKey struct {
}

func getKey(dbq *PhysicalTaskQueueKey) dbTaskQueueKey {
return dbTaskQueueKey{dbq.partition.Key(), *dbq.Version()}
return dbTaskQueueKey{dbq.partition.Key(), dbq.Version()}
}

func newTestTaskManager(logger log.Logger) *testTaskManager {
Expand Down
22 changes: 11 additions & 11 deletions service/matching/physical_task_queue_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ func (q *PhysicalTaskQueueKey) PersistenceName() string {
}

if len(q.version.deploymentSeriesName) > 0 {
encodedBuildId := base64.URLEncoding.EncodeToString([]byte(q.version.buildId))
encodedDeploymentName := base64.URLEncoding.EncodeToString([]byte(q.version.deploymentSeriesName))
encodedBuildId := base64.RawURLEncoding.EncodeToString([]byte(q.version.buildId))
encodedDeploymentName := base64.RawURLEncoding.EncodeToString([]byte(q.version.deploymentSeriesName))
return nonRootPartitionPrefix + baseName + partitionDelimiter + encodedDeploymentName + deploymentNameDelimiter + encodedBuildId + buildIdDelimiter + strconv.Itoa(p.PartitionId())
} else if len(q.version.buildId) > 0 {
encodedBuildId := base64.URLEncoding.EncodeToString([]byte(q.version.buildId))
Expand Down Expand Up @@ -209,7 +209,7 @@ func parseSuffix(persistenceName string, suffix string) (partition int, versionS
if buildIdOff := strings.LastIndex(suffix, deploymentNameDelimiter); buildIdOff == 0 {
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
} else if buildIdOff > 0 {
deploymentNameBytes, err := base64.URLEncoding.DecodeString(suffix[:buildIdOff])
deploymentNameBytes, err := base64.RawURLEncoding.DecodeString(suffix[:buildIdOff])
if err != nil {
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
}
Expand All @@ -219,7 +219,7 @@ func parseSuffix(persistenceName string, suffix string) (partition int, versionS
if partitionOff := strings.LastIndex(suffix, buildIdDelimiter); partitionOff == 0 {
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
} else if partitionOff > 0 {
buildIdBytes, err := base64.URLEncoding.DecodeString(suffix[:partitionOff])
buildIdBytes, err := base64.RawURLEncoding.DecodeString(suffix[:partitionOff])
if err != nil {
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
}
Expand Down Expand Up @@ -255,15 +255,15 @@ func (q *PhysicalTaskQueueKey) IsVersioned() bool {

// Version returns a pointer to the physical queue version key. Caller must not manipulate the
// returned value.
func (q *PhysicalTaskQueueKey) Version() *PhysicalTaskQueueVersion {
return &q.version
func (q *PhysicalTaskQueueKey) Version() PhysicalTaskQueueVersion {
return q.version
}

func (v *PhysicalTaskQueueVersion) IsVersioned() bool {
func (v PhysicalTaskQueueVersion) IsVersioned() bool {
return v.versionSet != "" || v.buildId != ""
}

func (v *PhysicalTaskQueueVersion) Deployment() *deployment.Deployment {
func (v PhysicalTaskQueueVersion) Deployment() *deployment.Deployment {
if len(v.deploymentSeriesName) > 0 {
return &deployment.Deployment{
SeriesName: v.deploymentSeriesName,
Expand All @@ -274,19 +274,19 @@ func (v *PhysicalTaskQueueVersion) Deployment() *deployment.Deployment {
}

// BuildId returns empty if this is not a Versioning v2 queue.
func (v *PhysicalTaskQueueVersion) BuildId() string {
func (v PhysicalTaskQueueVersion) BuildId() string {
if len(v.deploymentSeriesName) > 0 {
return ""
}
return v.buildId
}

func (v *PhysicalTaskQueueVersion) VersionSet() string {
func (v PhysicalTaskQueueVersion) VersionSet() string {
return v.versionSet
}

// MetricsTagValue returns the build ID tag value for this version.
func (v *PhysicalTaskQueueVersion) MetricsTagValue() string {
func (v PhysicalTaskQueueVersion) MetricsTagValue() string {
if v.versionSet != "" {
return v.versionSet
} else if v.deploymentSeriesName == "" {
Expand Down
9 changes: 5 additions & 4 deletions service/matching/physical_task_queue_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ func TestValidPersistenceNames(t *testing.T) {
versionSet := "asdf89SD-lks_="
buildID := "build-ABC/adsf:98"
seriesName := "?deployment-ABC|ad/sf:98"
encodedBuildID := base64.URLEncoding.EncodeToString([]byte(buildID))
encodedSeriesName := base64.URLEncoding.EncodeToString([]byte(seriesName))
v2EncodedBuildID := base64.URLEncoding.EncodeToString([]byte(buildID))
encodedBuildID := base64.RawURLEncoding.EncodeToString([]byte(buildID))
encodedSeriesName := base64.RawURLEncoding.EncodeToString([]byte(seriesName))
deployment := &deploymentpb.Deployment{
SeriesName: seriesName,
BuildId: buildID,
Expand All @@ -124,8 +125,8 @@ func TestValidPersistenceNames(t *testing.T) {
{"/_sys//list0//41", "/list0/", 41, "", "", nil},
{"/_sys/list0/" + versionSet + ":1", "list0", 1, versionSet, "", nil},
{"/_sys//list0//" + versionSet + ":41", "/list0/", 41, versionSet, "", nil},
{"/_sys/list0/" + encodedBuildID + "#1", "list0", 1, "", buildID, nil},
{"/_sys//list0//" + encodedBuildID + "#41", "/list0/", 41, "", buildID, nil},
{"/_sys/list0/" + v2EncodedBuildID + "#1", "list0", 1, "", buildID, nil},
{"/_sys//list0//" + v2EncodedBuildID + "#41", "/list0/", 41, "", buildID, nil},
{"/_sys/list0/" + encodedSeriesName + "|" + encodedBuildID + "#1", "list0", 1, "", "", deployment},
{"/_sys//list0//" + encodedSeriesName + "|" + encodedBuildID + "#41", "/list0/", 41, "", "", deployment},
}
Expand Down
5 changes: 3 additions & 2 deletions service/matching/task_queue_partition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ func (pm *taskQueuePartitionManagerImpl) Describe(
// In the future, active will mean that the physical queue for that version has had a task added recently or a recent poller.
if includeAllActive {
for k := range pm.versionedQueues {
// TODO: add deployment info to DescribeTaskQueue
if b := k.BuildId(); b != "" {
buildIds[b] = true
}
Expand Down Expand Up @@ -622,13 +623,13 @@ func (pm *taskQueuePartitionManagerImpl) unloadPhysicalQueue(unloadedDbq physica
}

pm.versionedQueuesLock.Lock()
foundDbq, ok := pm.versionedQueues[*version]
foundDbq, ok := pm.versionedQueues[version]
if !ok || foundDbq != unloadedDbq {
pm.versionedQueuesLock.Unlock()
unloadedDbq.Stop(unloadCause)
return
}
delete(pm.versionedQueues, *version)
delete(pm.versionedQueues, version)
pm.versionedQueuesLock.Unlock()
unloadedDbq.Stop(unloadCause)
}
Expand Down
6 changes: 3 additions & 3 deletions service/matching/version_rule_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,17 +609,17 @@ func getSourcesForTarget(buildId string, redirectRules []*persistencespb.Redirec
return sources
}

// FindAssignmentBuildId finds a build ID for the given workflowId based on the given rules.
// FindAssignmentBuildId finds a build ID for the given runId based on the given rules.
// Non-empty runId is deterministically mapped to a ramp threshold, while empty runId is mapped randomly each time.
func FindAssignmentBuildId(rules []*persistencespb.AssignmentRule, workflowId string) string {
func FindAssignmentBuildId(rules []*persistencespb.AssignmentRule, runId string) string {
rampThreshold := -1.
for _, r := range rules {
if r.GetDeleteTimestamp() != nil {
continue
}
if !isFullyRamped(r.GetRule()) {
if rampThreshold == -1. {
rampThreshold = calcRampThreshold(workflowId)
rampThreshold = calcRampThreshold(runId)
}
if float64(r.GetRule().GetPercentageRamp().GetRampPercentage()) <= rampThreshold {
continue
Expand Down
1 change: 0 additions & 1 deletion tests/versioning_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// nolint:revive
package tests

import (
Expand Down

0 comments on commit 64b51d2

Please sign in to comment.