Skip to content

Commit

Permalink
Create initial S3 endpoint resource/handler
Browse files Browse the repository at this point in the history
Closes #8
  • Loading branch information
Randgalt committed May 23, 2024
1 parent b76c00d commit e029ef0
Show file tree
Hide file tree
Showing 14 changed files with 566 additions and 40 deletions.
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@ Proxy for S3
To run testing server:

```shell
./mvnw package
./mvnw -pl trino-s3-proxy exec:java -Dexec.classpathScope=test -Dexec.mainClass=io.trino.s3.proxy.server.TestingTrinoS3ProxyServer
./mvnw clean package
./mvnw -pl trino-s3-proxy exec:java -Dexec.classpathScope=test -Dexec.mainClass=io.trino.s3.proxy.server.TestingTrinoS3ProxyServer <fake access key> <fake secret key> <real S3 access key> <real S3 secret key>
```

Make note of the logged `Endpoint`

To test out proxied APIs:

First, set up AWS CLI:

```shell
> aws configure
AWS Access Key ID [None]: enter the fake access key
AWS Secret Access Key [None]: enter the fake secret key
Default region name [None]: <enter>
Default output format [None]: <enter>
```

Now, make calls. E.g.

```shell
aws --endpoint-url http://<endpoint>/api/v1/s3Proxy/s3 s3 ls s3://<s3bucket>/<dir>

aws --endpoint-url http://<endpoint>/api/v1/s3Proxy/s3 s3 cp s3://<s3bucket>/<dir>/<file> .
```
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
<dep.airlift.version>245</dep.airlift.version>
<dep.aws-sdk.version>2.25.32</dep.aws-sdk.version>
<dep.airlift-units.version>1.10</dep.airlift-units.version>
<dep.jersey.version>3.1.6</dep.jersey.version>
</properties>

