Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor ProxyServer to use channels instead of sockets #184

Open
wants to merge 38 commits into
base: postgresql-dialect
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
cf0c568
chore: refactor ProxyServer to use channels instead of sockets
olavloite Jun 14, 2022
16eeecc
fix: delete socket file on exit
olavloite Jun 14, 2022
716cc11
test: run integration tests against native image
olavloite Jun 15, 2022
63e6c0c
test: fix Go tests + increase heap memory for native image
olavloite Jun 15, 2022
d3cb9c8
build: build native image for pull requests
olavloite Jun 15, 2022
0502835
build: build native image on mac
olavloite Jun 15, 2022
c3c8cf1
fix: use same instance for both PGAdapter and the tests
olavloite Jun 15, 2022
d07aaa2
fix: missing backslashes at the end of lines
olavloite Jun 15, 2022
acb8988
fix: buffer size must be at least 65k on MacOS
olavloite Jun 16, 2022
40eef3b
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jun 16, 2022
8639a9e
build: also run native image unit tests
olavloite Jun 17, 2022
41a5906
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jun 17, 2022
338bf7c
build: use Java 17 for integration tests to enable native image testing
olavloite Jun 17, 2022
2c64f88
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jun 18, 2022
1d1dec8
fix: add release version + parameterize junixsockets version
olavloite Jun 18, 2022
c68e11a
Merge branch 'refactor-to-channels' of github.com:GoogleCloudPlatform…
olavloite Jun 18, 2022
17d07eb
test: add more tests
olavloite Jun 18, 2022
a0aa5d6
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jun 23, 2022
b426d22
perf: try to increase buffer size
olavloite Jun 24, 2022
4b279b7
build: add automated ycsb run
olavloite Jun 25, 2022
7d9d1a6
build: automate ycsb runs
olavloite Jun 27, 2022
1c277b3
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jun 29, 2022
45385c4
build: update ycsb script
olavloite Jun 29, 2022
c1ac96b
fix: use 0 instead of NaN
olavloite Jun 29, 2022
3f91983
fix: remove $ from set command
olavloite Jun 29, 2022
3a47142
fix: actually use batch size
olavloite Jun 29, 2022
b9dced6
feat: add additional benchmark runs
olavloite Jun 29, 2022
13d4f01
fix: remove files before appending
olavloite Jun 29, 2022
cc9ad50
feat: add run command for native image
olavloite Jun 29, 2022
f53c692
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jun 29, 2022
9bdea78
test: fix assertion as connection can be closed by error
olavloite Jun 30, 2022
e8f1b3e
Merge branch 'refactor-to-channels' of github.com:GoogleCloudPlatform…
olavloite Jun 30, 2022
591e90d
build: move ycsb scripts to separate directory
olavloite Jun 30, 2022
ff7701c
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jun 30, 2022
f2eeaa4
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jun 30, 2022
09e2da4
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jul 4, 2022
6ca83a4
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Jul 16, 2022
b389979
Merge branch 'postgresql-dialect' into refactor-to-channels
olavloite Sep 5, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ jobs:
uses: actions/setup-java@v3
with:
distribution: zulu
java-version: 8
java-version: 17
- run: java -version
- name: Setup Go
uses: actions/setup-go@v3
Expand All @@ -100,6 +100,8 @@ jobs:
psql -h /pg -U postgres -c "CREATE DATABASE pgadapter"
- name: Run unit tests
run: mvn test -B -Ptest-all
- name: Run unit tests for native image
run: mvn test -B -Pnative-image
- name: Setup GCloud
uses: google-github-actions/setup-gcloud@v0
with:
Expand Down
65 changes: 65 additions & 0 deletions .github/workflows/native-image.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
on:
# This allows manual activation of this action for testing.
workflow_dispatch:
pull_request:
schedule:
- cron: '0 2 * * 1,2,3,4,5'
name: native-image
env:
GOOGLE_CLOUD_PROJECT: "span-cloud-testing"
GOOGLE_CLOUD_INSTANCE: "pgadapter-testing"
GOOGLE_CLOUD_DATABASE: "testdb_integration"
GOOGLE_CLOUD_ENDPOINT: "spanner.googleapis.com"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it fine to to put these in public repo?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The endpoint is public information. It's for example also included in the generated client for each programming language.
The project, instance and database names are also included in many other repositories.

jobs:
check-env:
outputs:
has-key: ${{ steps.project-id.outputs.defined }}
runs-on: ubuntu-latest
steps:
- id: project-id
env:
GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
if: "${{ env.GCP_PROJECT_ID != '' }}"
run: echo "::set-output name=defined::true"

