Skip to content

Commit

Permalink
Add primary store size to snapshot status result for shallow V2 snap…
Browse files Browse the repository at this point in the history
…shots

Signed-off-by: Lakshya Taragi <[email protected]>
  • Loading branch information
ltaragi committed Sep 2, 2024
1 parent cd03680 commit 39248a5
Show file tree
Hide file tree
Showing 24 changed files with 217 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private void assertShallowV2SnapshotStatus(SnapshotStatus snapshotStatus, boolea
if (hasIndexFilter) {
assertEquals(0, snapshotStatus.getStats().getTotalSize());
} else {
// TODO: after adding primary store size at the snapshot level, total size here should be > 0
assertTrue(snapshotStatus.getStats().getTotalSize() > 0L);
}
// assert that total and incremental values of file count and size_in_bytes are 0 at index and shard levels
assertEquals(0, snapshotStatus.getStats().getTotalFileCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.snapshots.status;

import org.opensearch.Version;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -86,6 +87,8 @@ public class SnapshotStatus implements ToXContentObject, Writeable {

private SnapshotStats stats;

private final long initialTotalSizeInBytes;

@Nullable
private final Boolean includeGlobalState;

Expand All @@ -96,7 +99,12 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
includeGlobalState = in.readOptionalBoolean();
final long startTime = in.readLong();
final long time = in.readLong();
updateShardStats(startTime, time);
if (in.getVersion().onOrAfter(Version.CURRENT)) {
initialTotalSizeInBytes = in.readOptionalLong();
} else {
initialTotalSizeInBytes = 0L;
}
updateShardStats(startTime, time, initialTotalSizeInBytes);
}

SnapshotStatus(
Expand All @@ -105,15 +113,18 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
List<SnapshotIndexShardStatus> shards,
Boolean includeGlobalState,
long startTime,
long time
long time,
long initialTotalSizeInBytes
) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.shards = Objects.requireNonNull(shards);
this.includeGlobalState = includeGlobalState;
shardsStats = new SnapshotShardsStats(shards);
assert time >= 0 : "time must be >= 0 but received [" + time + "]";
updateShardStats(startTime, time);
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
assert initialTotalSizeInBytes >= 0 : "initialTotalSizeInBytes must be >= 0 but received [" + initialTotalSizeInBytes + "]";
updateShardStats(startTime, time, initialTotalSizeInBytes);
}

private SnapshotStatus(
Expand All @@ -123,7 +134,8 @@ private SnapshotStatus(
Map<String, SnapshotIndexStatus> indicesStatus,
SnapshotShardsStats shardsStats,
SnapshotStats stats,
Boolean includeGlobalState
Boolean includeGlobalState,
long initialTotalSizeInBytes
) {
this.snapshot = snapshot;
this.state = state;
Expand All @@ -132,6 +144,7 @@ private SnapshotStatus(
this.shardsStats = shardsStats;
this.stats = stats;
this.includeGlobalState = includeGlobalState;
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
}

/**
Expand Down Expand Up @@ -204,6 +217,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBoolean(includeGlobalState);
out.writeLong(stats.getStartTime());
out.writeLong(stats.getTime());
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalLong(initialTotalSizeInBytes);
}
}

@Override
Expand All @@ -224,6 +240,7 @@ public SnapshotStats getStats() {
private static final String STATE = "state";
private static final String INDICES = "indices";
private static final String INCLUDE_GLOBAL_STATE = "include_global_state";
private static final String INITIAL_TOTAL_SIZE_IN_BYTES = "initial_total_size_in_bytes";

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand All @@ -235,6 +252,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (includeGlobalState != null) {
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
}
if (initialTotalSizeInBytes != 0) {
builder.field(INITIAL_TOTAL_SIZE_IN_BYTES, initialTotalSizeInBytes);
}
builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, params);
builder.field(SnapshotStats.Fields.STATS, stats, params);
builder.startObject(INDICES);
Expand All @@ -256,6 +276,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
String uuid = (String) parsedObjects[i++];
String rawState = (String) parsedObjects[i++];
Boolean includeGlobalState = (Boolean) parsedObjects[i++];
Long initialTotalSizeInBytes = (Long) parsedObjects[i++];
SnapshotStats stats = ((SnapshotStats) parsedObjects[i++]);
SnapshotShardsStats shardsStats = ((SnapshotShardsStats) parsedObjects[i++]);
@SuppressWarnings("unchecked")
Expand All @@ -276,7 +297,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
shards.addAll(index.getShards().values());
}
}
return new SnapshotStatus(snapshot, state, shards, indicesStatus, shardsStats, stats, includeGlobalState);
return new SnapshotStatus(
snapshot,
state,
shards,
indicesStatus,
shardsStats,
stats,
includeGlobalState,
initialTotalSizeInBytes
);
}
);
static {
Expand All @@ -285,6 +315,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
PARSER.declareString(constructorArg(), new ParseField(UUID));
PARSER.declareString(constructorArg(), new ParseField(STATE));
PARSER.declareBoolean(optionalConstructorArg(), new ParseField(INCLUDE_GLOBAL_STATE));
PARSER.declareLong(optionalConstructorArg(), new ParseField(INITIAL_TOTAL_SIZE_IN_BYTES));
PARSER.declareField(
constructorArg(),
SnapshotStats::fromXContent,
Expand All @@ -299,8 +330,8 @@ public static SnapshotStatus fromXContent(XContentParser parser) throws IOExcept
return PARSER.parse(parser, null);
}

private void updateShardStats(long startTime, long time) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
private void updateShardStats(long startTime, long time, long initialTotalSizeInBytes) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, initialTotalSizeInBytes, 0);
shardsStats = new SnapshotShardsStats(shards);
for (SnapshotIndexShardStatus shard : shards) {
// BWC: only update timestamps when we did not get a start time from an old node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ private void buildResponse(
Collections.unmodifiableList(shardStatusBuilder),
entry.includeGlobalState(),
entry.startTime(),
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L)
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L),
0L
)
);
}
Expand Down Expand Up @@ -344,7 +345,7 @@ private void loadRepositoryData(
boolean isShallowV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0;
long initialSnapshotTotalSize = 0;
if (isShallowV2Snapshot && request.indices().length == 0) {
// TODO: add primary store size in bytes at the snapshot level
initialSnapshotTotalSize = snapshotInfo.getSnapshotSizeInBytes();
}

for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
Expand Down Expand Up @@ -377,7 +378,8 @@ private void loadRepositoryData(
snapshotInfo.includeGlobalState(),
startTime,
// Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime,
initialSnapshotTotalSize
)
);
}
Expand Down
25 changes: 23 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;
final long primaryStoreSize;

private long avgTotalBytes;
private long avgFreeByte;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), 0L);
}

/**
Expand All @@ -84,6 +86,7 @@ protected ClusterInfo() {
* @param shardSizes a shardkey to size in bytes mapping per shard.
* @param routingToDataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param primaryStoreSize total size in bytes for all the primary shards
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(
Expand All @@ -92,14 +95,16 @@ public ClusterInfo(
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats
final Map<String, FileCacheStats> nodeFileCacheStats,
final long primaryStoreSize
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
this.primaryStoreSize = primaryStoreSize;
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

Expand All @@ -121,6 +126,11 @@ public ClusterInfo(StreamInput in) throws IOException {
} else {
this.nodeFileCacheStats = Map.of();
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
this.primaryStoreSize = in.readOptionalLong();
} else {
this.primaryStoreSize = 0L;
}

calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}
Expand Down Expand Up @@ -166,6 +176,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalLong(this.primaryStoreSize);
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -220,6 +233,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}
builder.endArray(); // end "reserved_sizes"
builder.field("primary_store_size", this.primaryStoreSize);
return builder;
}

Expand All @@ -246,6 +260,13 @@ public Map<String, FileCacheStats> getNodeFileCacheStats() {
return Collections.unmodifiableMap(this.nodeFileCacheStats);
}

/**
* Returns the total size in bytes for all the primary shards
*/
public long getPrimaryStoreSize() {
return primaryStoreSize;
}

/**
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
private volatile IndicesStatsSummary indicesStatsSummary;
// null if this node is not currently the cluster-manager

private volatile long primaryStoreSize;
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
private volatile boolean enabled;
private volatile TimeValue fetchTimeout;
Expand All @@ -127,6 +129,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
this.mostAvailableSpaceUsages = Map.of();
this.nodeFileCacheStats = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.primaryStoreSize = 0L;
this.threadPool = threadPool;
this.client = client;
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
Expand Down Expand Up @@ -213,7 +216,8 @@ public ClusterInfo getClusterInfo() {
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardRoutingToDataPath,
indicesStatsSummary.reservedSpace,
nodeFileCacheStats
nodeFileCacheStats,
primaryStoreSize
);
}

Expand Down Expand Up @@ -305,8 +309,13 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
final Map<String, Long> shardSizeByIdentifierBuilder = new HashMap<>();
final Map<ShardRouting, String> dataPathByShardRoutingBuilder = new HashMap<>();
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders = new HashMap<>();
buildShardLevelInfo(logger, stats, shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, reservedSpaceBuilders);

primaryStoreSize = buildShardLevelInfo(
logger,
stats,
shardSizeByIdentifierBuilder,
dataPathByShardRoutingBuilder,
reservedSpaceBuilders
);
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> rsrvdSpace = new HashMap<>();
reservedSpaceBuilders.forEach((nodeAndPath, builder) -> rsrvdSpace.put(nodeAndPath, builder.build()));

Expand Down Expand Up @@ -366,13 +375,14 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
listeners.add(clusterInfoConsumer);
}

static void buildShardLevelInfo(
static long buildShardLevelInfo(
Logger logger,
ShardStats[] stats,
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> newShardRoutingToDataPath,
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceByShard
) {
long currentPrimaryStoreSize = 0L;
for (ShardStats s : stats) {
final ShardRouting shardRouting = s.getShardRouting();
newShardRoutingToDataPath.put(shardRouting, s.getDataPath());
Expand All @@ -382,6 +392,9 @@ static void buildShardLevelInfo(
continue;
}
final long size = storeStats.sizeInBytes();
if (shardRouting.primary()) {
currentPrimaryStoreSize += size;
}
final long reserved = storeStats.getReservedSize().getBytes();

final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
Expand All @@ -396,6 +409,7 @@ static void buildShardLevelInfo(
reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
}
}
return currentPrimaryStoreSize;
}

static void fillDiskUsagePerNode(
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,7 @@ protected Node(
clusterModule.getIndexNameExpressionResolver(),
repositoryService,
transportService,
clusterInfoService,
actionModule.getActionFilters(),
remoteStorePinnedTimestampService
);
Expand Down
Loading

0 comments on commit 39248a5

Please sign in to comment.