Skip to content

Commit

Permalink
Create initial S3 endpoint resource/handler
Browse files Browse the repository at this point in the history
Closes #8
  • Loading branch information
Randgalt committed May 21, 2024
1 parent 07d580f commit 367da6b
Show file tree
Hide file tree
Showing 15 changed files with 563 additions and 28 deletions.
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@ Proxy for S3
To run testing server:

```shell
./mvnw package
./mvnw -pl trino-s3-proxy exec:java -Dexec.classpathScope=test -Dexec.mainClass=io.trino.s3.proxy.server.TestingTrinoS3ProxyServer
./mvnw clean package
./mvnw -pl trino-s3-proxy exec:java -Dexec.classpathScope=test -Dexec.mainClass=io.trino.s3.proxy.server.TestingTrinoS3ProxyServer <fake access key> <fake secret key> <real S3 access key> <real S3 secret key>
```

Make note of the logged `Endpoint`

To test out proxied APIs:

First, set up AWS CLI:

```shell
> aws configure
AWS Access Key ID [None]: enter the fake access key
AWS Secret Access Key [None]: enter the fake secret key
Default region name [None]: <enter>
Default output format [None]: <enter>
```

Now, make calls. E.g.

```shell
aws --endpoint-url http://<endpoint>/api/v1/s3Proxy/s3 s3 ls s3://<s3bucket>/<dir>

aws --endpoint-url http://<endpoint>/api/v1/s3Proxy/s3 s3 cp s3://<s3bucket>/<dir>/<file> .
```
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

<dep.airlift.version>245</dep.airlift.version>
<dep.aws-sdk.version>2.25.32</dep.aws-sdk.version>
<dep.jersey.version>3.1.6</dep.jersey.version>
</properties>