<dependencyManagement>
Expand All @@ -73,6 +74,12 @@
<artifactId>units</artifactId>
<version>${dep.airlift-units.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
<version>${dep.jersey.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
15 changes: 15 additions & 0 deletions trino-s3-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>event</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-client</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-server</artifactId>
Expand Down Expand Up @@ -69,11 +74,21 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,36 @@

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.trino.s3.proxy.server.credentials.SigningController;
import io.trino.s3.proxy.server.credentials.SigningControllerConfig;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient.ForProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyResource;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.http.client.HttpClientBinder.httpClientBinder;
import static io.airlift.jaxrs.JaxrsBinder.jaxrsBinder;

public class TrinoS3ProxyServerModule
implements Module
{
@Override
public void configure(Binder binder)
public final void configure(Binder binder)
{
jaxrsBinder(binder).bind(TrinoS3ProxyResource.class);

configBinder(binder).bindConfig(SigningControllerConfig.class);
binder.bind(SigningController.class).in(Scopes.SINGLETON);

// TODO config, etc.
httpClientBinder(binder).bindHttpClient("ProxyClient", ForProxyClient.class);
binder.bind(TrinoS3ProxyClient.class).in(Scopes.SINGLETON);

moduleSpecificBinding(binder);
}

protected void moduleSpecificBinding(Binder binder)
{
// default does nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
*/
package io.trino.s3.proxy.server.credentials;

import com.google.common.base.Splitter;
import com.google.inject.Inject;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;

import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

Expand All @@ -36,19 +37,56 @@ public SigningController(CredentialsController credentialsController, SigningCon
maxClockDrift = signingControllerConfig.getMaxClockDrift().toJavaTime();
}

public String signRequest(
public Optional<SigningMetadata> signingMetadataFromRequest(
Function<Credentials, Credentials.Credential> credentialsSupplier,
URI requestURI,
MultivaluedMap<String, String> requestHeaders,
MultivaluedMap<String, String> queryParameters,
String httpMethod,
String encodedPath,
String region,
String accessKey)
String encodedPath)
{
String authorization = requestHeaders.getFirst("Authorization");
if (authorization == null) {
return Optional.empty();
}

List<String> authorizationParts = Splitter.on(",").trimResults().splitToList(authorization);
if (authorizationParts.isEmpty()) {
return Optional.empty();
}

String credential = authorizationParts.getFirst();
List<String> credentialParts = Splitter.on("=").splitToList(credential);
if (credentialParts.size() < 2) {
return Optional.empty();
}

String credentialValue = credentialParts.get(1);
List<String> credentialValueParts = Splitter.on("/").splitToList(credentialValue);
if (credentialValueParts.size() < 3) {
return Optional.empty();
}

String emulatedAccessKey = credentialValueParts.getFirst();
String region = credentialValueParts.get(2);

Optional<String> session = Optional.ofNullable(requestHeaders.getFirst("x-amz-security-token"));

Credentials credentials = credentialsController.credentials(accessKey, session)
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
return credentialsController.credentials(emulatedAccessKey, session)
.map(credentials -> new SigningMetadata(credentials, session, region))
.filter(metadata -> isValidAuthorization(metadata, credentialsSupplier, authorization, requestURI, requestHeaders, queryParameters, httpMethod, encodedPath));
}

public String signRequest(
SigningMetadata metadata,
Function<Credentials, Credentials.Credential> credentialsSupplier,
URI requestURI,
MultivaluedMap<String, String> requestHeaders,
MultivaluedMap<String, String> queryParameters,
String httpMethod,
String encodedPath)
{
Credentials.Credential credential = credentialsSupplier.apply(metadata.credentials());

return Signer.sign(
"s3",
Expand All @@ -57,10 +95,24 @@ public String signRequest(
queryParameters,
httpMethod,
encodedPath,
region,
accessKey,
credentials.emulated().secretKey(),
metadata.region(),
credential.accessKey(),
credential.secretKey(),
maxClockDrift,
Optional.empty());
}

private boolean isValidAuthorization(
SigningMetadata metadata,
Function<Credentials, Credentials.Credential> credentialsSupplier,
String authorizationHeader,
URI requestURI,
MultivaluedMap<String, String> requestHeaders,
MultivaluedMap<String, String> queryParameters,
String httpMethod,
String encodedPath)
{
String expectedAuthorization = signRequest(metadata, credentialsSupplier, requestURI, requestHeaders, queryParameters, httpMethod, encodedPath);
return authorizationHeader.equals(expectedAuthorization);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.s3.proxy.server.credentials;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

public record SigningMetadata(Credentials credentials, Optional<String> session, String region)
{
public SigningMetadata
{
requireNonNull(credentials, "credentials is null");
requireNonNull(session, "session is null");
requireNonNull(region, "region is null");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.s3.proxy.server.rest;

import com.google.common.io.ByteStreams;
import io.airlift.http.client.HeaderName;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.ResponseHandler;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.core.StreamingOutput;

import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.airlift.http.client.ResponseHandlerUtils.propagate;
import static java.util.Objects.requireNonNull;

class StreamingResponseHandler
implements ResponseHandler<Void, RuntimeException>
{
private final AsyncResponse asyncResponse;
private final Duration maxWaitForResponse;

StreamingResponseHandler(AsyncResponse asyncResponse, Duration maxWaitForResponse)
{
this.asyncResponse = requireNonNull(asyncResponse, "asyncResponse is null");
this.maxWaitForResponse = requireNonNull(maxWaitForResponse, "maxWaitForResponse is null");
}

@Override
public Void handleException(Request request, Exception exception)
throws RuntimeException
{
throw propagate(request, exception);
}

@Override
public Void handle(Request request, Response response)
throws RuntimeException
{
CountDownLatch latch = new CountDownLatch(1);
StreamingOutput streamingOutput = output -> {
try {
InputStream inputStream = response.getInputStream();
ByteStreams.copy(inputStream, output);
}
finally {
latch.countDown();
}
};

jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()).entity(streamingOutput);
response.getHeaders()
.keySet()
.stream()
.map(HeaderName::toString)
.forEach(name -> response.getHeaders(name).forEach(value -> responseBuilder.header(name, value)));

try {
asyncResponse.resume(responseBuilder.build());
}
catch (Exception e) {
throw new WebApplicationException(e, jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR);
}
try {
if (!latch.await(maxWaitForResponse.toMillis(), TimeUnit.MILLISECONDS)) {
throw new WebApplicationException(jakarta.ws.rs.core.Response.Status.REQUEST_TIMEOUT);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new WebApplicationException(jakarta.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE);
}

return null;
}
}
Loading

0 comments on commit e029ef0

Please sign in to comment.