Skip to content

Commit

Permalink
Updated S3A integration to follow stream factory callbacks
Browse files Browse the repository at this point in the history
Dummy
  • Loading branch information
rajdchak committed Jan 24, 2025
1 parent 0dd1a7a commit 1ee9f11
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
// create and register the stream factory, which will
// then follow the service lifecycle
objectInputStreamFactory = createStreamFactory(conf);
if(conf.getEnum(INPUT_STREAM_TYPE, InputStreamType.DEFAULT_STREAM_TYPE) == InputStreamType.Analytics) {
final S3AsyncClient s3AsyncClient = getOrCreateAsyncCRTClient(conf);
objectInputStreamFactory = createStreamFactory(conf, s3AsyncClient);
} else {
objectInputStreamFactory = createStreamFactory(conf);
}
addService(objectInputStreamFactory);

// init all child services, including the stream factory
Expand All @@ -250,23 +244,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
finishStreamFactoryInit();
}



private S3AsyncClient getOrCreateAsyncCRTClient(final Configuration conf) throws Exception {
final S3AsyncClient s3AsyncClient;
boolean analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
LOG.info("Using S3SeekableInputStream");
if(analyticsAcceleratorCRTEnabled) {
LOG.info("Using S3 CRT client for analytics accelerator S3");
s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
} else {
LOG.info("Using S3 async client for analytics accelerator S3");
s3AsyncClient = getOrCreateAsyncClient();
}
return s3AsyncClient;
}

@Override
protected void serviceStart() throws Exception {
super.serviceStart();
Expand Down Expand Up @@ -969,7 +946,7 @@ public File createTemporaryFileForWriting(String pathStr,
* All stream factory initialization required after {@code Service.init()},
* after all other services have themselves been initialized.
*/
private void finishStreamFactoryInit() {
private void finishStreamFactoryInit() throws Exception {
// must be on be invoked during service initialization
Preconditions.checkState(isInState(STATE.INITED),
"Store is in wrong state: %s", getServiceState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected AbstractObjectInputStreamFactory(final String name) {
* @param factoryCallbacks callbacks needed by the factories.
*/
@Override
public void bind(final StreamFactoryCallbacks factoryCallbacks) {
public void bind(final StreamFactoryCallbacks factoryCallbacks) throws Exception {
// must be on be invoked during service initialization
Preconditions.checkState(isInState(STATE.INITED),
"Input Stream factory %s is in wrong state: %s",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hadoop.fs.s3a.impl.streams;

import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -44,14 +43,15 @@ public enum InputStreamType {
/**
* The analytics input stream.
*/
Analytics("analytics", (c, factoryParams) -> new S3ASeekableInputStreamFactory(factoryParams.getS3AsyncClient()));
Analytics("analytics", c ->
new S3ASeekableInputStreamFactory());

/**
* Name.
*/
private final String name;

private final BiFunction<Configuration, FactoryParams, ObjectInputStreamFactory> factory;
private final Function<Configuration, ObjectInputStreamFactory> factory;
/**
* String name.
* @return the name
Expand All @@ -60,11 +60,7 @@ public String getName() {
return name;
}

InputStreamType(String name, Function<Configuration, ObjectInputStreamFactory> factory) {
this(name, (c, s) -> factory.apply(c));
}

InputStreamType(String name, BiFunction<Configuration, FactoryParams, ObjectInputStreamFactory> factory) {
InputStreamType(String name, final Function<Configuration, ObjectInputStreamFactory> factory) {
this.name = name;
this.factory = factory;
}
Expand All @@ -73,7 +69,7 @@ public String getName() {
* Factory constructor.
* @return the factory associated with this stream type.
*/
public BiFunction<Configuration, FactoryParams, ObjectInputStreamFactory> factory() {
public Function<Configuration, ObjectInputStreamFactory> factory() {
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface ObjectInputStreamFactory
* and {@code start()}.
* @param callbacks extra initialization parameters
*/
void bind(StreamFactoryCallbacks callbacks);
void bind(StreamFactoryCallbacks callbacks) throws Exception;

/**
* Create a new input stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3ASeekableInputStream;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
Expand All @@ -33,25 +32,31 @@

public class S3ASeekableInputStreamFactory extends AbstractObjectInputStreamFactory {

private final S3AsyncClient s3AsyncClient;
private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
private boolean requireCrt;

public S3ASeekableInputStreamFactory(S3AsyncClient s3AsyncClient) {
public S3ASeekableInputStreamFactory() {
super("S3ASeekableInputStreamFactory");
this.s3AsyncClient = s3AsyncClient;
}

@Override
protected void serviceInit(final Configuration conf) throws Exception {
super.serviceInit(conf);
ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
this.seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.s3SeekableInputStreamFactory =
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(this.s3AsyncClient),
seekableInputStreamConfiguration);
this.requireCrt = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
}

@Override
public void bind(final StreamFactoryCallbacks factoryCallbacks) throws Exception {
super.bind(factoryCallbacks);
this.s3SeekableInputStreamFactory = new S3SeekableInputStreamFactory(
new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
seekableInputStreamConfiguration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a.impl.streams;

import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +38,8 @@ public final class StreamIntegration {
LoggerFactory.getLogger(
"org.apache.hadoop.conf.Configuration.deprecation");

public static final Logger LOG = LoggerFactory.getLogger(StreamIntegration.class);

/**
* Warn once on use of prefetch boolean flag rather than enum.
*/
Expand All @@ -54,7 +57,12 @@ public static ObjectInputStreamFactory createStreamFactory(final Configuration c
// work out the default stream; this includes looking at the
// deprecated prefetch enabled key to see if it is set.
InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;
if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {

if(conf.getEnum(INPUT_STREAM_TYPE, InputStreamType.DEFAULT_STREAM_TYPE) == InputStreamType.Analytics) {
LOG.info("Using S3SeekableInputStream");
defaultStream = InputStreamType.Analytics;

} else if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {

// prefetch enabled, warn (once) then change it to be the default.
WARN_PREFETCH_KEY.info("Using {} is deprecated: choose the appropriate stream in {}",
Expand All @@ -66,22 +74,7 @@ public static ObjectInputStreamFactory createStreamFactory(final Configuration c
// the default...then instantiate it.
return conf.getEnum(INPUT_STREAM_TYPE, defaultStream)
.factory()
.apply(conf, null);
.apply(conf);
}

/**
* Create the s3 seekable input stream factory.
* @param conf configuration
* @param s3AsyncClient s3 async client
* @return a stream factory.
*/
public static ObjectInputStreamFactory createStreamFactory(final Configuration conf, final S3AsyncClient s3AsyncClient) {
FactoryParams factoryParams = FactoryParams.builder()
.withS3AsyncClient(s3AsyncClient)
.build();
InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;
return conf.getEnum(INPUT_STREAM_TYPE, defaultStream)
.factory()
.apply(conf, factoryParams); }

}

0 comments on commit 1ee9f11

Please sign in to comment.