diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index df52304b2a9c..c4ab3790cacd 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -214,7 +214,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed( stream := &activeMuxRangeFeed{ // TODO(msbutler): It's sad that there's a bunch of repeat metadata. // Deduplicate once old style rangefeed code is banished from the codebase. - activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics, parentRangefeedMetadata), + activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics, parentRangefeedMetadata, token.Desc().RangeID), rSpan: rs, startAfter: startAfter, token: token, @@ -287,6 +287,11 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error args.Replica = s.transport.NextReplica() args.StreamID = streamID s.ReplicaDescriptor = args.Replica + + s.activeRangeFeed.Lock() + s.activeRangeFeed.NodeID = args.Replica.NodeID + s.activeRangeFeed.Unlock() + rpcClient, err := s.transport.NextInternalClient(ctx) if err != nil { log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index b9cd84c8475f..b0ce2e7233d7 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -496,6 +496,7 @@ func newActiveRangeFeed( rr *rangeFeedRegistry, metrics *DistSenderRangeFeedMetrics, parentMetadata parentRangeFeedMetadata, + initialRangeID roachpb.RangeID, ) *activeRangeFeed { // Register partial range feed with registry. active := &activeRangeFeed{ @@ -504,6 +505,7 @@ func newActiveRangeFeed( StartAfter: startAfter, ParentRangefeedMetadata: parentMetadata, CreatedTime: timeutil.Now(), + RangeID: initialRangeID, }, }