software.amazon.awssdk.crt
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
index e9f5313273580..8862d06ba0502 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
@@ -75,6 +75,10 @@
import org.apache.hadoop.fs.s3a.UploadInfo;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
+import org.apache.hadoop.fs.s3a.impl.streams.StreamThreadOptions;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
@@ -82,6 +86,7 @@
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.functional.Tuples;
@@ -227,6 +232,9 @@ public class S3AStoreImpl
@Override
protected void serviceInit(final Configuration conf) throws Exception {
+ // create and register the stream factory, which will
+ // then follow the service lifecycle
+ objectInputStreamFactory = createStreamFactory(conf);
if(conf.getEnum(INPUT_STREAM_TYPE, InputStreamType.DEFAULT_STREAM_TYPE) == InputStreamType.Analytics) {
final S3AsyncClient s3AsyncClient = getOrCreateAsyncCRTClient(conf);
objectInputStreamFactory = createStreamFactory(conf, s3AsyncClient);
@@ -235,10 +243,15 @@ protected void serviceInit(final Configuration conf) throws Exception {
}
addService(objectInputStreamFactory);
- // init all child services
+ // init all child services, including the stream factory
super.serviceInit(conf);
+
+ // pass down extra information to the stream factory.
+ finishStreamFactoryInit();
}
+
+
private S3AsyncClient getOrCreateAsyncCRTClient(final Configuration conf) throws Exception {
final S3AsyncClient s3AsyncClient;
boolean analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
@@ -260,9 +273,8 @@ protected void serviceStart() throws Exception {
initLocalDirAllocator();
}
-
/**
- * Return the store capabilities.
+ * Return the store path capabilities.
* If the object stream factory is non-null, hands off the
* query to that factory if not handled here.
* @param path path to query the capability of.
@@ -279,6 +291,7 @@ public boolean hasPathCapability(final Path path, final String capability) {
}
}
+
/**
* Return the capabilities of input streams created
* through the store.
@@ -948,6 +961,25 @@ public File createTemporaryFileForWriting(String pathStr,
return File.createTempFile(prefix, null, dir);
}
+ /*
+ =============== BEGIN ObjectInputStreamFactory ===============
+ */
+
+ /**
+ * All stream factory initialization required after {@code Service.init()},
+ * after all other services have themselves been initialized.
+ */
+ private void finishStreamFactoryInit() {
+ // must only be invoked while the service is in the INITED state
+ Preconditions.checkState(isInState(STATE.INITED),
+ "Store is in wrong state: %s", getServiceState());
+ Preconditions.checkState(clientManager.isInState(STATE.INITED),
+ "Client Manager is in wrong state: %s", clientManager.getServiceState());
+
+ // finish initialization and pass down callbacks to self
+ objectInputStreamFactory.bind(new FactoryCallbacks());
+ }
+
@Override /* ObjectInputStreamFactory */
public ObjectInputStream readObject(ObjectReadParameters parameters)
throws IOException {
@@ -959,4 +991,32 @@ public ObjectInputStream readObject(ObjectReadParameters parameters)
public StreamThreadOptions threadRequirements() {
return objectInputStreamFactory.threadRequirements();
}
+
+ /**
+ * This operation is not implemented here: this class is the one
+ * which invokes it on the actual factory.
+ * @param callbacks factory callbacks.
+ * @throws UnsupportedOperationException always
+ */
+ @Override /* ObjectInputStreamFactory */
+ public void bind(final StreamFactoryCallbacks callbacks) {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ /**
+ * Callbacks from {@link ObjectInputStreamFactory} instances.
+ */
+ private class FactoryCallbacks implements StreamFactoryCallbacks {
+
+ @Override
+ public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
+ // requireCRT is currently ignored: CRT support is needed before it can be used
+ LOG.debug("Stream factory requested async client");
+ return clientManager().getOrCreateAsyncClient();
+ }
+ }
+
+ /*
+ =============== END ObjectInputStreamFactory ===============
+ */
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
index d3b6c67113997..7c20f7d66f61b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Preconditions;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
@@ -34,6 +35,33 @@ protected AbstractObjectInputStreamFactory(final String name) {
super(name);
}
+ /**
+ * Callbacks.
+ */
+ private StreamFactoryCallbacks callbacks;
+
+ /**
+ * Bind to the callbacks.
+ *
+ * The base class checks service state then stores
+ * the callback interface.
+ * @param factoryCallbacks callbacks needed by the factories.
+ */
+ @Override
+ public void bind(final StreamFactoryCallbacks factoryCallbacks) {
+ // must only be invoked while the service is in the INITED state
+ Preconditions.checkState(isInState(STATE.INITED),
+ "Input Stream factory %s is in wrong state: %s",
+ this, getServiceState());
+ this.callbacks = factoryCallbacks;
+ }
+
+ /**
+ * Return base capabilities of all stream factories,
+ * defined by what the base ObjectInputStream class does.
+ * @param capability string to query the stream support for.
+ * @return true if implemented
+ */
@Override
public boolean hasCapability(final String capability) {
switch (toLowerCase(capability)) {
@@ -45,4 +73,11 @@ public boolean hasCapability(final String capability) {
}
}
+ /**
+ * Get the factory callbacks.
+ * @return callbacks.
+ */
+ public StreamFactoryCallbacks callbacks() {
+ return callbacks;
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
index 1293bd4f92a06..d8fe87f9cf7fd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
@@ -20,6 +20,8 @@
import java.io.IOException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.service.Service;
@@ -38,6 +40,14 @@
public interface ObjectInputStreamFactory
extends Service, StreamCapabilities {
+ /**
+ * Set extra initialization parameters.
+ * This MUST ONLY be invoked between {@code init()}
+ * and {@code start()}.
+ * @param callbacks extra initialization parameters
+ */
+ void bind(StreamFactoryCallbacks callbacks);
+
/**
* Create a new input stream.
* There is no requirement to actually contact the store; this is generally done
@@ -55,5 +65,18 @@ ObjectInputStream readObject(ObjectReadParameters parameters)
*/
StreamThreadOptions threadRequirements();
+ /**
+ * Callbacks for stream factories.
+ */
+ interface StreamFactoryCallbacks {
+
+ /**
+ * Get the Async S3Client, raising a failure to create as an IOException.
+ * @param requireCRT is the CRT required.
+ * @return the Async S3 client
+ * @throws IOException failure to create the client.
+ */
+ S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException;
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
index 46dd3266cab4c..b922b2f7f3d3c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
@@ -50,12 +50,20 @@ public final class StreamIntegration {
*/
public static ObjectInputStreamFactory createStreamFactory(final Configuration conf) {
// choose the default input stream type
+
+ // work out the default stream; this includes looking at the
+ // deprecated prefetch enabled key to see if it is set.
InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;
if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
+
+ // prefetch enabled, warn (once) then change it to be the default.
WARN_PREFETCH_KEY.info("Using {} is deprecated: choose the appropriate stream in {}",
PREFETCH_ENABLED_KEY, INPUT_STREAM_TYPE);
defaultStream = InputStreamType.Prefetch;
}
+
+ // retrieve the enum value, returning the configured value or
+ // the default...then instantiate it.
return conf.getEnum(INPUT_STREAM_TYPE, defaultStream)
.factory()
.apply(conf, null);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/PrefetchingInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/PrefetchingInputStreamFactory.java
index b490294b7696d..4109580c4ce5d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/PrefetchingInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/PrefetchingInputStreamFactory.java
@@ -62,7 +62,7 @@ protected void serviceInit(final Configuration conf) throws Exception {
long prefetchBlockSizeLong =
longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, 1);
checkState(prefetchBlockSizeLong < Integer.MAX_VALUE,
- "S3A prefatch block size exceeds int limit");
+ "S3A prefetch block size exceeds int limit");
prefetchBlockSize = (int) prefetchBlockSizeLong;
prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
@@ -87,4 +87,5 @@ public ObjectInputStream readObject(final ObjectReadParameters parameters) throw
public StreamThreadOptions threadRequirements() {
return new StreamThreadOptions(prefetchBlockCount, 0, true, false);
}
+
}