Skip to content

Commit

Permalink
Add FileStore implementation for FileCopy
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Balani committed Jan 22, 2025
1 parent 5df7368 commit 3dd73ad
Show file tree
Hide file tree
Showing 6 changed files with 403 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.github.ambry.clustermap;

public class FileStoreException extends RuntimeException{

private static final long serialVersionUID = 1L;
private final FileStoreErrorCode error;

public FileStoreException(String s, FileStoreErrorCode error) {
super(s);
this.error = error;
}

public FileStoreException(String s, FileStoreErrorCode error, Throwable throwable) {
super(s, throwable);
this.error = error;
}

public enum FileStoreErrorCode{
FileStoreRunningFailure
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.github.ambry.config;

/**
 * Configuration parameters for the file copy (partition hydration) flow.
 * All values are read once from {@link VerifiableProperties} at construction
 * and are immutable thereafter.
 */
public class FileCopyConfig {

  // Default for the metadata file name, shared by the @Default annotation and the constructor.
  static final String DEFAULT_FILE_COPY_META_DATA_FILE_NAME = "sealed_logs_metadata_file";

  /**
   * Number of partitions that may be hydrated in parallel per disk.
   */
  public static final String PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK = "parallel.partition.hydration.count.per.disk";
  @Config(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK)
  @Default("1")
  public final int parallelPartitionHydrationCountPerDisk;

  /**
   * Number of threads performing file copy.
   */
  public static final String NUMBER_OF_FILE_COPY_THREADS = "number.of.file.copy.threads";
  @Config(NUMBER_OF_FILE_COPY_THREADS)
  @Default("4")
  public final int numberOfFileCopyThreads;

  /**
   * Timeout for fetching a single file chunk, in minutes.
   */
  public static final String FILE_CHUNK_TIMEOUT_IN_MINUTES = "file.chunk.timeout.in.minutes";
  @Config(FILE_CHUNK_TIMEOUT_IN_MINUTES)
  @Default("5")
  public final long fileChunkTimeoutInMins;

  /**
   * The frequency at which the data gets flushed to disk, in megabytes.
   */
  // NOTE(review): key uses inconsistent casing ("...interval.In.MBs"); kept as-is
  // because changing it would break any existing configs referencing this key.
  public static final String STORE_DATA_FLUSH_INTERVAL_IN_MBS = "store.data.flush.interval.In.MBs";
  @Config(STORE_DATA_FLUSH_INTERVAL_IN_MBS)
  @Default("1000")
  public final long storeDataFlushIntervalInMbs;

  /**
   * Name of the file in which file copy metadata is persisted.
   */
  public static final String FILE_COPY_META_DATA_FILE_NAME = "file.copy.meta.data.file.name";
  @Config(FILE_COPY_META_DATA_FILE_NAME)
  @Default(DEFAULT_FILE_COPY_META_DATA_FILE_NAME)
  public final String fileCopyMetaDataFileName;

  /**
   * Reads all file copy configuration values from the supplied properties,
   * applying the documented defaults for any missing keys.
   *
   * @param verifiableProperties the source of configuration values
   */
  public FileCopyConfig(VerifiableProperties verifiableProperties) {
    fileCopyMetaDataFileName =
        verifiableProperties.getString(FILE_COPY_META_DATA_FILE_NAME, DEFAULT_FILE_COPY_META_DATA_FILE_NAME);
    parallelPartitionHydrationCountPerDisk =
        verifiableProperties.getInt(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1);
    numberOfFileCopyThreads = verifiableProperties.getInt(NUMBER_OF_FILE_COPY_THREADS, 4);
    // Field is a long, so read it with getLong (was getInt, which silently
    // capped the configurable range at Integer.MAX_VALUE).
    fileChunkTimeoutInMins = verifiableProperties.getLong(FILE_CHUNK_TIMEOUT_IN_MINUTES, 5);
    storeDataFlushIntervalInMbs = verifiableProperties.getLong(STORE_DATA_FLUSH_INTERVAL_IN_MBS, 1000);
  }
}
27 changes: 27 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/FileInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@

package com.github.ambry.store;

public class FileInfo {
private String fileName;
private final long fileSize;

public FileInfo(String fileName, Long fileSize) {
this.fileName = fileName;
this.fileSize = fileSize;
}
public String getFileName() {
return fileName;
}

public Long getFileSize() {
return fileSize;
}

@Override
public String toString() {
return "FileInfo{" +
"fileName='" + fileName + '\'' +
", fileSize=" + fileSize +
'}';
}
}
49 changes: 49 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/LogInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.github.ambry.store;

import java.util.List;


/**
 * Groups the files that make up one sealed log segment for file copy:
 * the sealed segment itself, its index segment files, and its bloom filter
 * files. Mutable: each component can be replaced after construction.
 */
public class LogInfo {
  FileInfo sealedSegment;
  List<FileInfo> indexSegments;
  List<FileInfo> bloomFilters;

