Skip to content

Commit

Permalink
Addressed Steve's comments about renaming and adjusting tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rajdchak committed Jan 24, 2025
1 parent 1ee9f11 commit ed310a0
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,31 @@
* under the License.
*/

package org.apache.hadoop.fs.s3a;
package org.apache.hadoop.fs.s3a.impl.streams;

import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

public class S3ASeekableInputStream extends ObjectInputStream implements StreamCapabilities {
public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {

private S3SeekableInputStream inputStream;
private long lastReadCurrentPos = 0;
private volatile boolean closed;

public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableInputStream.class);
public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);

public S3ASeekableInputStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
super(parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.hadoop.fs.s3a.impl.streams;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3ASeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
Expand All @@ -30,14 +29,14 @@

import static org.apache.hadoop.fs.s3a.Constants.*;

public class S3ASeekableInputStreamFactory extends AbstractObjectInputStreamFactory {
public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {

private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
private boolean requireCrt;

public S3ASeekableInputStreamFactory() {
super("S3ASeekableInputStreamFactory");
public AnalyticsStreamFactory() {
super("AnalyticsStreamFactory");
}

@Override
Expand All @@ -61,7 +60,7 @@ public void bind(final StreamFactoryCallbacks factoryCallbacks) throws Exception

@Override
public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
return new S3ASeekableInputStream(
return new AnalyticsStream(
parameters,
s3SeekableInputStreamFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public enum InputStreamType {
* The analytics input stream.
*/
Analytics("analytics", c ->
new S3ASeekableInputStreamFactory());
new AnalyticsStreamFactory());

/**
* Name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static ObjectInputStreamFactory createStreamFactory(final Configuration c
InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;

if(conf.getEnum(INPUT_STREAM_TYPE, InputStreamType.DEFAULT_STREAM_TYPE) == InputStreamType.Analytics) {
LOG.info("Using S3SeekableInputStream");
LOG.info("Using AnalyticsStream");
defaultStream = InputStreamType.Analytics;

} else if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

import org.assertj.core.api.Assertions;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
Expand Down Expand Up @@ -92,11 +94,13 @@ public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
S3SeekableInputStreamConfiguration configuration =
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);

assertSame("S3ASeekableInputStream configuration is not set to expected value",
PrefetchMode.ALL, configuration.getLogicalIOConfiguration().getPrefetchingMode());
Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode())
.as("AnalyticsStream configuration is not set to expected value")
.isSameAs(PrefetchMode.ALL);

assertEquals("S3ASeekableInputStream configuration is not set to expected value",
1, configuration.getPhysicalIOConfiguration().getBlobStoreCapacity());
Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity())
.as("AnalyticsStream configuration is not set to expected value")
.isEqualTo(1);
}

@Test
Expand All @@ -110,7 +114,7 @@ public void testConnectorFrameworkConfigurableWithCrtClient() throws IOException
}

@Test
public void testInvalidConfigurationThrows() {
public void testInvalidConfigurationThrows() throws Exception {
describe("Verify S3 connector framework throws with invalid configuration");

Configuration conf = getConfiguration();
Expand All @@ -121,8 +125,8 @@ public void testInvalidConfigurationThrows() {

ConnectorConfiguration connectorConfiguration =
new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
assertThrows("S3ASeekableInputStream illegal configuration does not throw",
IllegalArgumentException.class, () ->
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() ->
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void testCostOfCreatingMagicFile() throws Throwable {
describe("Files created under magic paths skip existence checks and marker deletes");

skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"S3ASeekableInputStream does not support InputStreamStatistics");
"AnalyticsStream does not support InputStreamStatistics");
S3AFileSystem fs = getFileSystem();
Path destFile = methodSubPath("file.txt");
fs.delete(destFile.getParent(), true);
Expand Down Expand Up @@ -250,7 +250,7 @@ public void testCostOfSavingLoadingPendingFile() throws Throwable {
describe("Verify costs of saving .pending file under a magic path");

skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"S3ASeekableInputStream does not support InputStreamStatistics");
"AnalyticsStream does not support InputStreamStatistics");
S3AFileSystem fs = getFileSystem();
Path partDir = methodSubPath("file.pending");
Path destFile = new Path(partDir, "file.pending");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void setup() throws Exception {
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
super.setup();
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"S3ASeekableInputStream does not support InputStreamStatistics");
"AnalyticsStream does not support InputStreamStatistics");
jobId = randomJobId();
attempt0 = "attempt_" + jobId + "_m_000000_0";
taskAttempt0 = TaskAttemptID.forName(attempt0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected String getCommitterName() {
public void setup() throws Exception {
super.setup();
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"S3ASeekableInputStream does not support InputStreamStatistics");
"AnalyticsStream does not support InputStreamStatistics");
CommitUtils.verifyIsMagicCommitFS(getFileSystem());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
*/
@Test
public void testBytesReadWithStream() throws IOException {
// Assertions will fail as {@link S3ASeekableInputStream}
// Assertions will fail as {@link AnalyticsStream}
// do not support S3AFileSystemStatistics yet.
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"S3SeekableStream does not support File System Statistics");
Expand Down

0 comments on commit ed310a0

Please sign in to comment.