build-native-image:
needs: [check-env]
if: needs.check-env.outputs.has-key == 'true'
timeout-minutes: 60
runs-on: macos-latest
steps:
- uses: actions/checkout@v2
- uses: graalvm/setup-graalvm@v1
with:
version: 'latest'
java-version: '17'
components: 'native-image'
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup GCloud
uses: google-github-actions/setup-gcloud@v0
with:
project_id: ${{ secrets.GCP_PROJECT_ID }}
service_account_key: ${{ secrets.JSON_SERVICE_ACCOUNT_CREDENTIALS }}
export_default_credentials: true
- name: Set up swap space
if: runner.os == 'Linux'
uses: pierotofy/[email protected]
with:
swap-size-gb: 12
- name: Build and run PGAdapter native image
run: |
mvn package -Passembly -Pnative-image -DskipTests
cd "target/pgadapter"
native-image -J-Xmx14g -H:IncludeResources=".*metadata.*json$" -jar pgadapter.jar --no-fallback
./pgadapter -p ${{env.GOOGLE_CLOUD_PROJECT}} -i ${{env.GOOGLE_CLOUD_INSTANCE}} &
- name: Run integration tests
run: |
mvn verify \
-Dclirr.skip=true \
-DskipITs=false \
-DPG_ADAPTER_PROJECT="${{env.GOOGLE_CLOUD_PROJECT}}" \
-DPG_ADAPTER_INSTANCE="${{env.GOOGLE_CLOUD_INSTANCE}}" \
-DPG_ADAPTER_DATABASE="${{env.GOOGLE_CLOUD_DATABASE}}" \
-DPG_ADAPTER_ADDRESS="localhost" \
-DPG_ADAPTER_SOCKET_DIR="/tmp" \
-DPG_ADAPTER_PORT="5432"
54 changes: 54 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,51 @@
</plugins>
</build>
</profile>
<profile>
<id>native-image</id>
<dependencies>
<!--
Change the scope of the junixsocket dependencies to test, as we don't need them in the
final build, but we do need them for some tests.
-->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
<artifactId>junixsocket-core</artifactId>
<version>${junixsocket.version}</version>
<type>pom</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
<artifactId>junixsocket-common</artifactId>
<version>${junixsocket.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes combine.self="override">
<exclude>com/google/cloud/spanner/pgadapter/Java8ServerSocketFactory.java</exclude>
</excludes>
<source>17</source>
<target>17</target>
<release>17</release>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<build>
Expand Down Expand Up @@ -321,6 +366,15 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>com/google/cloud/spanner/pgadapter/Java17ServerSocketFactory.java</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.InetSocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;
import java.text.MessageFormat;
import java.util.HashMap;
Expand Down Expand Up @@ -83,7 +86,15 @@ public class ConnectionHandler extends Thread {
private static final String CHANNEL_PROVIDER_PROPERTY = "CHANNEL_PROVIDER";

private final ProxyServer server;
private final Socket socket;
private final ByteChannel socketChannel;

/**
* Remote address of the client that is connected. This is null for Unix domain socket
* connections.
*/
@Nullable private final InetSocketAddress remoteAddress;

private final String remoteClient;
private final Map<String, IntermediatePreparedStatement> statementsMap = new HashMap<>();
private final Map<String, IntermediatePortalStatement> portalsMap = new HashMap<>();
private static final Map<Integer, IntermediateStatement> activeStatementsMap =
Expand All @@ -103,18 +114,30 @@ public class ConnectionHandler extends Thread {
private WellKnownClient wellKnownClient;
private ExtendedQueryProtocolHandler extendedQueryProtocolHandler;

ConnectionHandler(ProxyServer server, Socket socket) {
ConnectionHandler(ProxyServer server, ByteChannel byteChannel) throws IOException {
super("ConnectionHandler-" + CONNECTION_HANDLER_ID_GENERATOR.incrementAndGet());
this.server = server;
this.socket = socket;
this.socketChannel = byteChannel;
// byteChannel is an instance of SocketChannel in case of a TCP connection.
// Unix domain socket connections use other implementations. The exact implementation depends on
// whether the connection uses the third-party AFUNIXSocketFactory or the Java 16+ Unix domain
// socket implementation.
if (byteChannel instanceof SocketChannel
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the cases when byteChannel is not of SocketChannel type?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment to explain. SocketChannel is only used by TCP connections. Unix domain sockets use other implementations.

&& ((SocketChannel) byteChannel).getRemoteAddress() instanceof InetSocketAddress) {
SocketChannel socketChannel = (SocketChannel) byteChannel;
this.remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
this.remoteClient = remoteAddress.getAddress().getHostAddress();
} else {
this.remoteAddress = null;
this.remoteClient = "(local)";
}
this.secret = new SecureRandom().nextInt();
setDaemon(true);
logger.log(
Level.INFO,
() ->
String.format(
"Connection handler with ID %s created for client %s",
getName(), socket.getInetAddress().getHostAddress()));
"Connection handler with ID %s created for client %s", getName(), remoteClient));
}

@InternalApi
Expand Down Expand Up @@ -217,15 +240,16 @@ public void run() {
Level.INFO,
() ->
String.format(
"Connection handler with ID %s starting for client %s",
getName(), socket.getInetAddress().getHostAddress()));
"Connection handler with ID %s starting for client %s", getName(), remoteClient));

