Skip to content

Commit

Permalink
Merge #132458
Browse files Browse the repository at this point in the history
132458: kv/kvclient: set activeRangefeed NodeID and ReplicaID earlier r=dt a=stevendanna

Previously, we only set the NodeID and ReplicaID on receipt of the first rangefeed message.

That meant that if a rangefeed was waiting on the catchup iterator semaphore, we couldn't see which node or replica it was running on in the crdb_internal.active_range_feeds table.

Here, we add these fields before making the rangefeed request.

Epic: none
Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Oct 14, 2024
2 parents 12200c7 + 67621a2 commit b46fa6d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
stream := &activeMuxRangeFeed{
// TODO(msbutler): It's sad that there's a bunch of repeat metadata.
// Deduplicate once old style rangefeed code is banished from the codebase.
activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics, parentRangefeedMetadata),
activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics, parentRangefeedMetadata, token.Desc().RangeID),
rSpan: rs,
startAfter: startAfter,
token: token,
Expand Down Expand Up @@ -287,6 +287,11 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
args.Replica = s.transport.NextReplica()
args.StreamID = streamID
s.ReplicaDescriptor = args.Replica

s.activeRangeFeed.Lock()
s.activeRangeFeed.NodeID = args.Replica.NodeID
s.activeRangeFeed.Unlock()

rpcClient, err := s.transport.NextInternalClient(ctx)
if err != nil {
log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ func newActiveRangeFeed(
rr *rangeFeedRegistry,
metrics *DistSenderRangeFeedMetrics,
parentMetadata parentRangeFeedMetadata,
initialRangeID roachpb.RangeID,
) *activeRangeFeed {
// Register partial range feed with registry.
active := &activeRangeFeed{
Expand All @@ -504,6 +505,7 @@ func newActiveRangeFeed(
StartAfter: startAfter,
ParentRangefeedMetadata: parentMetadata,
CreatedTime: timeutil.Now(),
RangeID: initialRangeID,
},
}

Expand Down

0 comments on commit b46fa6d

Please sign in to comment.