Skip to content

Commit

Permalink
Support for S3A Connector #14312 (#14474)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrajeshbabu authored Nov 21, 2024
1 parent 81f16f3 commit c5e00bb
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ public class S3PinotFS extends BasePinotFS {
private static final Logger LOGGER = LoggerFactory.getLogger(S3PinotFS.class);

private static final String DELIMITER = "/";
public static final String S3_SCHEME = "s3://";
public static final String S3_SCHEME = "s3";
public static final String S3A_SCHEME = "s3a";
public static final String SCHEME_SEPARATOR = "://";

private S3Client _s3Client;
private boolean _disableAcl;
private ServerSideEncryption _serverSideEncryption = null;
Expand Down Expand Up @@ -501,12 +504,14 @@ public long length(URI fileUri)
public String[] listFiles(URI fileUri, boolean recursive)
throws IOException {
ImmutableList.Builder<String> builder = ImmutableList.builder();
String scheme = fileUri.getScheme();
Preconditions.checkArgument(scheme.equals(S3_SCHEME) || scheme.equals(S3A_SCHEME));
visitFiles(fileUri, recursive, s3Object -> {
if (!s3Object.key().equals(fileUri.getPath()) && !s3Object.key().endsWith(DELIMITER)) {
builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object));
builder.add(scheme + SCHEME_SEPARATOR + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object));
}
}, commonPrefix -> {
builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(commonPrefix));
builder.add(scheme + SCHEME_SEPARATOR + fileUri.getHost() + DELIMITER + getNormalizedFileKey(commonPrefix));
});
String[] listedFiles = builder.build().toArray(new String[0]);
LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
Expand All @@ -517,17 +522,19 @@ public String[] listFiles(URI fileUri, boolean recursive)
public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
throws IOException {
ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
String scheme = fileUri.getScheme();
Preconditions.checkArgument(scheme.equals(S3_SCHEME) || scheme.equals(S3A_SCHEME));
visitFiles(fileUri, recursive, s3Object -> {
if (!s3Object.key().equals(fileUri.getPath())) {
FileMetadata.Builder fileBuilder = new FileMetadata.Builder().setFilePath(
S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object))
scheme + SCHEME_SEPARATOR + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object))
.setLastModifiedTime(s3Object.lastModified().toEpochMilli()).setLength(s3Object.size())
.setIsDirectory(s3Object.key().endsWith(DELIMITER));
listBuilder.add(fileBuilder.build());
}
}, commonPrefix -> {
FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
.setFilePath(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(commonPrefix))
.setFilePath(scheme + SCHEME_SEPARATOR + fileUri.getHost() + DELIMITER + getNormalizedFileKey(commonPrefix))
.setIsDirectory(true);
listBuilder.add(fileBuilder.build());
});
Expand Down
Loading

0 comments on commit c5e00bb

Please sign in to comment.