try (ConnectionMetadata connectionMetadata =
new ConnectionMetadata(this.socket.getInputStream(), this.socket.getOutputStream())) {
new ConnectionMetadata(
Channels.newInputStream(socketChannel), Channels.newOutputStream(socketChannel))) {
this.connectionMetadata = connectionMetadata;
if (!server.getOptions().disableLocalhostCheck()
&& !this.socket.getInetAddress().isAnyLocalAddress()
&& !this.socket.getInetAddress().isLoopbackAddress()) {
&& this.remoteAddress != null
&& !this.remoteAddress.getAddress().isAnyLocalAddress()
&& !this.remoteAddress.getAddress().isLoopbackAddress()) {
handleError(
PGException.newBuilder()
.setMessage("This proxy may only be accessed from localhost.")
Expand Down Expand Up @@ -268,15 +292,15 @@ public void run() {
() ->
String.format(
"Exception on connection handler with ID %s for client %s: %s",
getName(), socket.getInetAddress().getHostAddress(), e));
getName(), remoteClient, e));
} finally {
logger.log(
Level.INFO, () -> String.format("Closing connection handler with ID %s", getName()));
try {
if (this.spannerConnection != null) {
this.spannerConnection.close();
}
this.socket.close();
this.socketChannel.close();
} catch (SpannerException | IOException e) {
logger.log(
Level.WARNING,
Expand Down Expand Up @@ -332,9 +356,7 @@ public void handleTerminate() {
void terminate() {
handleTerminate();
try {
if (!socket.isClosed()) {
socket.close();
}
socketChannel.close();
} catch (IOException exception) {
logger.log(
Level.WARNING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.cloud.spanner.pgadapter;

import static java.net.StandardSocketOptions.SO_RCVBUF;

import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardProtocolFamily;
import java.net.UnixDomainSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;

// If your IDE is complaining about this file not being compatible with the language level
// of the project, then that is an indication that the IDE did not pick up the exclusion of this
// file that is defined in the pom.xml.
// This is a known issue in IntelliJ: https://youtrack.jetbrains.com/issue/IDEA-87868
//
// Follow these steps to make IntelliJ ignore this compilation error:
// 1. Go to Settings.
// 2. Select Build, Execution, Deployment| Compiler | Excludes
// 3. Add this file to the list of excludes.
// See also https://www.jetbrains.com/help/idea/specifying-compilation-settings.html#5a737cfc

class Java17ServerSocketFactory implements ServerSocketFactory {

public Java17ServerSocketFactory() {}

public ServerSocketChannel createTcpServerSocketChannel(OptionsMetadata options)
throws IOException {
ServerSocketChannel channel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(options.getProxyPort());
channel.configureBlocking(true);
channel.bind(address, options.getMaxBacklog());

return channel;
}

public ServerSocketChannel creatUnixDomainSocketChannel(OptionsMetadata options, int localPort)
throws IOException {
File socketFile = new File(options.getSocketFile(localPort));
Path path = Path.of(socketFile.toURI());
if (socketFile.getParentFile() != null && !socketFile.getParentFile().exists()) {
socketFile.mkdirs();
}

// deleteOnExit() does not work when PGAdapter is built as a native image. It also does not get
// deleted if the JVM crashes. We therefore need to try to delete the file at startup to ensure
// we don't get a lot of 'address already in use' errors.
try {
Files.deleteIfExists(path);
} catch (IOException ignore) {
// Ignore and let the Unix domain socket subsystem throw an 'Address in use' error.
}

UnixDomainSocketAddress address = UnixDomainSocketAddress.of(path);
ServerSocketChannel domainSocketChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX);
domainSocketChannel.configureBlocking(true);
domainSocketChannel.setOption(SO_RCVBUF, 65536);
domainSocketChannel.bind(address, options.getMaxBacklog());

return domainSocketChannel;
}
}
Loading