Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Matching logic for pinned workflows #6853

Merged
merged 28 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1f267ea
Matching logic for pinned workflows
ShahabT Nov 18, 2024
ac169b6
Merge remote-tracking branch 'origin/versioning-3' into shahab/routin…
ShahabT Nov 20, 2024
1931ea2
Matching logic for pinned workflows
ShahabT Nov 21, 2024
c0ad47b
Merge remote-tracking branch 'origin/versioning-3' into shahab/routin…
ShahabT Nov 21, 2024
c96ac37
Update to latest proto
ShahabT Nov 21, 2024
6f5b252
Merge remote-tracking branch 'origin/versioning-3' into shahab/routin…
ShahabT Nov 22, 2024
bae3cce
fix compile errors
ShahabT Nov 22, 2024
bee9e0a
Versioning History <-> Matching protos
ShahabT Nov 22, 2024
214bd74
Merge branch 'shahab/v3-internal-protos' into shahab/routing-behaviors
ShahabT Nov 22, 2024
f3f5cbf
Merge remote-tracking branch 'origin/versioning-3' into shahab/routin…
ShahabT Nov 22, 2024
2cdc9cd
Pass versioning info when adding Matching tasks
ShahabT Nov 22, 2024
8350126
add to GetMutableStateResponse
ShahabT Nov 22, 2024
5559ddf
Merge branch 'shahab/v3-internal-protos' into shahab/routing-behaviors
ShahabT Nov 22, 2024
f3f96bf
Pass versioning info when adding Matching tasks
ShahabT Nov 22, 2024
5d7333e
Merge remote-tracking branch 'origin/versioning-3' into shahab/v3-int…
ShahabT Nov 22, 2024
7828cf6
Merge remote-tracking branch 'origin/shahab/v3-internal-protos' into …
ShahabT Nov 22, 2024
790ed22
Merge branch 'shahab/transfer-versioning-info' into shahab/routing-be…
ShahabT Nov 22, 2024
544e62f
Merge remote-tracking branch 'origin/versioning-3' into shahab/transf…
ShahabT Nov 23, 2024
3a2c03c
merge versioning-3
ShahabT Nov 23, 2024
6a0c932
Merge branch 'shahab/transfer-versioning-info' into shahab/routing-be…
ShahabT Nov 23, 2024
ac239d2
merge base
ShahabT Nov 23, 2024
a0dcc9c
Merge remote-tracking branch 'origin/versioning-3' into shahab/transf…
ShahabT Nov 25, 2024
8aad00a
Merge branch 'shahab/transfer-versioning-info' into shahab/routing-be…
ShahabT Nov 25, 2024
efca1f0
don't spool forwarded task
ShahabT Nov 25, 2024
f4195d7
Merge remote-tracking branch 'origin/versioning-3' into shahab/routin…
ShahabT Nov 25, 2024
024b1ac
Merge remote-tracking branch 'origin/versioning-3' into shahab/routin…
ShahabT Nov 26, 2024
64b51d2
address comments
ShahabT Nov 26, 2024
80d2e66
lint
ShahabT Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -222,3 +223,18 @@ func StampFromCapabilities(cap *commonpb.WorkerVersionCapabilities) *commonpb.Wo
func StampFromBuildId(buildId string) *commonpb.WorkerVersionStamp {
	// A stamp created from a build ID always has UseVersioning turned on.
	stamp := &commonpb.WorkerVersionStamp{
		UseVersioning: true,
		BuildId:       buildId,
	}
	return stamp
}

