Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FileCopy based bootstrap] GetMetadata Api #2991

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.github.ambry.clustermap.ClusterParticipant;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.store.LogInfo;
import com.github.ambry.store.Store;
import com.github.ambry.store.StoreException;
import java.io.IOException;
Expand Down Expand Up @@ -153,4 +154,12 @@ public interface StoreManager {
* @throws IOException
*/
boolean isFilesExistForPattern(PartitionId partitionId, Pattern pattern) throws IOException;

/**
* Get the list of log segment metadata files for a given partition.
* @param partitionId
* @param includeActiveLogSegment
* @return List of LogSegmentFiles along with its IndexFiles, BloomFilterFiles
*/
public List<LogInfo> getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment);
}
33 changes: 33 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/FileInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.store;


public class FileInfo {
private final String fileName;
private final long fileSize;

public FileInfo(String fileName, Long fileSize) {
this.fileName = fileName;
this.fileSize = fileSize;
}

public String getFileName() {
return fileName;
}

public Long getFileSize() {
return fileSize;
}
}
45 changes: 45 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/LogInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.store;

import java.util.Collections;
import java.util.List;


public class LogInfo {
private final FileInfo logSegment;
private final List<FileInfo> indexSegments;
private final List<FileInfo> bloomFilters;

public LogInfo(FileInfo logSegment, List<FileInfo> indexSegments, List<FileInfo> bloomFilters) {
this.logSegment = logSegment;
this.indexSegments = indexSegments;
this.bloomFilters = bloomFilters;
}

// TODO: Add isSealed prop
// private final boolean isSealed;

public FileInfo getLogSegment() {
return logSegment;
}

public List<FileInfo> getIndexSegments() {
return Collections.unmodifiableList(indexSegments);
}

public List<FileInfo> getBloomFilters() {
return Collections.unmodifiableList(bloomFilters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.server.StoreManager;
import com.github.ambry.store.LogInfo;
import com.github.ambry.store.Store;
import java.nio.file.FileStore;
import java.util.Collection;
Expand Down Expand Up @@ -151,6 +152,11 @@ public boolean isFilesExistForPattern(PartitionId partitionId, Pattern allLogSeg
throw new UnsupportedOperationException("Method not supported");
}

@Override
public List<LogInfo> getLogSegmentMetadataFiles(PartitionId partitionId, boolean includeActiveLogSegment) {
throw new UnsupportedOperationException("Method not supported");
}

@Override
public List<PartitionId> setBlobStoreStoppedState(List<PartitionId> partitionIds, boolean markStop) {
throw new UnsupportedOperationException("Method not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static class LocalChannelRequest implements NetworkRequest {
private long startTimeInMs;
private int processorId;

LocalChannelRequest(RequestInfo requestInfo, int processorId) {
public LocalChannelRequest(RequestInfo requestInfo, int processorId) {
DevenAhluwalia marked this conversation as resolved.
Show resolved Hide resolved
this.requestInfo = requestInfo;
this.processorId = processorId;
startTimeInMs = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.github.ambry.protocol.AdminResponse;
import com.github.ambry.protocol.DeleteRequest;
import com.github.ambry.protocol.DeleteResponse;
import com.github.ambry.protocol.FileCopyGetMetaDataRequest;
import com.github.ambry.protocol.FileCopyGetMetaDataResponse;
import com.github.ambry.protocol.GetRequest;
import com.github.ambry.protocol.GetResponse;
import com.github.ambry.protocol.PurgeRequest;
Expand Down Expand Up @@ -96,6 +98,9 @@ public RequestOrResponse getDecodedRequest(NetworkRequest networkRequest) throws
case PurgeRequest:
request = PurgeRequest.readFrom(dis, clusterMap);
break;
case FileCopyGetMetaDataRequest:
request = FileCopyGetMetaDataRequest.readFrom(dis, clusterMap);
break;
default:
throw new UnsupportedOperationException("Request type not supported");
}
Expand Down Expand Up @@ -153,6 +158,9 @@ public Response createErrorResponse(RequestOrResponse request, ServerErrorCode s
case PurgeRequest:
response = new PurgeResponse(request.getCorrelationId(), request.getClientId(), serverErrorCode);
break;
case FileCopyGetMetaDataRequest:
response = new FileCopyGetMetaDataResponse(request.getCorrelationId(), request.getClientId(), serverErrorCode);
break;
default:
throw new UnsupportedOperationException("Request type not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import javax.annotation.Nonnull;


public class FileCopyGetMetaDataRequest extends RequestOrResponse{
private PartitionId partitionId;
private String hostName;
private static final short File_Metadata_Request_Version_V1 = 1;
private final PartitionId partitionId;
private final String hostName;
public static final short File_Metadata_Request_Version_V1 = 1;
private static final int HostName_Field_Size_In_Bytes = 4;

public FileCopyGetMetaDataRequest(short versionId, int correlationId, String clientId,
PartitionId partitionId, String hostName) {
public FileCopyGetMetaDataRequest(
short versionId,
int correlationId,
String clientId,
@Nonnull PartitionId partitionId,
@Nonnull String hostName) {
super(RequestOrResponseType.FileCopyGetMetaDataRequest, versionId, correlationId, clientId);
if (partitionId == null) {
throw new IllegalArgumentException("Partition cannot be null");
}

if (hostName.isEmpty()){
throw new IllegalArgumentException("Host Name cannot be null");
}
Expand All @@ -47,8 +51,10 @@ public PartitionId getPartitionId() {
return partitionId;
}

protected static FileCopyGetMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException {
Short versionId = stream.readShort();
public static FileCopyGetMetaDataRequest readFrom(
@Nonnull DataInputStream stream,
@Nonnull ClusterMap clusterMap) throws IOException {
short versionId = stream.readShort();
validateVersion(versionId);
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
Expand All @@ -59,8 +65,11 @@ protected static FileCopyGetMetaDataRequest readFrom(DataInputStream stream, Clu

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName)
.append("]");
sb.append("FileCopyGetMetaDataRequest[")
.append("PartitionId=")
.append(partitionId.getId()).append(", HostName=")
.append(hostName)
.append("]");
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,33 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;


public class FileCopyGetMetaDataResponse extends Response {
private final int numberOfLogfiles;
private final List<LogInfo> logInfos;
private static final short File_Copy_Protocol_Metadata_Response_Version_V1 = 1;
public static final short File_Copy_Protocol_Metadata_Response_Version_V1 = 1;

static short CURRENT_VERSION = File_Copy_Protocol_Metadata_Response_Version_V1;

public FileCopyGetMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles,
List<LogInfo> logInfos, ServerErrorCode errorCode) {
super(RequestOrResponseType.FileCopyGetMetaDataResponse, versionId, correlationId, clientId, errorCode);

validateVersion(versionId);
this.numberOfLogfiles = numberOfLogfiles;
this.logInfos = logInfos;
}

public static FileCopyGetMetaDataResponse readFrom(DataInputStream stream) throws IOException {
public FileCopyGetMetaDataResponse(int correlationId, String clientId, ServerErrorCode serverErrorCode) {
this(CURRENT_VERSION, correlationId, clientId, 0, new ArrayList<>(), serverErrorCode);
}

public static FileCopyGetMetaDataResponse readFrom(
@Nonnull DataInputStream stream) throws IOException {
RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()];
if (type != RequestOrResponseType.FileCopyGetMetaDataResponse) {
throw new IllegalArgumentException("The type of request response is not compatible. Expected : {}, Actual : {}" +
Expand Down Expand Up @@ -72,8 +82,12 @@ public long sizeInBytes() {

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileMetaDataResponse[NumberOfLogfiles=").append(numberOfLogfiles).append(", logInfoList").append(
logInfos.toString()).append("]");
sb
.append("FileCopyGetMetaDataResponse[NumberOfLogfiles=")
.append(numberOfLogfiles)
.append(", logInfoList")
.append(logInfos.toString())
.append("]");
return sb.toString();
}

Expand All @@ -82,7 +96,7 @@ public int getNumberOfLogfiles() {
}

public List<LogInfo> getLogInfos() {
return logInfos;
return Collections.unmodifiableList(logInfos);
}

static void validateVersion(short version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,38 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import javax.annotation.Nonnull;


/**
* Contains the fileName and fileSizeInBytes for a local partition. This is used
* by LogInfo as part of filecopy metadata request.
*/
public class FileInfo {
private String fileName;
private long fileSizeInBytes;
private final String fileName;
private final long fileSizeInBytes;

private static final int FileName_Field_Size_In_Bytes = 4;

private static final int FileSize_Field_Size_In_Bytes = 8;


public FileInfo(String fileName, long fileSize) {
public FileInfo(
@Nonnull String fileName,
@Nonnull long fileSize) {
this.fileName = fileName;
this.fileSizeInBytes = fileSize;
}

public long sizeInBytes() {
return FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes;
}
public static FileInfo readFrom(DataInputStream stream) throws IOException {

public static FileInfo readFrom(@Nonnull DataInputStream stream) throws IOException {
String fileName = Utils.readIntString(stream);
long fileSize = stream.readLong();
return new FileInfo(fileName, fileSize);
}
public void writeTo(ByteBuf buf) {

public void writeTo(@Nonnull ByteBuf buf) {
Utils.serializeString(buf, fileName, Charset.defaultCharset());
buf.writeLong(fileSizeInBytes);
}
Expand Down
Loading
Loading