Skip to content

Commit

Permalink
HADOOP-19232: [ABFS][FNSOverBlob] Implementing Ingress Support with v…
Browse files Browse the repository at this point in the history
…arious Fallback Handling (#7272)

Contributed by Anmol Asrani
  • Loading branch information
anmolanmol1234 authored Jan 30, 2025
1 parent a592666 commit c7e1b66
Show file tree
Hide file tree
Showing 65 changed files with 6,429 additions and 376 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,20 @@ void setIsNamespaceEnabledAccount(String isNamespaceEnabledAccount) {
this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
}

/**
* Checks if the FixedSASTokenProvider is configured for the current account.
*
* @return true if the FixedSASTokenProvider is configured, false otherwise.
*/
public boolean isFixedSASTokenProviderConfigured() {
try {
return getSASTokenProvider() instanceof FixedSASTokenProvider;
} catch (AzureBlobFileSystemException e) {
LOG.debug("Failed to get SAS token provider", e);
return false;
}
}

private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
String value = getPasswordString(key);
if (StringUtils.isBlank(value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -122,11 +123,13 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_FIXED_TOKEN;
import static org.apache.hadoop.fs.azurebfs.constants.FSOperationType.CREATE_FILESYSTEM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.UNAUTHORIZED_SAS;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
Expand Down Expand Up @@ -234,6 +237,29 @@ public void initialize(URI uri, Configuration configuration)
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

/*
* Validates if the correct SAS Token provider is configured for non-HNS accounts.
* For non-HNS accounts, if the authentication type is set to SAS, only a fixed SAS Token is supported as of now.
* A custom SAS Token Provider should not be configured in such cases, as it will override the FixedSASTokenProvider and render it unused.
* If the namespace is not enabled and the FixedSASTokenProvider is not configured,
* an InvalidConfigurationValueException will be thrown.
*
* @throws InvalidConfigurationValueException if account is not namespace enabled and FixedSASTokenProvider is not configured.
*/
try {
if (abfsConfiguration.getAuthType(abfsConfiguration.getAccountName()) == AuthType.SAS && // Auth type is SAS
!tryGetIsNamespaceEnabled(new TracingContext(initFSTracingContext)) && // Account is FNS
!abfsConfiguration.isFixedSASTokenProviderConfigured()) { // Fixed SAS Token Provider is not configured
throw new InvalidConfigurationValueException(FS_AZURE_SAS_FIXED_TOKEN, UNAUTHORIZED_SAS);
}
} catch (InvalidConfigurationValueException ex) {
LOG.error("File system configured with Invalid SAS Token Provider for FNS Accounts", ex);
throw ex;
} catch (AzureBlobFileSystemException ex) {
LOG.error("Failed to determine account type for auth type validation", ex);
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

/*
* Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
* Fail initialization of filesystem if the configs are provided. CPK is of
Expand Down Expand Up @@ -266,6 +292,7 @@ public void initialize(URI uri, Configuration configuration)
}
}
}
getAbfsStore().updateClientWithNamespaceInfo(new TracingContext(initFSTracingContext));

LOG.trace("Initiate check for delegation token manager");
if (UserGroupInformation.isSecurityEnabled()) {
Expand Down Expand Up @@ -797,7 +824,7 @@ private FileStatus getFileStatus(final Path path,
Path qualifiedPath = makeQualified(path);

try {
return abfsStore.getFileStatus(qualifiedPath, tracingContext);
return getAbfsStore().getFileStatus(qualifiedPath, tracingContext);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ public AzureBlobFileSystemStore(
"abfs-bounded");
}

/**
* Updates the client with the namespace information.
*
* @param tracingContext the tracing context to be used for the operation
* @throws AzureBlobFileSystemException if an error occurs while updating the client
*/
public void updateClientWithNamespaceInfo(TracingContext tracingContext)
throws AzureBlobFileSystemException {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
AbfsClient.setIsNamespaceEnabled(isNamespaceEnabled);
}

/**
* Checks if the given key in Azure Storage should be stored as a page
* blob instead of block blob.
Expand Down Expand Up @@ -635,14 +647,15 @@ public OutputStream createFile(final Path path,
final FsPermission permission, final FsPermission umask,
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);
createClient.getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);

String relativePath = getRelativePath(path);
boolean isAppendBlob = false;
Expand All @@ -660,9 +673,9 @@ public OutputStream createFile(final Path path,
}

final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (createClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path));
createClient.getEncryptionContextProvider(), getRelativePath(path));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
}
Expand All @@ -677,7 +690,7 @@ public OutputStream createFile(final Path path,
);

} else {
op = getClient().createPath(relativePath, true,
op = createClient.createPath(relativePath, true,
overwrite,
new Permissions(isNamespaceEnabled, permission, umask),
isAppendBlob,
Expand All @@ -689,15 +702,16 @@ public OutputStream createFile(final Path path,
perfInfo.registerResult(op.getResult()).registerSuccess(true);

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);

String eTag = extractEtagHeader(op.getResult());
return new AbfsOutputStream(
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
0,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand All @@ -720,12 +734,12 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws IOException {
AbfsRestOperation op;

AbfsClient createClient = getClientHandler().getIngressClient();
try {
// Trigger a create with overwrite=false first so that eTag fetch can be
// avoided for cases when no pre-existing file is present (major portion
// of create file traffic falls into the case of no pre-existing file).
op = getClient().createPath(relativePath, true, false, permissions,
op = createClient.createPath(relativePath, true, false, permissions,
isAppendBlob, null, contextEncryptionAdapter, tracingContext);

} catch (AbfsRestOperationException e) {
Expand All @@ -745,12 +759,11 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
String eTag = extractEtagHeader(op.getResult());

try {
// overwrite only if eTag matches with the file properties fetched befpre
op = getClient().createPath(relativePath, true, true, permissions,
op = createClient.createPath(relativePath, true, true, permissions,
isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
Expand Down Expand Up @@ -778,22 +791,24 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
*
* @param isAppendBlob is Append blob support enabled?
* @param lease instance of AbfsLease for this AbfsOutputStream.
* @param client AbfsClient.
* @param clientHandler AbfsClientHandler.
* @param statistics FileSystem statistics.
* @param path Path for AbfsOutputStream.
* @param position Position or offset of the file being opened, set to 0
* when creating a new file, but needs to be set for APPEND
* calls on the same file.
* @param eTag eTag of the file.
* @param tracingContext instance of TracingContext for this AbfsOutputStream.
* @return AbfsOutputStreamContext instance with the desired parameters.
*/
private AbfsOutputStreamContext populateAbfsOutputStreamContext(
boolean isAppendBlob,
AbfsLease lease,
AbfsClient client,
AbfsClientHandler clientHandler,
FileSystem.Statistics statistics,
String path,
long position,
String eTag,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
Expand All @@ -814,24 +829,38 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withEncryptionAdapter(contextEncryptionAdapter)
.withBlockFactory(getBlockFactory())
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClient(client)
.withClientHandler(clientHandler)
.withPosition(position)
.withFsStatistics(statistics)
.withPath(path)
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.withIngressServiceType(abfsConfiguration.getIngressServiceType())
.withDFSToBlobFallbackEnabled(abfsConfiguration.isDfsToBlobFallbackEnabled())
.withETag(eTag)
.build();
}

/**
* Creates a directory.
*
* @param path Path of the directory to create.
* @param permission Permission of the directory.
* @param umask Umask of the directory.
* @param tracingContext tracing context
*
* @throws AzureBlobFileSystemException server error.
*/
public void createDirectory(final Path path, final FsPermission permission,
final FsPermission umask, TracingContext tracingContext)
throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
createClient.getFileSystem(),
path,
permission,
umask,
Expand All @@ -841,7 +870,7 @@ public void createDirectory(final Path path, final FsPermission permission,
!isNamespaceEnabled || abfsConfiguration.isEnabledMkdirOverwrite();
Permissions permissions = new Permissions(isNamespaceEnabled,
permission, umask);
final AbfsRestOperation op = getClient().createPath(getRelativePath(path),
final AbfsRestOperation op = createClient.createPath(getRelativePath(path),
false, overwrite, permissions, false, null, null, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
Expand Down Expand Up @@ -976,6 +1005,7 @@ public OutputStream openFileForWrite(final Path path,
overwrite);

String relativePath = getRelativePath(path);
AbfsClient writeClient = getClientHandler().getIngressClient();

final AbfsRestOperation op = getClient()
.getPathStatus(relativePath, false, tracingContext, null);
Expand All @@ -1000,8 +1030,9 @@ public OutputStream openFileForWrite(final Path path,
}

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
final String eTag = extractEtagHeader(op.getResult());
final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (writeClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
final String encryptionContext = op.getResult()
.getResponseHeader(
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
Expand All @@ -1010,7 +1041,7 @@ public OutputStream openFileForWrite(final Path path,
"File doesn't have encryptionContext.");
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path),
writeClient.getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
Expand All @@ -1020,10 +1051,11 @@ public OutputStream openFileForWrite(final Path path,
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
offset,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String LIST = "list";
public static final String BLOCK_BLOB_TYPE = "BlockBlob";
public static final String APPEND_BLOCK = "appendblock";

//Abfs Http Client Constants for Blob Endpoint APIs.

Expand Down Expand Up @@ -238,7 +240,7 @@ public static ApiVersion getCurrentVersion() {
public static final String PUT_BLOCK_LIST = "PutBlockList";

/**
* Value that differentiates categories of the http_status.
* Value that differentiates categories of the HTTP status.
* <pre>
* 100 - 199 : Informational responses
* 200 - 299 : Successful responses
Expand All @@ -249,6 +251,28 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;

/**
* XML version declaration for the block list.
*/
public static final String XML_VERSION = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>%n";

/**
* Start tag for the block list XML.
*/
public static final String BLOCK_LIST_START_TAG = "<BlockList>%n";

/**
* End tag for the block list XML.
*/
public static final String BLOCK_LIST_END_TAG = "</BlockList>%n";

/**
* Format string for the latest block in the block list XML.
* The placeholder will be replaced with the block identifier.
*/
public static final String LATEST_BLOCK_FORMAT = "<Latest>%s</Latest>%n";


/**
* List of configurations that are related to Customer-Provided-Keys.
* <ol>
Expand Down Expand Up @@ -289,6 +313,12 @@ public static ApiVersion getCurrentVersion() {
public static final String APACHE_IMPL = "Apache";
public static final String JDK_FALLBACK = "JDK_fallback";
public static final String KEEP_ALIVE_CACHE_CLOSED = "KeepAliveCache is closed";
public static final String DFS_FLUSH = "D";
public static final String DFS_APPEND = "D";
public static final String BLOB_FLUSH = "B";
public static final String BLOB_APPEND = "B";
public static final String FALLBACK_FLUSH = "FB";
public static final String FALLBACK_APPEND = "FB";

private AbfsHttpConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public final class FileSystemConfigurations {
*/
public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;

/**
* Length of the block ID used for appends.
*/
public static final int BLOCK_ID_LENGTH = 60;

/**
* Buffer blocks to disk.
* Capacity is limited to available disk space.
Expand Down
Loading

0 comments on commit c7e1b66

Please sign in to comment.