// ValidateDeployment checks that the given deployment can be used as a deployment identifier.
// It returns an InvalidArgument error if the deployment is nil, if its series name is empty, or
// if its build ID is empty (checked in that order). It returns nil otherwise.
func ValidateDeployment(deployment *deploymentpb.Deployment) error {
	switch {
	case deployment == nil:
		return serviceerror.NewInvalidArgument("deployment cannot be nil")
	case deployment.GetSeriesName() == "":
		return serviceerror.NewInvalidArgument("deployment series name cannot be empty")
	case deployment.GetBuildId() == "":
		return serviceerror.NewInvalidArgument("deployment build ID cannot be empty")
	default:
		return nil
	}
}
2 changes: 1 addition & 1 deletion service/matching/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (db *taskQueueDB) updateApproximateBacklogCount(
// Prevent under-counting
if db.approximateBacklogCount.Load()+delta < 0 {
db.logger.Info("ApproximateBacklogCounter could have under-counted.",
tag.WorkerBuildId(db.queue.BuildId()), tag.WorkflowNamespace(db.queue.Partition().NamespaceId()))
tag.WorkerBuildId(db.queue.Version().MetricsTagValue()), tag.WorkflowNamespace(db.queue.Partition().NamespaceId()))
db.approximateBacklogCount.Store(0)
} else {
db.approximateBacklogCount.Add(delta)
Expand Down
4 changes: 2 additions & 2 deletions service/matching/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ func (fwdr *Forwarder) getForwardInfo(task *internalTask) *taskqueuespb.TaskForw
forwardInfo := &taskqueuespb.TaskForwardInfo{
TaskSource: task.source,
SourcePartition: fwdr.partition.RpcName(),
DispatchBuildId: fwdr.queue.BuildId(),
DispatchVersionSet: fwdr.queue.VersionSet(),
DispatchBuildId: fwdr.queue.Version().BuildId(),
DispatchVersionSet: fwdr.queue.Version().VersionSet(),
RedirectInfo: task.redirectInfo,
}
return forwardInfo
Expand Down
8 changes: 4 additions & 4 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ func (e *matchingEngineImpl) AddWorkflowTask(

return pm.AddTask(ctx, addTaskParams{
taskInfo: taskInfo,
directive: addRequest.VersionDirective,
dnr marked this conversation as resolved.
Show resolved Hide resolved
forwardInfo: addRequest.ForwardInfo,
})
}
Expand Down Expand Up @@ -542,7 +541,6 @@ func (e *matchingEngineImpl) AddActivityTask(

return pm.AddTask(ctx, addTaskParams{
taskInfo: taskInfo,
directive: addRequest.VersionDirective,
forwardInfo: addRequest.ForwardInfo,
})
}
Expand Down Expand Up @@ -2128,9 +2126,11 @@ func (e *matchingEngineImpl) unloadTaskQueuePartitionByKey(
func (e *matchingEngineImpl) updatePhysicalTaskQueueGauge(pqm *physicalTaskQueueManagerImpl, delta int) {
// calculating versioned to be one of: "unversioned", "deployment", "buildId" or "versionSet"
versioned := "unversioned"
if buildID := pqm.queue.BuildId(); buildID != "" {
if dep := pqm.queue.Version().Deployment(); dep != nil {
versioned = "deployment"
} else if buildID := pqm.queue.Version().BuildId(); buildID != "" {
versioned = "buildId"
} else if versionSet := pqm.queue.VersionSet(); versionSet != "" {
} else if versionSet := pqm.queue.Version().VersionSet(); versionSet != "" {
versioned = "versionSet"
}

Expand Down
21 changes: 15 additions & 6 deletions service/matching/matching_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2970,13 +2970,18 @@ func (s *matchingEngineSuite) TestUpdatePhysicalTaskQueueGauge_VersionSet() {
// the size of the map to 1 and it's counter to 1.
s.PhysicalQueueMetricValidator(capture, 1, 1)

Vqtpm, err := tqm.(*taskQueuePartitionManagerImpl).getVersionedQueueNoWait(versionSet, "", true)
vqtpm, err := tqm.(*taskQueuePartitionManagerImpl).getVersionedQueueNoWait(
versionSet,
"",
nil,
true,
)
s.Require().NoError(err)

// Creating a VersionedQueue results in increasing the size of the map to 2, due to 2 entries now,
// with it's counter to 1.
s.PhysicalQueueMetricValidator(capture, 2, 1)
s.matchingEngine.updatePhysicalTaskQueueGauge(Vqtpm.(*physicalTaskQueueManagerImpl), 1)
s.matchingEngine.updatePhysicalTaskQueueGauge(vqtpm.(*physicalTaskQueueManagerImpl), 1)
s.PhysicalQueueMetricValidator(capture, 3, 2)

// Validating if versioned has been set right for the specific parameters
Expand Down Expand Up @@ -3007,7 +3012,12 @@ func (s *matchingEngineSuite) TestUpdatePhysicalTaskQueueGauge_BuildID() {
// the size of the map to 1 and it's counter to 1.
s.PhysicalQueueMetricValidator(capture, 1, 1)

Vqtpm, err := tqm.(*taskQueuePartitionManagerImpl).getVersionedQueueNoWait("", buildID, true)
Vqtpm, err := tqm.(*taskQueuePartitionManagerImpl).getVersionedQueueNoWait(
"",
buildID,
nil,
true,
)
s.Require().NoError(err)

// Creating a VersionedQueue results in increasing the size of the map to 2, due to 2 entries now,
Expand Down Expand Up @@ -3473,12 +3483,11 @@ type testTaskManager struct {

type dbTaskQueueKey struct {
partitionKey tqid.PartitionKey
versionSet string
buildId string
version PhysicalTaskQueueVersion
}

func getKey(dbq *PhysicalTaskQueueKey) dbTaskQueueKey {
return dbTaskQueueKey{dbq.partition.Key(), dbq.versionSet, dbq.buildId}
return dbTaskQueueKey{dbq.partition.Key(), dbq.Version()}
}

func newTestTaskManager(logger log.Logger) *testTaskManager {
Expand Down
153 changes: 120 additions & 33 deletions service/matching/physical_task_queue_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,18 @@ import (
"strconv"
"strings"

"go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/server/common/tqid"
)

const (
// nonRootPartitionPrefix is the prefix for all mangled task queue names.
nonRootPartitionPrefix = "/_sys/"
partitionDelimiter = "/"
versionSetDelimiter = ":"
buildIdDelimiter = "#"
nonRootPartitionPrefix = "/_sys/"
partitionDelimiter = "/"
versionSetDelimiter = ":"
buildIdDelimiter = "#"
deploymentNameDelimiter = "|"
Comment on lines +41 to +45
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really cool! Let's aim to draw something meaningful here. Like a Christmas tree.

dnr marked this conversation as resolved.
Show resolved Hide resolved
)

type (
Expand All @@ -51,10 +53,16 @@ type (
// Physical task queues with a version set or build ID are called "versioned". The ones without a version set
// or build ID are called "unversioned". A physical queue cannot have both version set and build ID.
PhysicalTaskQueueKey struct {
partition tqid.Partition
partition tqid.Partition
version PhysicalTaskQueueVersion
}

PhysicalTaskQueueVersion struct {
versionSet string // version set id
// BuildId and VersionSet are mutually exclusive
// buildId and versionSet are mutually exclusive. buildId must be set if deploymentSeriesName is.
buildId string
// When present, it means this is a V3 pinned queue.
deploymentSeriesName string
}
)

Expand All @@ -78,14 +86,6 @@ func (q *PhysicalTaskQueueKey) Partition() tqid.Partition {
return q.partition
}

func (q *PhysicalTaskQueueKey) BuildId() string {
return q.buildId
}

func (q *PhysicalTaskQueueKey) VersionSet() string {
return q.versionSet
}

// UnversionedQueueKey returns the unversioned PhysicalTaskQueueKey of a task queue partition
func UnversionedQueueKey(p tqid.Partition) *PhysicalTaskQueueKey {
return &PhysicalTaskQueueKey{
Expand All @@ -96,16 +96,31 @@ func UnversionedQueueKey(p tqid.Partition) *PhysicalTaskQueueKey {
// VersionSetQueueKey returns a PhysicalTaskQueueKey of a task queue partition with the given version set id.
func VersionSetQueueKey(p tqid.Partition, versionSet string) *PhysicalTaskQueueKey {
return &PhysicalTaskQueueKey{
partition: p,
versionSet: versionSet,
partition: p,
version: PhysicalTaskQueueVersion{
versionSet: versionSet,
},
}
}

// BuildIdQueueKey returns a PhysicalTaskQueueKey of a task queue partition with the given build ID.
func BuildIdQueueKey(p tqid.Partition, buildId string) *PhysicalTaskQueueKey {
return &PhysicalTaskQueueKey{
partition: p,
buildId: buildId,
version: PhysicalTaskQueueVersion{
buildId: buildId,
},
}
}

// DeploymentQueueKey builds the PhysicalTaskQueueKey of task queue partition p for the given
// deployment. The key's version carries the deployment's series name and build ID.
func DeploymentQueueKey(p tqid.Partition, deployment *deployment.Deployment) *PhysicalTaskQueueKey {
	version := PhysicalTaskQueueVersion{
		deploymentSeriesName: deployment.GetSeriesName(),
		buildId:              deployment.GetBuildId(),
	}
	return &PhysicalTaskQueueKey{partition: p, version: version}
}

Expand All @@ -119,6 +134,7 @@ func BuildIdQueueKey(p tqid.Partition, buildId string) *PhysicalTaskQueueKey {
//
// All versioned DB queues use mangled names, using the following format:
//
// with deployment: /_sys/<base name>/<deployment name base64 URL encoded>|<build ID base64 URL encoded>#<partition id>
// with build ID: /_sys/<base name>/<build ID base64 URL encoded>#<partition id>
// with version set: /_sys/<base name>/<version set id>:<partition id>
func (q *PhysicalTaskQueueKey) PersistenceName() string {
Expand All @@ -128,12 +144,16 @@ func (q *PhysicalTaskQueueKey) PersistenceName() string {
case *tqid.NormalPartition:
baseName := q.TaskQueueFamily().Name()

if len(q.versionSet) > 0 {
return nonRootPartitionPrefix + baseName + partitionDelimiter + q.versionSet + versionSetDelimiter + strconv.Itoa(p.PartitionId())
if len(q.version.versionSet) > 0 {
return nonRootPartitionPrefix + baseName + partitionDelimiter + q.version.versionSet + versionSetDelimiter + strconv.Itoa(p.PartitionId())
}

if len(q.buildId) > 0 {
encodedBuildId := base64.URLEncoding.EncodeToString([]byte(q.buildId))
if len(q.version.deploymentSeriesName) > 0 {
encodedBuildId := base64.RawURLEncoding.EncodeToString([]byte(q.version.buildId))
encodedDeploymentName := base64.RawURLEncoding.EncodeToString([]byte(q.version.deploymentSeriesName))
return nonRootPartitionPrefix + baseName + partitionDelimiter + encodedDeploymentName + deploymentNameDelimiter + encodedBuildId + buildIdDelimiter + strconv.Itoa(p.PartitionId())
} else if len(q.version.buildId) > 0 {
encodedBuildId := base64.URLEncoding.EncodeToString([]byte(q.version.buildId))
return nonRootPartitionPrefix + baseName + partitionDelimiter + encodedBuildId + buildIdDelimiter + strconv.Itoa(p.PartitionId())
}

Expand All @@ -154,6 +174,7 @@ func ParsePhysicalTaskQueueKey(persistenceName string, namespaceId string, taskT
partitionId := 0
versionSet := ""
buildId := ""
deploymentName := ""

if strings.HasPrefix(persistenceName, nonRootPartitionPrefix) {
suffixOff := strings.LastIndex(persistenceName, partitionDelimiter)
Expand All @@ -163,7 +184,7 @@ func ParsePhysicalTaskQueueKey(persistenceName string, namespaceId string, taskT
baseName = persistenceName[len(nonRootPartitionPrefix):suffixOff]
suffix := persistenceName[suffixOff+1:]
var err error
partitionId, versionSet, buildId, err = parseSuffix(persistenceName, suffix)
partitionId, versionSet, buildId, deploymentName, err = parseSuffix(persistenceName, suffix)
if err != nil {
return nil, err
}
Expand All @@ -174,36 +195,102 @@ func ParsePhysicalTaskQueueKey(persistenceName string, namespaceId string, taskT
return nil, err
}
return &PhysicalTaskQueueKey{
partition: f.TaskQueue(taskType).NormalPartition(partitionId),
versionSet: versionSet,
buildId: buildId,
partition: f.TaskQueue(taskType).NormalPartition(partitionId),
version: PhysicalTaskQueueVersion{
versionSet: versionSet,
buildId: buildId,
deploymentSeriesName: deploymentName,
},
}, nil
}

func parseSuffix(persistenceName string, suffix string) (partition int, versionSet string, buildId string, err error) {
if partitionOff := strings.LastIndex(suffix, buildIdDelimiter); partitionOff == 0 {
return 0, "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
//nolint:revive // cognitive complexity will be simpler once versioning 1-2 is cleaned up
func parseSuffix(persistenceName string, suffix string) (partition int, versionSet string, buildId string, deploymentName string, err error) {
if buildIdOff := strings.LastIndex(suffix, deploymentNameDelimiter); buildIdOff == 0 {
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
} else if buildIdOff > 0 {
deploymentNameBytes, err := base64.RawURLEncoding.DecodeString(suffix[:buildIdOff])
if err != nil {
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
}
deploymentName = string(deploymentNameBytes)

suffix = suffix[buildIdOff+1:]
if partitionOff := strings.LastIndex(suffix, buildIdDelimiter); partitionOff == 0 {
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
} else if partitionOff > 0 {
buildIdBytes, err := base64.RawURLEncoding.DecodeString(suffix[:partitionOff])
if err != nil {
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
}
buildId = string(buildIdBytes)
suffix = suffix[partitionOff+1:]
}
} else if partitionOff := strings.LastIndex(suffix, buildIdDelimiter); partitionOff == 0 {
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
} else if partitionOff > 0 {
buildIdBytes, err := base64.URLEncoding.DecodeString(suffix[:partitionOff])
if err != nil {
return 0, "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
}
buildId = string(buildIdBytes)
suffix = suffix[partitionOff+1:]
} else if partitionOff := strings.LastIndex(suffix, versionSetDelimiter); partitionOff == 0 {
return 0, "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
} else if partitionOff > 0 {
// pull out version set
versionSet, suffix = suffix[:partitionOff], suffix[partitionOff+1:]
}

partition, err = strconv.Atoi(suffix)
if err != nil || partition < 0 || (partition == 0 && len(versionSet) == 0 && len(buildId) == 0) {
return 0, "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
return 0, "", "", "", fmt.Errorf("%w: %s", ErrInvalidPersistenceName, persistenceName)
}
return partition, versionSet, buildId, err
return partition, versionSet, buildId, deploymentName, err
}

func (q *PhysicalTaskQueueKey) IsVersioned() bool {
return q.versionSet != "" || q.buildId != ""
return q.version.IsVersioned()
}

// Version returns the version part of this physical queue key. The version is returned by value
// (a copy), so modifying the returned value does not affect the key.
func (q *PhysicalTaskQueueKey) Version() PhysicalTaskQueueVersion {
return q.version
}

// IsVersioned reports whether this version has a version set or a build ID.
func (v PhysicalTaskQueueVersion) IsVersioned() bool {
	if v.versionSet != "" {
		return true
	}
	return v.buildId != ""
}

// Deployment returns the deployment (series name and build ID) of this version, or nil when the
// version has no deployment series name.
func (v PhysicalTaskQueueVersion) Deployment() *deployment.Deployment {
	if v.deploymentSeriesName == "" {
		return nil
	}
	return &deployment.Deployment{
		SeriesName: v.deploymentSeriesName,
		BuildId:    v.buildId,
	}
}

// BuildId returns the build ID of a Versioning v2 queue. When a deployment series name is set
// the stored build ID belongs to a deployment, so the empty string is returned instead.
func (v PhysicalTaskQueueVersion) BuildId() string {
	if v.deploymentSeriesName == "" {
		return v.buildId
	}
	return ""
}

// VersionSet returns the version set id of this version, or the empty string if none is set.
func (v PhysicalTaskQueueVersion) VersionSet() string {
return v.versionSet
}

// MetricsTagValue returns the value used for the build ID metrics tag of this version:
// the version set id if one is set, otherwise "<series name>/<build ID>" when a deployment
// series name is set, otherwise the plain build ID (which may be empty).
func (v PhysicalTaskQueueVersion) MetricsTagValue() string {
	switch {
	case v.versionSet != "":
		return v.versionSet
	case v.deploymentSeriesName != "":
		return v.deploymentSeriesName + "/" + v.buildId
	default:
		return v.buildId
	}
}
Loading
Loading