Skip to content

Commit

Permalink
kv/kvclient: set activeRangefeed NodeID and ReplicaID earlier
Browse files Browse the repository at this point in the history
Previously, we only set the NodeID and ReplicaID on receipt of the
first rangefeed message.

That meant that if a rangefeed was waiting on the catchup iterator
semaphore, we couldn't see which node or replica it was running on in
the crdb_internal.active_range_feeds table.

Here, we add these fields before making the rangefeed request.

Epic: none
Release note: None
  • Loading branch information
stevendanna committed Oct 11, 2024
1 parent 3005436 commit 67621a2
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
stream := &activeMuxRangeFeed{
// TODO(msbutler): It's sad that there's a bunch of repeat metadata.
// Deduplicate once old style rangefeed code is banished from the codebase.
activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics, parentRangefeedMetadata),
activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics, parentRangefeedMetadata, token.Desc().RangeID),
rSpan: rs,
startAfter: startAfter,
token: token,
Expand Down Expand Up @@ -287,6 +287,11 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
args.Replica = s.transport.NextReplica()
args.StreamID = streamID
s.ReplicaDescriptor = args.Replica

s.activeRangeFeed.Lock()
s.activeRangeFeed.NodeID = args.Replica.NodeID
s.activeRangeFeed.Unlock()

rpcClient, err := s.transport.NextInternalClient(ctx)
if err != nil {
log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ func newActiveRangeFeed(
rr *rangeFeedRegistry,
metrics *DistSenderRangeFeedMetrics,
parentMetadata parentRangeFeedMetadata,
initialRangeID roachpb.RangeID,
) *activeRangeFeed {
// Register partial range feed with registry.
active := &activeRangeFeed{
Expand All @@ -504,6 +505,7 @@ func newActiveRangeFeed(
StartAfter: startAfter,
ParentRangefeedMetadata: parentMetadata,
CreatedTime: timeutil.Now(),
RangeID: initialRangeID,
},
}

Expand Down

0 comments on commit 67621a2

Please sign in to comment.