<dependencyManagement>
Expand All @@ -66,6 +67,12 @@
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
<version>${dep.jersey.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
15 changes: 15 additions & 0 deletions trino-s3-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>event</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-client</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-server</artifactId>
Expand All @@ -59,11 +64,21 @@
<artifactId>node</artifactId>
</dependency>

<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,33 @@

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.trino.s3.proxy.server.credentials.SigningController;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyClient.ForProxyClient;
import io.trino.s3.proxy.server.rest.TrinoS3ProxyResource;

import static io.airlift.http.client.HttpClientBinder.httpClientBinder;
import static io.airlift.jaxrs.JaxrsBinder.jaxrsBinder;

public class TrinoS3ProxyServerModule
implements Module
{
@Override
public void configure(Binder binder)
public final void configure(Binder binder)
{
jaxrsBinder(binder).bind(TrinoS3ProxyResource.class);
binder.bind(SigningController.class).in(Scopes.SINGLETON);

// TODO config, etc.
httpClientBinder(binder).bindHttpClient("ProxyClient", ForProxyClient.class);
binder.bind(TrinoS3ProxyClient.class).in(Scopes.SINGLETON);

moduleSpecificBinding(binder);
}

protected void moduleSpecificBinding(Binder binder)
{
// default does nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@
public interface CredentialsController
{
Optional<Credentials> credentials(String emulatedAccessKey);

void upsertCredentials(Credentials credentials);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
*/
package io.trino.s3.proxy.server.credentials;

import com.google.common.base.Splitter;
import com.google.inject.Inject;
import jakarta.ws.rs.core.MultivaluedMap;

import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

Expand All @@ -31,17 +34,57 @@ public SigningController(CredentialsController credentialsController)
this.credentialsController = requireNonNull(credentialsController, "credentialsController is null");
}

public Optional<SigningMetadata> signingMetadataFromRequest(
Function<Credentials, Credentials.Credential> credentialsSupplier,
URI requestURI,
MultivaluedMap<String, String> requestHeaders,
MultivaluedMap<String, String> queryParameters,
String httpMethod,
String encodedPath)
{
String authorization = requestHeaders.getFirst("Authorization");
if (authorization == null) {
return Optional.empty();
}

List<String> authorizationParts = Splitter.on(",").trimResults().splitToList(authorization);
if (authorizationParts.isEmpty()) {
return Optional.empty();
}

String credential = authorizationParts.getFirst();
List<String> credentialParts = Splitter.on("=").splitToList(credential);
if (credentialParts.size() < 2) {
return Optional.empty();
}

String credentialValue = credentialParts.get(1);
List<String> credentialValueParts = Splitter.on("/").splitToList(credentialValue);
if (credentialValueParts.size() < 3) {
return Optional.empty();
}

String emulatedAccessKey = credentialValueParts.getFirst();
String region = credentialValueParts.get(2);

return credentialsController.credentials(emulatedAccessKey)
.map(credentials -> new SigningMetadata(credentials, region))
.filter(metadata -> {
String expectedAuthorization = signRequest(metadata, Credentials::emulated, requestURI, requestHeaders, queryParameters, httpMethod, encodedPath);
return authorization.equals(expectedAuthorization);
});
}

public String signRequest(
SigningMetadata metadata,
Function<Credentials, Credentials.Credential> credentialsSupplier,
URI requestURI,
MultivaluedMap<String, String> requestHeaders,
MultivaluedMap<String, String> queryParameters,
String httpMethod,
String encodedPath,
String region,
String accessKey)
String encodedPath)
{
// TODO
Credentials credentials = credentialsController.credentials(accessKey).orElseThrow();
Credentials.Credential credential = credentialsSupplier.apply(metadata.credentials());

return Signer.sign(
"s3",
Expand All @@ -50,9 +93,9 @@ public String signRequest(
queryParameters,
httpMethod,
encodedPath,
region,
accessKey,
credentials.emulated().secretKey(),
metadata.region(),
credential.accessKey(),
credential.secretKey(),
Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.s3.proxy.server.credentials;

import static java.util.Objects.requireNonNull;

public record SigningMetadata(Credentials credentials, String region)
{
public SigningMetadata
{
requireNonNull(credentials, "credentials is null");
requireNonNull(region, "region is null");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.s3.proxy.server.rest;

import com.google.common.io.ByteStreams;
import io.airlift.http.client.HeaderName;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.ResponseHandler;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.core.StreamingOutput;

import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.airlift.http.client.ResponseHandlerUtils.propagate;
import static java.util.Objects.requireNonNull;

class StreamingResponseHandler
implements ResponseHandler<Void, RuntimeException>
{
private final AsyncResponse asyncResponse;
private final Duration maxWaitForResponse;

StreamingResponseHandler(AsyncResponse asyncResponse, Duration maxWaitForResponse)
{
this.asyncResponse = requireNonNull(asyncResponse, "asyncResponse is null");
this.maxWaitForResponse = requireNonNull(maxWaitForResponse, "maxWaitForResponse is null");
}

@Override
public Void handleException(Request request, Exception exception)
throws RuntimeException
{
throw propagate(request, exception);
}

@Override
public Void handle(Request request, Response response)
throws RuntimeException
{
CountDownLatch latch = new CountDownLatch(1);
StreamingOutput streamingOutput = output -> {
try {
InputStream inputStream = response.getInputStream();
ByteStreams.copy(inputStream, output);
}
finally {
latch.countDown();
}
};

jakarta.ws.rs.core.Response.ResponseBuilder responseBuilder = jakarta.ws.rs.core.Response.status(response.getStatusCode()).entity(streamingOutput);
response.getHeaders()
.keySet()
.stream()
.map(HeaderName::toString)
.forEach(name -> response.getHeaders(name).forEach(value -> responseBuilder.header(name, value)));

try {
asyncResponse.resume(responseBuilder.build());
}
catch (Exception e) {
throw new WebApplicationException(e, jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR);
}
try {
if (!latch.await(maxWaitForResponse.toMillis(), TimeUnit.MILLISECONDS)) {
throw new WebApplicationException(jakarta.ws.rs.core.Response.Status.REQUEST_TIMEOUT);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new WebApplicationException(jakarta.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE);
}

return null;
}
}
Loading

0 comments on commit 367da6b

Please sign in to comment.