diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/AwsChunkedInputStream.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/AwsChunkedInputStream.java new file mode 100644 index 00000000..c83b01c3 --- /dev/null +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/rest/AwsChunkedInputStream.java @@ -0,0 +1,255 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.s3.proxy.server.rest; + +import com.google.common.base.Splitter; +import io.trino.s3.proxy.server.signing.ChunkSigningSession; +import org.apache.commons.httpclient.util.EncodingUtil; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; +import static org.apache.commons.httpclient.HttpParser.parseHeaders; + +// based/copied on Apache Commons ChunkedInputStream +public class AwsChunkedInputStream + extends InputStream +{ + private final InputStream delegate; + private final Optional chunkSigningSession; + + private int chunkSize; + private int position; + private boolean latent = true; + private boolean eof; + private boolean closed; + + public AwsChunkedInputStream(InputStream delegate, Optional chunkSigningSession) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.chunkSigningSession = requireNonNull(chunkSigningSession, "chunkSigningSession is null"); + } + + public int read() + throws IOException + { + checkState(!closed, "Stream is closed"); + + if (eof) { + return -1; + } + if (position >= chunkSize) { + nextChunk(); + if (eof) { + return -1; + } + } + position++; + int i = delegate.read(); + if (i >= 0) { + chunkSigningSession.ifPresent(session -> session.write((byte) (i & 0xff))); + } + return i; + } + + public int read(byte[] b, int off, int len) + throws IOException + { + checkState(!closed, "Stream is closed"); + + if (eof) { + return -1; + } + if (position >= chunkSize) { + nextChunk(); + if (eof) { + return -1; + } + } + + len = Math.min(len, chunkSize - position); + int count = delegate.read(b, off, len); + position += count; + + chunkSigningSession.ifPresent(session -> session.write(b, off, count)); + + return count; + } + + private void readCRLF() + throws IOException + { + int cr = delegate.read(); + int lf = delegate.read(); + if ((cr != '\r') || (lf != '\n')) { + throw new IOException("CRLF expected at end of chunk: " + cr + "/" + lf); + } + } + + private void nextChunk() + throws IOException + { + if (!latent) { + readCRLF(); + } + + ChunkMetadata metadata = chunkMetadata(delegate); + chunkSigningSession.ifPresent(session -> { + String chunkSignature = metadata.chunkSignature().orElseThrow(() -> new UncheckedIOException(new IOException("Chunk is missing a signature: " + metadata.dataString))); + session.startChunk(chunkSignature); + }); + + chunkSize = metadata.chunkSize; + latent = false; + position = 0; + if (chunkSize == 0) { + chunkSigningSession.ifPresent(ChunkSigningSession::complete); + eof = true; + parseHeaders(delegate, "UTF-8"); + } + } + + private record ChunkMetadata(String dataString, int chunkSize, Optional chunkSignature) + { + private ChunkMetadata + { + requireNonNull(dataString, "dataString is null"); + requireNonNull(chunkSignature, "chunkSignature is null"); + } + } + + private static ChunkMetadata chunkMetadata(InputStream in) + throws IOException + { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + // States: 0=normal, 1=\r was scanned, 2=inside quoted string, -1=end + int state = 0; + while (state != -1) { + int b = in.read(); + if (b == -1) { + throw new IOException("chunked stream ended unexpectedly"); + } + switch (state) { + case 0: + switch (b) { + case '\r': + state = 1; + break; + case '\"': + state = 2; + /* fall through */ + default: + outputStream.write(b); + } + break; + + case 1: + if (b == '\n') { + state = -1; + } + else { + // this was not CRLF + throw new IOException("Protocol violation: Unexpected single newline character in chunk size"); + } + break; + + case 2: + switch (b) { + case '\\': + b = in.read(); + outputStream.write(b); + break; + case '\"': + state = 0; + /* fall through */ + default: + outputStream.write(b); + } + break; + default: + throw new RuntimeException("assertion failed"); + } + } + + String dataString = EncodingUtil.getAsciiString(outputStream.toByteArray()); + + String chunkSizeString; + Optional chunkSignature; + + int separatorIndex = dataString.indexOf(';'); + if (separatorIndex > 0) { + chunkSizeString = dataString.substring(0, separatorIndex).trim(); + + if ((separatorIndex + 1) < dataString.length()) { + String remainder = dataString.substring(separatorIndex + 1).trim(); + chunkSignature = Splitter.on(';').trimResults().withKeyValueSeparator('=').split(remainder) + .entrySet() + .stream() + .filter(entry -> entry.getKey().equalsIgnoreCase("chunk-signature")) + .map(Map.Entry::getValue) + .findFirst(); + } + else { + chunkSignature = Optional.empty(); + } + } + else { + chunkSizeString = dataString.trim(); + chunkSignature = Optional.empty(); + } + + int chunkSize; + try { + chunkSize = Integer.parseInt(chunkSizeString, 16); + } + catch (NumberFormatException e) { + throw new IOException("Bad chunk size: " + chunkSizeString); + } + + return new ChunkMetadata(dataString, chunkSize, chunkSignature); + } + + public void close() + throws IOException + { + if (!closed) { + try { + if (!eof) { + exhaustInputStream(this); + } + } + finally { + eof = true; + closed = true; + } + } + } + + @SuppressWarnings("StatementWithEmptyBody") + private static void exhaustInputStream(InputStream inStream) + throws IOException + { + // read and discard the remainder of the message + byte[] buffer = new byte[8192]; + while (inStream.read(buffer) >= 0) { + // NOP + } + } +} diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/ChunkSigner.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/ChunkSigner.java new file mode 100644 index 00000000..6e298285 --- /dev/null +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/ChunkSigner.java @@ -0,0 +1,85 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.s3.proxy.server.signing; + +import com.google.common.hash.HashCode; +import io.trino.s3.proxy.server.credentials.Credential; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.signer.internal.AbstractAws4Signer; +import software.amazon.awssdk.auth.signer.internal.Aws4SignerRequestParams; +import software.amazon.awssdk.auth.signer.internal.SigningAlgorithm; +import software.amazon.awssdk.auth.signer.internal.chunkedencoding.AwsS3V4ChunkSigner; +import software.amazon.awssdk.utils.BinaryUtils; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; + +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; + +import static io.trino.s3.proxy.server.signing.Signer.signingKey; + +/** + * Extracted/copied from {@link AwsS3V4ChunkSigner} and sigv4-streaming. + * We don't want to use {@link AwsS3V4ChunkSigner} directly as it works with {@code byte[]} and doesn't allow for streaming. + */ +class ChunkSigner +{ + private static final String CHUNK_STRING_TO_SIGN_PREFIX = "AWS4-HMAC-SHA256-PAYLOAD"; + + private final String dateTime; + private final String keyPath; + private final Mac hmacSha256; + + ChunkSigner(Credential credential, InternalSigningContext signingContext) + { + this.dateTime = signingContext.dateTime(); + this.keyPath = signingContext.keyPath(); + + AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(credential.accessKey(), credential.secretKey()); + Aws4SignerRequestParams signerRequestParams = new Aws4SignerRequestParams(signingContext.signingParams()); + byte[] signingKey = signingKey(awsBasicCredentials, signerRequestParams); + + try { + String signingAlgorithm = SigningAlgorithm.HmacSHA256.toString(); + this.hmacSha256 = Mac.getInstance(signingAlgorithm); + hmacSha256.init(new SecretKeySpec(signingKey, signingAlgorithm)); + } + catch (NoSuchAlgorithmException e) { + throw new IllegalStateException(e); + } + catch (InvalidKeyException e) { + throw new IllegalArgumentException(e); + } + } + + String signChunk(HashCode hashCode, String previousSignature) + { + String chunkStringToSign = + CHUNK_STRING_TO_SIGN_PREFIX + "\n" + + dateTime + "\n" + + keyPath + "\n" + + previousSignature + "\n" + + AbstractAws4Signer.EMPTY_STRING_SHA256_HEX + "\n" + + hashCode.toString(); + try { + byte[] bytes = hmacSha256.doFinal(chunkStringToSign.getBytes(StandardCharsets.UTF_8)); + return BinaryUtils.toHex(bytes); + } + catch (Exception e) { + throw new RuntimeException("Could not sign chunk", e); + } + } +} diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/ChunkSigningSession.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/ChunkSigningSession.java new file mode 100644 index 00000000..62145975 --- /dev/null +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/ChunkSigningSession.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.s3.proxy.server.signing; + +public interface ChunkSigningSession +{ + void startChunk(String expectedSignature); + + void complete(); + + void write(byte b); + + void write(byte[] b, int off, int len); +} diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/ChunkSigningSessionImpl.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/ChunkSigningSessionImpl.java new file mode 100644 index 00000000..5bc89159 --- /dev/null +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/ChunkSigningSessionImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.s3.proxy.server.signing; + +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import io.airlift.log.Logger; +import jakarta.ws.rs.WebApplicationException; + +import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED; +import static java.util.Objects.requireNonNull; + +@SuppressWarnings("UnstableApiUsage") +class ChunkSigningSessionImpl + implements ChunkSigningSession +{ + private static final Logger log = Logger.get(ChunkSigningSessionImpl.class); + + private final ChunkSigner chunkSigner; + private String previousSignature; + private String expectedSignature; + private Hasher hasher; + + ChunkSigningSessionImpl(ChunkSigner chunkSigner, String seed) + { + this.chunkSigner = requireNonNull(chunkSigner, "chunkSigner is null"); + previousSignature = requireNonNull(seed, "seed is null"); + } + + @Override + public void startChunk(String expectedSignature) + { + complete(); + + hasher = Hashing.sha256().newHasher(); + this.expectedSignature = requireNonNull(expectedSignature, "expectedSignature is null"); + } + + @Override + public void complete() + { + if ((hasher == null) || (expectedSignature == null)) { + return; + } + + String thisSignature = chunkSigner.signChunk(hasher.hash(), previousSignature); + if (!thisSignature.equals(expectedSignature)) { + log.debug("Chunk signature does not match expected signature. Expected: %s, Actual: %s", expectedSignature, thisSignature); + throw new WebApplicationException(UNAUTHORIZED); + } + previousSignature = expectedSignature; + + hasher = null; + expectedSignature = null; + } + + @Override + public void write(byte b) + { + hasher.putByte(b); + } + + @Override + public void write(byte[] b, int off, int len) + { + hasher.putBytes(b, off, len); + } +} diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningContext.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningContext.java new file mode 100644 index 00000000..91048242 --- /dev/null +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/InternalSigningContext.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.s3.proxy.server.signing; + +import software.amazon.awssdk.auth.signer.params.AwsS3V4SignerParams; + +import static java.util.Objects.requireNonNull; + +class InternalSigningContext + implements SigningContext +{ + private final String authorization; + private final AwsS3V4SignerParams signingParams; + private final String signature; + private final String dateTime; + private final String keyPath; + + InternalSigningContext(String authorization, AwsS3V4SignerParams signingParams, String signature, String dateTime, String keyPath) + { + this.authorization = requireNonNull(authorization, "authorization is null"); + this.signingParams = requireNonNull(signingParams, "signingParams is null"); + this.signature = requireNonNull(signature, "signature is null"); + this.dateTime = requireNonNull(dateTime, "dateTime is null"); + this.keyPath = requireNonNull(keyPath, "keyPath is null"); + } + + @Override + public String authorization() + { + return authorization; + } + + AwsS3V4SignerParams signingParams() + { + return signingParams; + } + + String signature() + { + return signature; + } + + String dateTime() + { + return dateTime; + } + + String keyPath() + { + return keyPath; + } +} diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/Signer.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/Signer.java index e60ca632..27daae2e 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/Signer.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/Signer.java @@ -13,11 +13,14 @@ */ package io.trino.s3.proxy.server.signing; +import com.google.common.base.Splitter; import io.airlift.log.Logger; import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.MultivaluedMap; import jakarta.ws.rs.core.Response; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.signer.internal.Aws4SignerRequestParams; import software.amazon.awssdk.auth.signer.params.AwsS3V4SignerParams; import software.amazon.awssdk.http.SdkHttpFullRequest; import software.amazon.awssdk.http.SdkHttpMethod; @@ -31,7 +34,9 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Locale; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import static io.trino.s3.proxy.server.signing.Signers.OVERRIDE_CONTENT_HASH; import static io.trino.s3.proxy.server.signing.Signers.aws4Signer; @@ -47,6 +52,11 @@ final class Signer private Signer() {} + static byte[] signingKey(AwsCredentials credentials, Aws4SignerRequestParams signerRequestParams) + { + return aws4Signer.signingKey(credentials, signerRequestParams); + } + static SigningContext sign( String serviceName, URI requestURI, @@ -131,7 +141,25 @@ static SigningContext sign( return new WebApplicationException(Response.Status.BAD_REQUEST); }); - return () -> authorization; + return buildInternalSigningContext(authorization, signingParams, xAmzDate); + } + + private static InternalSigningContext buildInternalSigningContext(String authorization, AwsS3V4SignerParams signingParams, String xAmzDate) + { + Map parts = Splitter.on(',').trimResults().withKeyValueSeparator('=').split(authorization); + + String credential = Optional.ofNullable(parts.get("AWS4-HMAC-SHA256 Credential")).orElseThrow(() -> { + log.debug("Signer did not generate \"Credential\" in authorization"); + return new WebApplicationException(Response.Status.BAD_REQUEST); + }); + String keyPath = Splitter.on('/').splitToStream(credential).skip(1).collect(Collectors.joining("/")); + + String signature = Optional.ofNullable(parts.get("Signature")).orElseThrow(() -> { + log.debug("Signer did not generate \"Signature\" in authorization"); + return new WebApplicationException(Response.Status.BAD_REQUEST); + }); + + return new InternalSigningContext(authorization, signingParams, signature, xAmzDate, keyPath); } private static boolean isLegacy(SigningHeaders signingHeaders) diff --git a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/SigningController.java b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/SigningController.java index 934c2829..249c0b4d 100644 --- a/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/SigningController.java +++ b/trino-s3-proxy/src/main/java/io/trino/s3/proxy/server/signing/SigningController.java @@ -99,6 +99,20 @@ public SigningMetadata validateAndParseAuthorization(Request request, SigningSer }).orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); } + public ChunkSigningSession startChunkSigningSession( + SigningMetadata metadata, + Function credentialsSupplier) + { + Credential credential = credentialsSupplier.apply(metadata.credentials()); + SigningContext signingContext = metadata.signingContext().orElseThrow(() -> new IllegalArgumentException("Metadata does not contain a signing context")); + if (!(signingContext instanceof InternalSigningContext internalSigningContext)) { + throw new IllegalArgumentException("Singing context is an unexpected type: " + signingContext.getClass()); + } + + ChunkSigner chunkSigner = new ChunkSigner(credential, internalSigningContext); + return new ChunkSigningSessionImpl(chunkSigner, internalSigningContext.signature()); + } + private SigningContext internalSignRequest( SigningMetadata metadata, Function credentialsSupplier,