  /**
   * @param segment the sealed log segment file
   * @param indexes index segment files belonging to the segment
   * @param blooms bloom filter files belonging to the segment
   */
  public LogInfo(FileInfo segment, List<FileInfo> indexes, List<FileInfo> blooms) {
    sealedSegment = segment;
    indexSegments = indexes;
    bloomFilters = blooms;
  }

  /** @return the sealed log segment file */
  public FileInfo getSealedSegment() {
    return sealedSegment;
  }

  /** @param sealedSegments replacement sealed segment file */
  public void setSealedSegments(FileInfo sealedSegments) {
    sealedSegment = sealedSegments;
  }

  /** @return the index segment files */
  public List<FileInfo> getIndexSegments() {
    return indexSegments;
  }

  /** @param indexSegments replacement index segment files */
  public void setIndexSegments(List<FileInfo> indexSegments) {
    this.indexSegments = indexSegments;
  }

  /** @return the bloom filter files */
  public List<FileInfo> getBloomFilters() {
    return bloomFilters;
  }

  /** @param bloomFilters replacement bloom filter files */
  public void setBloomFilters(List<FileInfo> bloomFilters) {
    this.bloomFilters = bloomFilters;
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder("LogInfo{");
    sb.append("sealedSegment=").append(sealedSegment);
    sb.append(", indexSegments=").append(indexSegments);
    sb.append(", bloomFilters=").append(bloomFilters);
    sb.append('}');
    return sb.toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.ConnectionPoolConfig;
import com.github.ambry.config.DiskManagerConfig;
import com.github.ambry.config.FileCopyConfig;
import com.github.ambry.config.Http2ClientConfig;
import com.github.ambry.config.NettyConfig;
import com.github.ambry.config.NetworkConfig;
Expand Down Expand Up @@ -86,15 +87,21 @@
import com.github.ambry.rest.ServerSecurityServiceFactory;
import com.github.ambry.rest.StorageServerNettyFactory;
import com.github.ambry.server.storagestats.AggregatedAccountStorageStats;
import com.github.ambry.store.FileInfo;
import com.github.ambry.store.FileStore;
import com.github.ambry.store.LogInfo;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.StorageManager;
import com.github.ambry.store.StoreKeyConverterFactory;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.Time;
import com.github.ambry.utils.Utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -478,6 +485,32 @@ public void startup() throws InstantiationException {
long processingTime = SystemTime.getInstance().milliseconds() - startTime;
metrics.serverStartTimeInMs.update(processingTime);
logger.info("Server startup time in Ms {}", processingTime);

// Testing FileStore utils
FileCopyConfig fileCopyConfig = new FileCopyConfig(properties);
FileStore fileStore = new FileStore(fileCopyConfig);
fileStore.start();
List<LogInfo> logInfoList = Collections.singletonList(new LogInfo(new FileInfo("0_log", 20000L),
Collections.singletonList(new FileInfo("0_index", 100L)),
Collections.singletonList(new FileInfo("0_bloom", 50L))));
System.out.println("Persisting metadata" + logInfoList + " to file");
fileStore.persistMetaDataToFile("/tmp/0/", logInfoList);
System.out.println("Reading metadata" + fileStore.readMetaDataFromFile("/tmp/0/") + " from file");
String chunkPath = "/tmp/0/test_chunk";
try (FileInputStream inputStream = new FileInputStream(chunkPath)) {
System.out.println("Trying putChunkToFile for chunk at " + chunkPath);
fileStore.putChunkToFile("/tmp/0/0_log", inputStream);
} catch (IOException e) {
System.err.println("An error occurred: " + e.getMessage());
}

FileInputStream inputStream = fileStore.getStreamForFileRead("/tmp/0/", "0_log");
long fileSize = inputStream.available();
byte[] content = new byte[(int) fileSize]; // Read the content of the source file into a byte array
inputStream.read(content); // Read bytes into the array
System.out.println("Parsed log file contents read: " + new String(content));


} catch (Exception e) {
logger.error("Error during startup", e);
throw new InstantiationException("failure during startup " + e);
Expand Down
Loading

0 comments on commit 3dd73ad

Please sign in to comment.