From 62f1f9cc0e1c016e7aee13ad0c66f4e05d6d8511 Mon Sep 17 00:00:00 2001 From: Daniel Sagenschneider Date: Mon, 27 Aug 2018 21:54:18 +0800 Subject: [PATCH 1/3] NetworkInputStream for reading direct from buffers --- .../sql2/communication/BEFrame.java | 62 --- .../sql2/communication/BEFrameParser.java | 128 +++--- .../sql2/communication/BEFrameReader.java | 85 ---- .../sql2/communication/NetworkConnection.java | 71 ++- .../communication/NetworkInputStream.java | 414 ++++++++++++++++++ .../communication/NetworkReadContext.java | 20 +- .../sql2/communication/TableCell.java | 14 +- .../network/AuthenticationResponse.java | 20 +- .../communication/network/BindResponse.java | 10 +- .../network/DescribeResponse.java | 16 +- .../network/ExecuteResponse.java | 18 +- .../network/NetworkConnectRequest.java | 16 +- .../communication/network/ParseResponse.java | 10 +- .../network/ReadyForQueryResponse.java | 10 +- .../packets/AuthenticationRequest.java | 37 +- .../packets/CommandComplete.java | 60 ++- .../sql2/communication/packets/DataRow.java | 54 +-- .../communication/packets/ErrorPacket.java | 49 ++- .../packets/ParameterStatus.java | 29 +- .../communication/packets/RowDescription.java | 39 +- .../packets/parsers/BinaryParser.java | 138 +++--- .../packets/parts/ColumnTypes.java | 24 +- .../sql2/communication/BEFrameReaderTest.java | 34 +- .../packets/RowDescriptionTest.java | 25 +- .../sql2/execution/NioLoopTest.java | 12 +- 25 files changed, 862 insertions(+), 533 deletions(-) delete mode 100644 src/main/java/org/postgresql/sql2/communication/BEFrame.java delete mode 100644 src/main/java/org/postgresql/sql2/communication/BEFrameReader.java create mode 100644 src/main/java/org/postgresql/sql2/communication/NetworkInputStream.java diff --git a/src/main/java/org/postgresql/sql2/communication/BEFrame.java b/src/main/java/org/postgresql/sql2/communication/BEFrame.java deleted file mode 100644 index 1e11c01..0000000 --- a/src/main/java/org/postgresql/sql2/communication/BEFrame.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.postgresql.sql2.communication; - -public class BEFrame { - public enum BackendTag { - AUTHENTICATION('R'), - CANCELLATION_KEY_DATA('K'), - BIND_COMPLETE('2'), - CLOSE_COMPLETE('3'), - COMMAND_COMPLETE('C'), - COPY_DATA('d'), - COPY_DONE('c'), - COPY_IN_RESPONSE('G'), - COPY_OUT_RESPONSE('H'), - COPY_BOTH_RESPONSE('W'), - DATA_ROW('D'), - EMPTY_QUERY_RESPONSE('I'), - ERROR_RESPONSE('E'), - FUNCTION_CALL_RESPONSE('V'), - NEGOTIATE_PROTOCOL_VERSION('v'), - NO_DATA('n'), - NOTICE_RESPONSE('N'), - NOTIFICATION_RESPONSE('A'), - PARAM_DESCRIPTION('t'), - PARAM_STATUS('S'), - PARSE_COMPLETE('1'), - PORTAL_SUSPENDED('s'), - READY_FOR_QUERY('Z'), - ROW_DESCRIPTION('T'); - - private char tag; - - BackendTag(char tag) { - this.tag = tag; - } - - public static BackendTag lookup(byte input) { - for (BackendTag bt : values()) { - if (input == bt.tag) { - return bt; - } - } - throw new IllegalArgumentException("There is no backend server tag that matches byte " + input); - } - } - - private BackendTag tag; - private byte[] payload; - - public BEFrame(byte tag, byte[] payload) { - this.tag = BackendTag.lookup(tag); - this.payload = payload; - } - - public BackendTag getTag() { - return tag; - } - - // TODO make this InputStream from PooledByteBuffer instances (avoids unnecessary copies) - public byte[] getPayload() { - return payload; - } -} diff --git a/src/main/java/org/postgresql/sql2/communication/BEFrameParser.java b/src/main/java/org/postgresql/sql2/communication/BEFrameParser.java index 36ee97b..0052940 100644 --- a/src/main/java/org/postgresql/sql2/communication/BEFrameParser.java +++ b/src/main/java/org/postgresql/sql2/communication/BEFrameParser.java @@ -1,6 +1,6 @@ package org.postgresql.sql2.communication; -import java.nio.ByteBuffer; +import java.io.IOException; import org.postgresql.sql2.util.BinaryHelper; @@ -8,6 +8,32 @@ * Reads bytes from the stream from the server and produces packages on a stack */ public class BEFrameParser { + + public static final char AUTHENTICATION = 'R'; + public static final char CANCELLATION_KEY_DATA = 'K'; + public static final char BIND_COMPLETE = '2'; + public static final char CLOSE_COMPLETE = '3'; + public static final char COMMAND_COMPLETE = 'C'; + public static final char COPY_DATA = 'd'; + public static final char COPY_DONE = 'c'; + public static final char COPY_IN_RESPONSE = 'G'; + public static final char COPY_OUT_RESPONSE = 'H'; + public static final char COPY_BOTH_RESPONSE = 'W'; + public static final char DATA_ROW = 'D'; + public static final char EMPTY_QUERY_RESPONSE = 'I'; + public static final char ERROR_RESPONSE = 'E'; + public static final char FUNCTION_CALL_RESPONSE = 'V'; + public static final char NEGOTIATE_PROTOCOL_VERSION = 'v'; + public static final char NO_DATA = 'n'; + public static final char NOTICE_RESPONSE = 'N'; + public static final char NOTIFICATION_RESPONSE = 'A'; + public static final char PARAM_DESCRIPTION = 't'; + public static final char PARAM_STATUS = 'S'; + public static final char PARSE_COMPLETE = '1'; + public static final char PORTAL_SUSPENDED = 's'; + public static final char READY_FOR_QUERY = 'Z'; + public static final char ROW_DESCRIPTION = 'T'; + private enum States { BETWEEN, READ_TAG, READ_LEN1, READ_LEN2, READ_LEN3, READ_LEN4 } @@ -20,67 +46,61 @@ private enum States { private byte len3; private byte len4; private int payloadLength; - private int payloadRead; - - // TODO wrap ByteBuffer instances in InputStream for the payload (save copy to - // unnecessary array) - private byte[] payload; - - private int consumedBytes = 0; - - public int getConsumedBytes() { - return this.consumedBytes; - } - public BEFrame parseBEFrame(ByteBuffer readBuffer, int position, int bytesRead) { - this.consumedBytes = 0; - for (int i = position; i < bytesRead; i++) { - this.consumedBytes++; - switch (state) { - case BETWEEN: - tag = readBuffer.get(i); - state = States.READ_TAG; - break; - case READ_TAG: - len1 = readBuffer.get(i); - state = States.READ_LEN1; - break; - case READ_LEN1: - len2 = readBuffer.get(i); - state = States.READ_LEN2; - break; - case READ_LEN2: - len3 = readBuffer.get(i); - state = States.READ_LEN3; - break; - case READ_LEN3: - len4 = readBuffer.get(i); + public boolean parseBEFrame(NetworkInputStream inputStream) throws IOException { - payloadLength = BinaryHelper.readInt(len1, len2, len3, len4); - payload = new byte[payloadLength - 4]; - payloadRead = 0; - if (payloadLength - 4 == 0) { // no payload sent, so we short cut this here - state = States.BETWEEN; - return new BEFrame(tag, payload); - } else { + // Read frame header (tag and length) + if (this.state != States.READ_LEN4) { + READ_HEADER: while (inputStream.available() > 0) { + switch (state) { + case BETWEEN: + tag = (byte) inputStream.read(); + state = States.READ_TAG; + break; + case READ_TAG: + len1 = (byte) inputStream.read(); + state = States.READ_LEN1; + break; + case READ_LEN1: + len2 = (byte) inputStream.read(); + state = States.READ_LEN2; + break; + case READ_LEN2: + len3 = (byte) inputStream.read(); + state = States.READ_LEN3; + break; + case READ_LEN3: + len4 = (byte) inputStream.read(); + // -4 to ignore payload length + payloadLength = BinaryHelper.readInt(len1, len2, len3, len4) - 4; state = States.READ_LEN4; + break READ_HEADER; + case READ_LEN4: + break READ_HEADER; } - break; - case READ_LEN4: - payload[payloadRead] = readBuffer.get(i); - payloadRead++; - if (payloadRead == payloadLength - 4) { - state = States.BETWEEN; + } + } - // Have data to process - return new BEFrame(tag, payload); - } - break; + // Wait until all frame bytes available + if (this.state == States.READ_LEN4) { + if (this.payloadLength <= inputStream.available()) { + + // Reset for next frame + this.state = States.BETWEEN; + return true; } } // As here, buffer underflow - return null; + return false; + } + + public char getTag() { + return (char) this.tag; + } + + public int getPayloadLength() { + return this.payloadLength; } -} +} \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/BEFrameReader.java b/src/main/java/org/postgresql/sql2/communication/BEFrameReader.java deleted file mode 100644 index d39e683..0000000 --- a/src/main/java/org/postgresql/sql2/communication/BEFrameReader.java +++ /dev/null @@ -1,85 +0,0 @@ -package org.postgresql.sql2.communication; - -import org.postgresql.sql2.util.BinaryHelper; - -import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * Reads bytes from the stream from the server and produces packages on a stack - */ -public class BEFrameReader { - private enum States { - BETWEEN, - READ_TAG, - READ_LEN1, - READ_LEN2, - READ_LEN3, - READ_LEN4 - } - - private States state = States.BETWEEN; - - private byte tag; - private byte len1; - private byte len2; - private byte len3; - private byte len4; - private int payloadLength; - private int payloadRead; - private byte[] payload; - - private Queue frames = new ConcurrentLinkedQueue<>(); - - public void updateState(ByteBuffer readBuffer, int bytesRead) { - readBuffer.flip(); - - for(int i = 0; i < bytesRead; i++) { - switch (state){ - case BETWEEN: - tag = readBuffer.get(); - state = States.READ_TAG; - break; - case READ_TAG: - len1 = readBuffer.get(); - state = States.READ_LEN1; - break; - case READ_LEN1: - len2 = readBuffer.get(); - state = States.READ_LEN2; - break; - case READ_LEN2: - len3 = readBuffer.get(); - state = States.READ_LEN3; - break; - case READ_LEN3: - len4 = readBuffer.get(); - - payloadLength = BinaryHelper.readInt(len1, len2, len3, len4); - payload = new byte[payloadLength - 4]; - payloadRead = 0; - - if(payloadLength - 4 == 0) { // no payload sent, so we short cut this here - frames.add(new BEFrame(tag, payload)); - state = States.BETWEEN; - } else { - state = States.READ_LEN4; - } - break; - case READ_LEN4: - payload[payloadRead] = readBuffer.get(); - payloadRead++; - if(payloadRead == payloadLength - 4) { - frames.add(new BEFrame(tag, payload)); - state = States.BETWEEN; - } - break; - } - } - } - - public BEFrame popFrame() { - return frames.poll(); - } -} diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java b/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java index 21a4b16..d8e6f6f 100644 --- a/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java +++ b/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java @@ -33,6 +33,10 @@ public class NetworkConnection implements NioService, NetworkConnectContext, Net private final NioLoop loop; + private final NetworkInputStream inputStream = new NetworkInputStream(); + + private final ByteBufferPool bufferPool; + private final ByteBufferPoolOutputStream outputStream; private final Queue priorityRequestQueue = new LinkedList<>(); @@ -79,6 +83,7 @@ public NetworkConnection(Map properties, PGConnectio this.properties = properties; this.connection = connection; this.loop = loop; + this.bufferPool = bufferPool; this.outputStream = new ByteBufferPoolOutputStream(bufferPool); } @@ -271,9 +276,9 @@ private void handleWrite(Queue requests) throws Exception { } /** - * {@link BEFrame} for {@link NetworkReadContext}. + * Current {@link PooledByteBuffer} for reading data. */ - private BEFrame beFrame = null; + private PooledByteBuffer currentReadBuffer = null; /** * Allows {@link NetworkReadContext} to specify if write required. @@ -304,8 +309,11 @@ private NetworkResponse getAwaitingResponse() { @Override public void handleRead() throws IOException { - // TODO use pooled byte buffers - ByteBuffer readBuffer = ByteBuffer.allocate(1024); + // Ensure have read buffer + if (this.currentReadBuffer == null) { + this.currentReadBuffer = this.bufferPool.getPooledByteBuffer(); + this.currentReadBuffer.getByteBuffer().clear(); + } // Reset for reads int bytesRead = -1; @@ -313,16 +321,28 @@ public void handleRead() throws IOException { try { // Consume data on the socket - while ((bytesRead = this.socketChannel.read(readBuffer)) > 0) { + int position = this.currentReadBuffer.getByteBuffer().position(); + while ((bytesRead = this.socketChannel.read(this.currentReadBuffer.getByteBuffer().slice())) > 0) { + + // Update position (as slice required for direct buffers) + this.currentReadBuffer.getByteBuffer().position(this.currentReadBuffer.getByteBuffer().position() + bytesRead); - // Setup for consuming parts - readBuffer.flip(); - int position = 0; + // Determine if filled the read buffer + boolean isFilled = this.currentReadBuffer.getByteBuffer().remaining() == 0; + + // Add the buffer to input stream + this.inputStream.appendBuffer(this.currentReadBuffer, position, bytesRead, isFilled); + + // Obtain new current buffer if filled + if (isFilled) { + this.currentReadBuffer = this.bufferPool.getPooledByteBuffer(); + } // Service the BE frames - BEFrame frame; - while ((frame = this.parser.parseBEFrame(readBuffer, position, bytesRead)) != null) { - position += this.parser.getConsumedBytes(); + while (this.parser.parseBEFrame(this.inputStream)) { + + // Specify frame size + this.inputStream.setBytesToEndOfStream(this.parser.getPayloadLength()); // Obtain the awaiting response NetworkResponse awaitingResponse = this.getAwaitingResponse(); @@ -330,20 +350,19 @@ public void handleRead() throws IOException { // Ensure have awaiting response if (awaitingResponse == null) { throw new IllegalStateException( - "No awaiting " + NetworkResponse.class.getSimpleName() + " for tag " + frame.getTag()); + "No awaiting " + NetworkResponse.class.getSimpleName() + " for tag " + this.parser.getTag()); } // Handle frame - switch (frame.getTag()) { + switch (this.parser.getTag()) { - case ERROR_RESPONSE: + case BEFrameParser.ERROR_RESPONSE: // Handle error - this.immediateResponse = awaitingResponse.handleException(new ErrorPacket(frame.getPayload())); + this.immediateResponse = awaitingResponse.handleException(new ErrorPacket(this)); break; default: // Provide frame to awaiting response - this.beFrame = frame; this.immediateResponse = awaitingResponse.read(this); } @@ -354,10 +373,10 @@ public void handleRead() throws IOException { // Flag to write (as very likely have writes) this.isWriteRequired = true; } - } - // Clear buffer for re-use - readBuffer.clear(); + // Clear frame size to parse next frame + this.inputStream.clearFrame(); + } } } catch (NotYetConnectedException | ClosedChannelException ignore) { ignore.printStackTrace(); @@ -416,8 +435,18 @@ public Map getProperties() { */ @Override - public BEFrame getBEFrame() { - return this.beFrame; + public char getFrameTag() { + return this.parser.getTag(); + } + + @Override + public int getPayloadLength() { + return this.parser.getPayloadLength(); + } + + @Override + public NetworkInputStream getPayload() { + return this.inputStream; } @Override diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkInputStream.java b/src/main/java/org/postgresql/sql2/communication/NetworkInputStream.java new file mode 100644 index 0000000..5db78c8 --- /dev/null +++ b/src/main/java/org/postgresql/sql2/communication/NetworkInputStream.java @@ -0,0 +1,414 @@ +package org.postgresql.sql2.communication; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; + +import org.postgresql.sql2.buffer.PooledByteBuffer; +import org.postgresql.sql2.communication.network.NetworkConnectRequest; + +/** + * Network {@link InputStream}. + * + * @author Daniel Sagenschneider + */ +public class NetworkInputStream extends InputStream { + + /** + * Maximum length of bytes (and by this the maximum number of characters). + */ + private static int maxStringLength = 4096; + + /** + * Specifies the maximum byte length of any string. + * + * @param length Maximum byte length of any string. + */ + public static void setStringLength(int length) { + maxStringLength = length; + } + + /** + * {@link ThreadLocal} {@link CharBuffer} to re-use to reduce memory creation + * (and garbage collection). + */ + private static final ThreadLocal threadLocalState = new ThreadLocal() { + @Override + protected ThreadLocalState initialValue() { + return new ThreadLocalState(maxStringLength); + } + }; + + /** + * {@link ThreadLocal} state. + */ + private static class ThreadLocalState { + + /** + * Re-useable {@link CharBuffer} for reduced memory. + */ + private final CharBuffer charBuffer; + + /** + * String decoder. + */ + private final CharsetDecoder stringDecoder = Charset.forName(NetworkConnectRequest.CHARSET).newDecoder(); + + /** + * Instantiate. + * + * @param charBufferLength Length of the {@link CharBuffer}. + */ + private ThreadLocalState(int charBufferLength) { + this.charBuffer = CharBuffer.allocate(charBufferLength); + } + } + + /** + * Head {@link StreamSegment}. + */ + private StreamSegment head = null; + + /** + * Tail {@link StreamSegment}. + */ + private StreamSegment tail = null; + + /** + * Position within the head {@link StreamSegment}. + */ + private int headPosition; + + /** + * Number of bytes currently buffered. + */ + private int bufferedByteCount = 0; + + /** + * Position within the frame. + */ + private int framePosition = 0; + + /** + * Frame payload size. + */ + private int framePayloadSize = -1; + + /** + * Appends the {@link PooledByteBuffer}. + * + * @param buffer {@link PooledByteBuffer}. + */ + public void appendBuffer(PooledByteBuffer buffer, int offset, int length, boolean isRelease) { + StreamSegment segment = new StreamSegment(buffer, offset, length, isRelease); + this.bufferedByteCount += length; + if (this.head == null) { + this.head = segment; + this.tail = segment; + } else { + this.tail.next = segment; + this.tail = segment; + } + } + + /** + * Specifies the bytes to end of stream. + * + * @param byteCount Byte count to end of stream. + */ + public void setBytesToEndOfStream(int byteCount) { + this.framePayloadSize = byteCount; + this.framePosition = 0; + } + + /** + * Clear frame. + */ + public void clearFrame() throws IOException { + + // Consume remaining bytes of frame + for (int i = this.framePosition; i < this.framePayloadSize; i++) { + this.read(); // consume byte + } + + // No frame active + this.framePayloadSize = -1; + } + + /** + * Obtains the head {@link StreamSegment} remaining length. + * + * @return Head {@link StreamSegment} remaining length. + */ + private int getHeadRemainingLength() { + return this.head.length - this.headPosition; + } + + /* + * ================== InputStream ====================== + */ + + @Override + public int read() throws IOException { + for (;;) { + + // Determine if end of frame + if ((this.framePayloadSize != -1) && (this.framePosition >= this.framePayloadSize)) { + return -1; // end of frame + } + + // Determine if end of stream + if (this.head == null) { + return -1; + } + + // Obtain the next byte + if (this.headPosition < this.head.length) { + int index = this.head.offset + (this.headPosition++); + this.framePosition++; + return this.head.buffer.getByteBuffer().get(index); + } + + // Completed the segment + PooledByteBuffer releaseBuffer = null; + if (this.head.isRelease) { + releaseBuffer = this.head.buffer; + } + + // Move to next segment for reading + this.bufferedByteCount -= this.head.length; + this.head = this.head.next; + this.headPosition = 0; + + // Release buffer after move (so next not corrupted) + if (releaseBuffer != null) { + releaseBuffer.release(); + } + } + } + + @Override + public int available() throws IOException { + if (this.framePayloadSize == -1) { + // Return all available + return Math.max(0, (this.bufferedByteCount - this.headPosition)); + } else { + // Return remaining for frame + return (this.framePayloadSize - this.framePosition); + } + } + + /** + * Reads in an {@link Integer} value. + * + * @return {@link Integer} value or -1 if end of stream. + * @throws IOException If fails to read {@link Integer} value. + */ + public int readInteger() throws IOException { + + // Ensure only reading within frame + if (this.framePayloadSize == -1) { + throw new IOException("Attempting to read integer outside frame"); + } + + // Determine if can read directly + int segmentRemaining = this.getHeadRemainingLength(); + if (segmentRemaining >= 4) { + int value = this.head.buffer.getByteBuffer().getInt(this.head.offset + this.headPosition); + this.headPosition += 4; + this.framePosition += 4; + return value; + } + + // Read in the integer + int value = 0; + for (int i = 0; i < 4; i++) { + + // Read the next byte + int nextByte = this.read(); + if (nextByte == -1) { + return -1; // end of stream + } + + // Obtain the value + value <<= 8; + value += nextByte; + } + + // Return the value + return value; + } + + /** + * Reads in an {@link Short} value. + * + * @return {@link Short} value or -1 if end of stream. + * @throws IOException If fails to read {@link Short} value. + */ + public short readShort() throws IOException { + + // Ensure only reading within frame + if (this.framePayloadSize == -1) { + throw new IOException("Attempting to read short outside frame"); + } + + // Determine if can read directly + int segmentRemaining = this.getHeadRemainingLength(); + if (segmentRemaining >= 2) { + short value = this.head.buffer.getByteBuffer().getShort(this.head.offset + this.headPosition); + this.headPosition += 2; + this.framePosition += 2; + return value; + } + + // Read in the short + short value = 0; + for (int i = 0; i < 2; i++) { + + // Read the next byte + int nextByte = this.read(); + if (nextByte == -1) { + return -1; // end of stream + } + + // Obtain the value + value <<= 8; + value += nextByte; + } + + // Return the value + return value; + } + + /** + * Reads in a {@link String} value. + * + * @return {@link String} value or null if end of stream. + * @throws IOException If fails to read {@link String} value. + */ + public String readString() throws IOException { + + // Ensure only reading within frame + if (this.framePayloadSize == -1) { + throw new IOException("Attempting to read string outside frame"); + } + + // Obtain the char buffer (ready for use) + ThreadLocalState state = threadLocalState.get(); + state.charBuffer.clear(); + + // Decode the content into the char buffer + boolean isComplete = false; + while (!isComplete) { + + // Determine number of bytes to read + int stringLength = this.getHeadRemainingLength(); + int frameRemaining = this.framePayloadSize - this.framePosition; + isComplete = (frameRemaining <= stringLength); + boolean isSegmentConsumed = true; + + // Scan through segment for terminating null + FOUND_TERMINATOR: for (int i = this.headPosition; i < (this.head.length); i++) { + if (this.head.buffer.getByteBuffer().get(this.head.offset + i) == 0) { + // Terminating string + stringLength = i - this.headPosition; + isComplete = true; + isSegmentConsumed = false; + break FOUND_TERMINATOR; + } + } + + // Slice up buffer to content + ByteBuffer input = this.head.buffer.getByteBuffer().duplicate(); + input.position(this.head.offset + this.headPosition); + input.limit(input.position() + stringLength); + + // Decode the content into the char buffer + CoderResult result = state.stringDecoder.decode(input, state.charBuffer, isComplete); + if (result.isError()) { + throw new IOException("Failed to read string: " + result); + } + + // Increment positions + stringLength++; // include terminating null byte + this.headPosition += stringLength; + this.framePosition += stringLength; + + // Move to next segment (if not yet complete) + if (isSegmentConsumed) { + + // Completed the segment + PooledByteBuffer releaseBuffer = null; + if (this.head.isRelease) { + releaseBuffer = this.head.buffer; + } + + // Move to next segment for reading + this.bufferedByteCount -= this.head.length; + this.head = this.head.next; + this.headPosition = 0; + + // Release buffer after move (so next not corrupted) + if (releaseBuffer != null) { + releaseBuffer.release(); + } + } + } + + // Flip to get content just decoded + state.charBuffer.flip(); + + // Return the string value + return state.charBuffer.toString(); + } + + /** + * Segment of the {@link ByteBuffer} for the sequence. + */ + private class StreamSegment { + + /** + * {@link PooledByteBuffer} for this segment. + */ + private final PooledByteBuffer buffer; + + /** + * Offset into the {@link ByteBuffer} for this segment. + */ + private final int offset; + + /** + * Length of data from the {@link ByteBuffer} for this segment. + */ + private final int length; + + /** + * Indicates if to release {@link ByteBuffer} once complete. + */ + private final boolean isRelease; + + /** + * Next {@link StreamSegment}. + */ + private StreamSegment next = null; + + /** + * Instantiate. + * + * @param buffer {@link PooledByteBuffer} for this segment. + * @param offset Offset into the {@link ByteBuffer} for this segment. + * @param length Length of data from the {@link ByteBuffer} for this segment. + */ + private StreamSegment(PooledByteBuffer buffer, int offset, int length, boolean isRelease) { + this.buffer = buffer; + this.offset = offset; + this.length = length; + this.isRelease = isRelease; + } + } + +} \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java b/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java index 9ccb492..bcc3cfe 100644 --- a/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java +++ b/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java @@ -10,11 +10,25 @@ public interface NetworkReadContext extends NetworkContext { /** - * Obtains the {@link BEFrame} just read. + * Obtains the frame tag. * - * @return {@link BEFrame} just read. + * @return Frame tag. */ - BEFrame getBEFrame(); + char getFrameTag(); + + /** + * Obtains the payload length. + * + * @return Payload length. + */ + int getPayloadLength(); + + /** + * Obtains the payload. + * + * @return Payload. + */ + NetworkInputStream getPayload(); /** * Obtains the {@link PreparedStatementCache}. diff --git a/src/main/java/org/postgresql/sql2/communication/TableCell.java b/src/main/java/org/postgresql/sql2/communication/TableCell.java index a3b759d..b62f5a8 100644 --- a/src/main/java/org/postgresql/sql2/communication/TableCell.java +++ b/src/main/java/org/postgresql/sql2/communication/TableCell.java @@ -4,14 +4,10 @@ public class TableCell { private byte[] bytes; - private int start; - private int stop; private ColumnDescription columnDescription; - public TableCell(byte[] bytes, int start, int stop, ColumnDescription columnDescription) { + public TableCell(byte[] bytes, ColumnDescription columnDescription) { this.bytes = bytes; - this.start = start; - this.stop = stop; this.columnDescription = columnDescription; } @@ -19,14 +15,6 @@ public byte[] getBytes() { return bytes; } - public int getStart() { - return start; - } - - public int getStop() { - return stop; - } - public ColumnDescription getColumnDescription() { return columnDescription; } diff --git a/src/main/java/org/postgresql/sql2/communication/network/AuthenticationResponse.java b/src/main/java/org/postgresql/sql2/communication/network/AuthenticationResponse.java index 4a269bb..389bf69 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/AuthenticationResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/AuthenticationResponse.java @@ -4,7 +4,7 @@ import java.util.function.Consumer; import org.postgresql.sql2.PGConnectionProperties; -import org.postgresql.sql2.communication.BEFrame; +import org.postgresql.sql2.communication.BEFrameParser; import org.postgresql.sql2.communication.NetworkReadContext; import org.postgresql.sql2.communication.NetworkResponse; import org.postgresql.sql2.communication.packets.AuthenticationRequest; @@ -27,11 +27,10 @@ public AuthenticationResponse(ConnectSubmission connectSubmission) { @Override public NetworkResponse read(NetworkReadContext context) throws IOException { // Expecting authentication challenge - BEFrame frame = context.getBEFrame(); - switch (frame.getTag()) { + switch (context.getFrameTag()) { - case AUTHENTICATION: - AuthenticationRequest authentication = new AuthenticationRequest(frame.getPayload()); + case BEFrameParser.AUTHENTICATION: + AuthenticationRequest authentication = new AuthenticationRequest(context); switch (authentication.getType()) { case SUCCESS: @@ -43,21 +42,22 @@ public NetworkResponse read(NetworkReadContext context) throws IOException { throw new IllegalStateException("Unhandled authentication " + authentication.getType()); } - case PARAM_STATUS: + case BEFrameParser.PARAM_STATUS: // Load parameters for connection - ParameterStatus paramStatus = new ParameterStatus(frame.getPayload()); + ParameterStatus paramStatus = new ParameterStatus(context); context.setProperty(PGConnectionProperties.lookup(paramStatus.getName()), paramStatus.getValue()); return this; - case CANCELLATION_KEY_DATA: + case BEFrameParser.CANCELLATION_KEY_DATA: // TODO handle cancellation key return this; - case READY_FOR_QUERY: + case BEFrameParser.READY_FOR_QUERY: return null; default: - throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName()); + throw new IllegalStateException( + "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName()); } } diff --git a/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java b/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java index 821f6f9..b60f42c 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java @@ -2,7 +2,7 @@ import java.io.IOException; -import org.postgresql.sql2.communication.BEFrame; +import org.postgresql.sql2.communication.BEFrameParser; import org.postgresql.sql2.communication.NetworkReadContext; import org.postgresql.sql2.communication.NetworkResponse; @@ -19,14 +19,14 @@ public BindResponse(Portal portal) { @Override public NetworkResponse read(NetworkReadContext context) throws IOException { - BEFrame frame = context.getBEFrame(); - switch (frame.getTag()) { + switch (context.getFrameTag()) { - case BIND_COMPLETE: + case BEFrameParser.BIND_COMPLETE: return null; // Nothing further default: - throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName()); + throw new IllegalStateException( + "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName()); } } diff --git a/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java b/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java index 69c125d..bdfd07e 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java @@ -2,7 +2,7 @@ import java.io.IOException; -import org.postgresql.sql2.communication.BEFrame; +import org.postgresql.sql2.communication.BEFrameParser; import org.postgresql.sql2.communication.NetworkReadContext; import org.postgresql.sql2.communication.NetworkResponse; import org.postgresql.sql2.communication.packets.RowDescription; @@ -20,22 +20,22 @@ public DescribeResponse(Portal portal) { @Override public NetworkResponse read(NetworkReadContext context) throws IOException { - BEFrame frame = context.getBEFrame(); - switch (frame.getTag()) { + switch (context.getFrameTag()) { - case NO_DATA: + case BEFrameParser.NO_DATA: return null; - case PARAM_DESCRIPTION: + case BEFrameParser.PARAM_DESCRIPTION: return this; // wait on row description - case ROW_DESCRIPTION: - RowDescription rowDescription = new RowDescription(frame.getPayload()); + case BEFrameParser.ROW_DESCRIPTION: + RowDescription rowDescription = new RowDescription(context.getPayload()); this.portal.getQuery().setRowDescription(rowDescription); return null; // nothing further default: - throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName()); + throw new IllegalStateException( + "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName()); } } diff --git a/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java b/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java index 86d0edd..54a805c 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java @@ -2,7 +2,7 @@ import java.io.IOException; -import org.postgresql.sql2.communication.BEFrame; +import org.postgresql.sql2.communication.BEFrameParser; import org.postgresql.sql2.communication.NetworkReadContext; import org.postgresql.sql2.communication.NetworkResponse; import org.postgresql.sql2.communication.packets.CommandComplete; @@ -21,25 +21,25 @@ public ExecuteResponse(Portal portal) { @Override public NetworkResponse read(NetworkReadContext context) throws IOException { - BEFrame frame = context.getBEFrame(); - switch (frame.getTag()) { + switch (context.getFrameTag()) { - case DATA_ROW: - DataRow dataRow = new DataRow(frame.getPayload(), this.portal.getQuery().getRowDescription().getDescriptions(), + case BEFrameParser.DATA_ROW: + DataRow dataRow = new DataRow(context, this.portal.getQuery().getRowDescription().getDescriptions(), this.portal.nextRowNumber()); this.portal.addDataRow(dataRow); return this; - case COMMAND_COMPLETE: - CommandComplete complete = new CommandComplete(frame.getPayload()); + case BEFrameParser.COMMAND_COMPLETE: + CommandComplete complete = new CommandComplete(context); this.portal.commandComplete(complete, context.getSocketChannel()); return this; - case READY_FOR_QUERY: + case BEFrameParser.READY_FOR_QUERY: return null; default: - throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName()); + throw new IllegalStateException( + "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName()); } } diff --git a/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java b/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java index b946c64..e551d7c 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java +++ b/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java @@ -5,7 +5,7 @@ import java.util.Map; import org.postgresql.sql2.PGConnectionProperties; -import org.postgresql.sql2.communication.BEFrame; +import org.postgresql.sql2.communication.BEFrameParser; import org.postgresql.sql2.communication.NetworkConnect; import org.postgresql.sql2.communication.NetworkConnectContext; import org.postgresql.sql2.communication.NetworkOutputStream; @@ -26,6 +26,8 @@ */ public class NetworkConnectRequest implements NetworkConnect, NetworkRequest, NetworkResponse { + public static final String CHARSET = "UTF8"; + /** * {@link ConnectSubmission}. */ @@ -81,7 +83,7 @@ public NetworkRequest write(NetworkWriteContext context) throws IOException { wire.write("application_name"); wire.write("java_sql2_client"); wire.write("client_encoding"); - wire.write("UTF8"); + wire.write(CHARSET); wire.writeTerminator(); wire.completePacket(); @@ -103,11 +105,10 @@ public NetworkResponse getRequiredResponse() { public NetworkResponse read(NetworkReadContext context) throws IOException { // Expecting authentication challenge - BEFrame frame = context.getBEFrame(); - switch (frame.getTag()) { + switch (context.getFrameTag()) { - case AUTHENTICATION: - AuthenticationRequest authentication = new AuthenticationRequest(frame.getPayload()); + case BEFrameParser.AUTHENTICATION: + AuthenticationRequest authentication = new AuthenticationRequest(context); switch (authentication.getType()) { case MD5: @@ -125,7 +126,8 @@ public NetworkResponse read(NetworkReadContext context) throws IOException { } default: - throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName()); + throw new IllegalStateException( + "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName()); } } diff --git a/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java b/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java index d8314d3..9bf4411 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java @@ -2,7 +2,7 @@ import java.io.IOException; -import org.postgresql.sql2.communication.BEFrame; +import org.postgresql.sql2.communication.BEFrameParser; import org.postgresql.sql2.communication.NetworkReadContext; import org.postgresql.sql2.communication.NetworkResponse; @@ -19,15 +19,15 @@ public ParseResponse(Portal portal) { @Override public NetworkResponse read(NetworkReadContext context) throws IOException { - BEFrame frame = context.getBEFrame(); - switch (frame.getTag()) { + switch (context.getFrameTag()) { - case PARSE_COMPLETE: + case BEFrameParser.PARSE_COMPLETE: this.portal.getQuery().flagParsed(); return null; // nothing further default: - throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName()); + throw new IllegalStateException( + "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName()); } } diff --git a/src/main/java/org/postgresql/sql2/communication/network/ReadyForQueryResponse.java b/src/main/java/org/postgresql/sql2/communication/network/ReadyForQueryResponse.java index 6d9eb1c..b58df8f 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/ReadyForQueryResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/ReadyForQueryResponse.java @@ -2,7 +2,7 @@ import java.io.IOException; -import org.postgresql.sql2.communication.BEFrame; +import org.postgresql.sql2.communication.BEFrameParser; import org.postgresql.sql2.communication.NetworkReadContext; import org.postgresql.sql2.communication.NetworkResponse; @@ -20,14 +20,14 @@ public NetworkResponse handleException(Throwable ex) { @Override public NetworkResponse read(NetworkReadContext context) throws IOException { - BEFrame frame = context.getBEFrame(); - switch (frame.getTag()) { + switch (context.getFrameTag()) { - case READY_FOR_QUERY: + case BEFrameParser.READY_FOR_QUERY: return null; // Nothing further default: - throw new IllegalStateException("Invalid tag '" + frame.getTag() + "' for " + this.getClass().getSimpleName()); + throw new IllegalStateException( + "Invalid tag '" + context.getFrameTag() + "' for " + this.getClass().getSimpleName()); } } diff --git a/src/main/java/org/postgresql/sql2/communication/packets/AuthenticationRequest.java b/src/main/java/org/postgresql/sql2/communication/packets/AuthenticationRequest.java index cf28063..2300bae 100644 --- a/src/main/java/org/postgresql/sql2/communication/packets/AuthenticationRequest.java +++ b/src/main/java/org/postgresql/sql2/communication/packets/AuthenticationRequest.java @@ -1,20 +1,14 @@ package org.postgresql.sql2.communication.packets; -import org.postgresql.sql2.util.BinaryHelper; +import java.io.IOException; + +import org.postgresql.sql2.communication.NetworkInputStream; +import org.postgresql.sql2.communication.NetworkReadContext; public class AuthenticationRequest { public enum Types { - SUCCESS(0), - KERBEROS_V5(2), - CLEAR_TEXT(3), - MD5(5), - SCM_CREDENTIAL(6), - GSS(7), - GSS_CONTINUE(8), - SSPI(9), - SASL(10), - SASL_CONTINUE(11), - SASL_FINAL(12); + SUCCESS(0), KERBEROS_V5(2), CLEAR_TEXT(3), MD5(5), SCM_CREDENTIAL(6), GSS(7), GSS_CONTINUE(8), SSPI(9), SASL(10), + SASL_CONTINUE(11), SASL_FINAL(12); private int value; @@ -23,8 +17,8 @@ public enum Types { } public static Types lookup(int input) { - for(Types t : values()) { - if(t.value == input) + for (Types t : values()) { + if (t.value == input) return t; } @@ -35,13 +29,14 @@ public static Types lookup(int input) { private Types type; private byte[] salt = new byte[4]; - public AuthenticationRequest(byte[] bytes) { - type = Types.lookup(BinaryHelper.readInt(bytes[0], bytes[1], bytes[2], bytes[3])); - if(type == Types.MD5) { - salt[0] = bytes[4]; - salt[1] = bytes[5]; - salt[2] = bytes[6]; - salt[3] = bytes[7]; + public AuthenticationRequest(NetworkReadContext context) throws IOException { + NetworkInputStream input = context.getPayload(); + type = Types.lookup(input.readInteger()); + if (type == Types.MD5) { + salt[0] = (byte) input.read(); + salt[1] = (byte) input.read(); + salt[2] = (byte) input.read(); + salt[3] = (byte) input.read(); } } diff --git a/src/main/java/org/postgresql/sql2/communication/packets/CommandComplete.java b/src/main/java/org/postgresql/sql2/communication/packets/CommandComplete.java index 651b914..3c1b9d8 100644 --- a/src/main/java/org/postgresql/sql2/communication/packets/CommandComplete.java +++ b/src/main/java/org/postgresql/sql2/communication/packets/CommandComplete.java @@ -1,64 +1,56 @@ package org.postgresql.sql2.communication.packets; -import java.nio.charset.StandardCharsets; +import java.io.IOException; + +import org.postgresql.sql2.communication.NetworkReadContext; public class CommandComplete { public enum Types { - INSERT, - DELETE, - CREATE_TABLE, - CREATE_TYPE, - START_TRANSACTION, - ROLLBACK, - COMMIT, - UPDATE, - SELECT, - MOVE, - FETCH, - COPY + INSERT, DELETE, CREATE_TABLE, CREATE_TYPE, START_TRANSACTION, ROLLBACK, COMMIT, UPDATE, SELECT, MOVE, FETCH, COPY } + private int numberOfRowsAffected; private Types type; - public CommandComplete(byte[] payload) { - String message = new String(payload, StandardCharsets.UTF_8); + public CommandComplete(NetworkReadContext context) throws IOException { + String message = context.getPayload().readString(); - if(message.startsWith("INSERT")) { + if (message.startsWith("INSERT")) { type = Types.INSERT; - numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1)); - } else if(message.startsWith("DELETE")) { + numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length())); + } else if (message.startsWith("DELETE")) { type = Types.DELETE; - numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1)); - } else if(message.startsWith("CREATE TABLE")) { + numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length())); + } else if (message.startsWith("CREATE TABLE")) { type = Types.CREATE_TABLE; numberOfRowsAffected = 0; - } else if(message.startsWith("CREATE TYPE")) { + } else if (message.startsWith("CREATE TYPE")) { type = Types.CREATE_TYPE; numberOfRowsAffected = 0; - } else if(message.startsWith("START TRANSACTION")) { + } else if (message.startsWith("START TRANSACTION")) { type = Types.START_TRANSACTION; numberOfRowsAffected = 0; - } else if(message.startsWith("ROLLBACK")) { + } else if (message.startsWith("ROLLBACK")) { type = Types.ROLLBACK; numberOfRowsAffected = 0; - } else if(message.startsWith("COMMIT")) { + } else if (message.startsWith("COMMIT")) { type = Types.COMMIT; numberOfRowsAffected = 0; - } else if(message.startsWith("UPDATE")) { + } else if (message.startsWith("UPDATE")) { type = Types.UPDATE; - numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1)); - } else if(message.startsWith("SELECT")) { + numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length())); + } else if (message.startsWith("SELECT")) { type = Types.SELECT; - numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1)); - } else if(message.startsWith("MOVE")) { + numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length())); + } else if (message.startsWith("MOVE")) { type = Types.MOVE; - numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1)); - } else if(message.startsWith("FETCH")) { + numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length())); + } else if (message.startsWith("FETCH")) { type = Types.FETCH; - numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1)); - } else if(message.startsWith("COPY")) { + numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length())); + } else if (message.startsWith("COPY")) { type = Types.COPY; - numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length() - 1)); + numberOfRowsAffected = Integer.parseInt(message.substring(message.lastIndexOf(" ") + 1, message.length())); } } diff --git a/src/main/java/org/postgresql/sql2/communication/packets/DataRow.java b/src/main/java/org/postgresql/sql2/communication/packets/DataRow.java index 364210f..7dc578a 100644 --- a/src/main/java/org/postgresql/sql2/communication/packets/DataRow.java +++ b/src/main/java/org/postgresql/sql2/communication/packets/DataRow.java @@ -2,10 +2,14 @@ import jdk.incubator.sql2.Result; import jdk.incubator.sql2.SqlType; + +import org.postgresql.sql2.communication.NetworkInputStream; +import org.postgresql.sql2.communication.NetworkReadContext; import org.postgresql.sql2.communication.TableCell; import org.postgresql.sql2.communication.packets.parts.ColumnDescription; import org.postgresql.sql2.util.BinaryHelper; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -16,21 +20,19 @@ public class DataRow implements Result.RowColumn, Result.OutColumn { private long rowNumber; private int currentPos = 0; - public DataRow(byte[] bytes, ColumnDescription[] description, long rowNumber) { + public DataRow(NetworkReadContext context, ColumnDescription[] description, long rowNumber) throws IOException { this.rowNumber = rowNumber; + NetworkInputStream input = context.getPayload(); - short numOfColumns = BinaryHelper.readShort(bytes[0], bytes[1]); - int pos = 2; - int columnPos = 1; + short numOfColumns = input.readShort(); columns = new HashMap<>(numOfColumns); columnNames = new HashMap<>(numOfColumns); - for(int i = 0; i < numOfColumns; i++) { - int length = BinaryHelper.readInt(bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]); - pos += 4; - columnNames.put(description[i].getName().toLowerCase(), columnPos); - columns.put(columnPos, new TableCell(bytes, pos, pos + length, description[i])); - pos += length; - columnPos++; + for (int i = 0; i < numOfColumns; i++) { + int length = input.readInteger(); + columnNames.put(description[i].getName().toLowerCase(), i); + byte[] cellBytes = new byte[length]; + input.read(cellBytes); + columns.put(i, new TableCell(cellBytes, description[i])); } } @@ -48,20 +50,21 @@ public void cancel() { public T get(Class type) { TableCell tc = columns.get(currentPos); - if(tc == null) { + if (tc == null) { throw new IllegalArgumentException("no column with position " + currentPos); } - if(tc.getStart() > tc.getStop()) { // handle the null special case + // handle the null special case + if (tc.getBytes().length == 0) { return null; } - switch (tc.getColumnDescription().getFormatCode()){ - case TEXT: - String data = new String(BinaryHelper.subBytes(tc.getBytes(), tc.getStart(), tc.getStop()), StandardCharsets.UTF_8); - return (T)tc.getColumnDescription().getColumnType().getTextParser().apply(data); - case BINARY: - return (T)tc.getColumnDescription().getColumnType().getBinaryParser().apply(tc.getBytes(), tc.getStart(), tc.getStop()); + switch (tc.getColumnDescription().getFormatCode()) { + case TEXT: + String data = new String(tc.getBytes(), StandardCharsets.UTF_8); + return (T) tc.getColumnDescription().getColumnType().getTextParser().apply(data); + case BINARY: + return (T) tc.getColumnDescription().getColumnType().getBinaryParser().apply(tc.getBytes()); } return null; @@ -84,7 +87,7 @@ public int absoluteIndex() { @Override public SqlType sqlType() { - if(!columns.containsKey(currentPos)) { + if (!columns.containsKey(currentPos)) { throw new IllegalArgumentException("no column with id " + currentPos); } @@ -93,7 +96,7 @@ public SqlType sqlType() { @Override public Class javaType() { - if(!columns.containsKey(currentPos)) { + if (!columns.containsKey(currentPos)) { throw new IllegalArgumentException("no column with id " + currentPos); } @@ -103,7 +106,7 @@ public Class javaType() { @Override public long length() { TableCell tc = columns.get(currentPos); - return tc.getStop() - tc.getStart(); + return tc.getBytes().length; } @Override @@ -115,7 +118,7 @@ public int numberOfValuesRemaining() { public Column at(String id) { Integer newPos = columnNames.get(id.toLowerCase()); - if(newPos == null) { + if (newPos == null) { throw new IllegalArgumentException("no column with id " + id); } @@ -125,7 +128,7 @@ public Column at(String id) { @Override public Column at(int index) { - if(!columns.containsKey(index)) { + if (!columns.containsKey(index)) { throw new IllegalArgumentException("no column with index " + index); } @@ -155,4 +158,5 @@ public Column clone() { return row; } -} + +} \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/packets/ErrorPacket.java b/src/main/java/org/postgresql/sql2/communication/packets/ErrorPacket.java index e998f45..5e4e932 100644 --- a/src/main/java/org/postgresql/sql2/communication/packets/ErrorPacket.java +++ b/src/main/java/org/postgresql/sql2/communication/packets/ErrorPacket.java @@ -1,46 +1,47 @@ package org.postgresql.sql2.communication.packets; -import org.postgresql.sql2.communication.packets.parts.ErrorResponseField; -import org.postgresql.sql2.util.BinaryHelper; - +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.postgresql.sql2.communication.NetworkInputStream; +import org.postgresql.sql2.communication.NetworkReadContext; +import org.postgresql.sql2.communication.packets.parts.ErrorResponseField; + public class ErrorPacket extends Exception { - - private static List parseFields(byte[] payload) { - List fields = new ArrayList<>(); - List nullPositions = new ArrayList<>(); - for(int i = 0; i < payload.length; i++) { - if(payload[i] == 0) { - nullPositions.add(i); - } - } + private static List parseFields(NetworkReadContext context) throws IOException { + + List fields = new ArrayList<>(); - for(int i = 0; i < nullPositions.size() - 2; i++) { - fields.add(new ErrorResponseField(ErrorResponseField.Types.lookup(payload[nullPositions.get(i) + 1]), - new String(BinaryHelper.subBytes(payload, nullPositions.get(i) + 2, nullPositions.get(i + 1))))); + // Parse out the fields + NetworkInputStream input = context.getPayload(); + int errorType; + while ((errorType = input.read()) != -1) { + String message = input.readString(); + fields.add(new ErrorResponseField(ErrorResponseField.Types.lookup(errorType), message)); } + + // Return the fields return fields; } - + private static String getField(ErrorResponseField.Types type, List fields) { - for(ErrorResponseField field : fields) { - if(type == field.getType()) { + for (ErrorResponseField field : fields) { + if (type == field.getType()) { return field.getMessage(); } } return null; } - + private List fields; - public ErrorPacket(byte[] payload) { - this(parseFields(payload)); + public ErrorPacket(NetworkReadContext context) throws IOException { + this(parseFields(context)); } - + private ErrorPacket(List fields) { super(getField(ErrorResponseField.Types.MESSAGE, fields)); this.fields = fields; @@ -51,8 +52,8 @@ public List getFields() { } public String getField(ErrorResponseField.Types type) { - for(ErrorResponseField field : fields) { - if(type == field.getType()) { + for (ErrorResponseField field : fields) { + if (type == field.getType()) { return field.getMessage(); } } diff --git a/src/main/java/org/postgresql/sql2/communication/packets/ParameterStatus.java b/src/main/java/org/postgresql/sql2/communication/packets/ParameterStatus.java index f1ee9e2..34325bb 100644 --- a/src/main/java/org/postgresql/sql2/communication/packets/ParameterStatus.java +++ b/src/main/java/org/postgresql/sql2/communication/packets/ParameterStatus.java @@ -1,31 +1,18 @@ package org.postgresql.sql2.communication.packets; -import org.postgresql.sql2.util.BinaryHelper; +import java.io.IOException; + +import org.postgresql.sql2.communication.NetworkInputStream; +import org.postgresql.sql2.communication.NetworkReadContext; public class ParameterStatus { private String name; private String value; - public ParameterStatus(byte[] payload) { - int firstNullPos = 0; - int secondNullPos = 0; - - for(int i = 0; i < payload.length; i++) { - if(payload[i] == 0) { - firstNullPos = i; - break; - } - } - - for(int i = firstNullPos; i < payload.length; i++) { - if(payload[i] == 0) { - secondNullPos = i; - break; - } - } - - name = new String(BinaryHelper.subBytes(payload, 0, firstNullPos)); - value = new String(BinaryHelper.subBytes(payload, firstNullPos, secondNullPos)); + public ParameterStatus(NetworkReadContext context) throws IOException { + NetworkInputStream input = context.getPayload(); + name = input.readString(); + value = input.readString(); } public String getName() { diff --git a/src/main/java/org/postgresql/sql2/communication/packets/RowDescription.java b/src/main/java/org/postgresql/sql2/communication/packets/RowDescription.java index f2e6bb4..22daa88 100644 --- a/src/main/java/org/postgresql/sql2/communication/packets/RowDescription.java +++ b/src/main/java/org/postgresql/sql2/communication/packets/RowDescription.java @@ -1,35 +1,28 @@ package org.postgresql.sql2.communication.packets; -import org.postgresql.sql2.communication.packets.parts.ColumnDescription; -import org.postgresql.sql2.util.BinaryHelper; +import java.io.IOException; -import java.nio.charset.StandardCharsets; +import org.postgresql.sql2.communication.NetworkInputStream; +import org.postgresql.sql2.communication.NetworkReadContext; +import org.postgresql.sql2.communication.packets.parts.ColumnDescription; public class RowDescription { private ColumnDescription[] descriptions; - public RowDescription(byte[] bytes) { - short numOfColumns = BinaryHelper.readShort(bytes[0], bytes[1]); - int pos = 2; + public RowDescription(NetworkInputStream input) throws IOException { + short numOfColumns = input.readShort(); descriptions = new ColumnDescription[numOfColumns]; - for(int i = 0; i < numOfColumns; i++) { - int nameEnd = BinaryHelper.nextNullBytePos(bytes, pos); - String name = new String(BinaryHelper.subBytes(bytes, pos, nameEnd), StandardCharsets.UTF_8); - pos = nameEnd + 1; - int objectIdOfTable = BinaryHelper.readInt(bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]); - pos += 4; - short attributeNumberOfColumn = BinaryHelper.readShort(bytes[pos], bytes[pos + 1]); - pos += 2; - int fieldOId = BinaryHelper.readInt(bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]); - pos += 4; - short dataTypeSize = BinaryHelper.readShort(bytes[pos], bytes[pos + 1]); - pos += 2; - int typeModifier = BinaryHelper.readInt(bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]); - pos += 4; - short formatCode = BinaryHelper.readShort(bytes[pos], bytes[pos + 1]); - pos += 2; + for (int i = 0; i < numOfColumns; i++) { + String name = input.readString(); + int objectIdOfTable = input.readInteger(); + short attributeNumberOfColumn = input.readShort(); + int fieldOId = input.readInteger(); + short dataTypeSize = input.readShort(); + int typeModifier = input.readInteger(); + short formatCode = input.readShort(); - descriptions[i] = new ColumnDescription(name, objectIdOfTable, attributeNumberOfColumn, fieldOId, dataTypeSize, typeModifier, formatCode); + descriptions[i] = new ColumnDescription(name, objectIdOfTable, attributeNumberOfColumn, fieldOId, dataTypeSize, + typeModifier, formatCode); } } diff --git a/src/main/java/org/postgresql/sql2/communication/packets/parsers/BinaryParser.java b/src/main/java/org/postgresql/sql2/communication/packets/parsers/BinaryParser.java index dbf2ebb..7d61e7b 100644 --- a/src/main/java/org/postgresql/sql2/communication/packets/parsers/BinaryParser.java +++ b/src/main/java/org/postgresql/sql2/communication/packets/parsers/BinaryParser.java @@ -3,275 +3,275 @@ import org.postgresql.sql2.util.BinaryHelper; public class BinaryParser { - public static Object boolsend(byte[] bytes, Integer start, Integer end) { + public static Object boolsend(byte[] bytes) { return null; } - public static Object byteasend(byte[] bytes, Integer start, Integer end) { + public static Object byteasend(byte[] bytes) { return null; } - public static Object charsend(byte[] bytes, Integer start, Integer end) { + public static Object charsend(byte[] bytes) { return null; } - public static Object namesend(byte[] bytes, Integer start, Integer end) { + public static Object namesend(byte[] bytes) { return null; } - public static Object int8send(byte[] bytes, Integer start, Integer end) { + public static Object int8send(byte[] bytes) { return null; } - public static Object int2send(byte[] bytes, Integer start, Integer end) { + public static Object int2send(byte[] bytes) { return null; } - public static Object int2vectorsend(byte[] bytes, Integer start, Integer end) { + public static Object int2vectorsend(byte[] bytes) { return null; } - public static Object int4send(byte[] bytes, Integer start, Integer end) { - return BinaryHelper.readInt(bytes[start], bytes[start + 1], bytes[start + 2], bytes[start + 3]); + public static Object int4send(byte[] bytes) { + return BinaryHelper.readInt(bytes[0], bytes[1], bytes[2], bytes[3]); } - public static Object regprocsend(byte[] bytes, Integer start, Integer end) { + public static Object regprocsend(byte[] bytes) { return null; } - public static Object oidsend(byte[] bytes, Integer start, Integer end) { + public static Object oidsend(byte[] bytes) { return null; } - public static Object tidsend(byte[] bytes, Integer start, Integer end) { + public static Object tidsend(byte[] bytes) { return null; } - public static Object xidsend(byte[] bytes, Integer start, Integer end) { + public static Object xidsend(byte[] bytes) { return null; } - public static Object cidsend(byte[] bytes, Integer start, Integer end) { + public static Object cidsend(byte[] bytes) { return null; } - public static Object oidvectorsend(byte[] bytes, Integer start, Integer end) { + public static Object oidvectorsend(byte[] bytes) { return null; } - public static Object pg_ddl_command_send(byte[] bytes, Integer start, Integer end) { + public static Object pg_ddl_command_send(byte[] bytes) { return null; } - public static Object json_send(byte[] bytes, Integer start, Integer end) { + public static Object json_send(byte[] bytes) { return null; } - public static Object xml_send(byte[] bytes, Integer start, Integer end) { + public static Object xml_send(byte[] bytes) { return null; } - public static Object pg_node_tree_send(byte[] bytes, Integer start, Integer end) { + public static Object pg_node_tree_send(byte[] bytes) { return null; } - public static Object point_send(byte[] bytes, Integer start, Integer end) { + public static Object point_send(byte[] bytes) { return null; } - public static Object lseg_send(byte[] bytes, Integer start, Integer end) { + public static Object lseg_send(byte[] bytes) { return null; } - public static Object path_send(byte[] bytes, Integer start, Integer end) { + public static Object path_send(byte[] bytes) { return null; } - public static Object box_send(byte[] bytes, Integer start, Integer end) { + public static Object box_send(byte[] bytes) { return null; } - public static Object poly_send(byte[] bytes, Integer start, Integer end) { + public static Object poly_send(byte[] bytes) { return null; } - public static Object line_send(byte[] bytes, Integer start, Integer end) { + public static Object line_send(byte[] bytes) { return null; } - public static Object cidr_send(byte[] bytes, Integer start, Integer end) { + public static Object cidr_send(byte[] bytes) { return null; } - public static Object float4send(byte[] bytes, Integer start, Integer end) { + public static Object float4send(byte[] bytes) { return null; } - public static Object float8send(byte[] bytes, Integer start, Integer end) { + public static Object float8send(byte[] bytes) { return null; } - public static Object abstimesend(byte[] bytes, Integer start, Integer end) { + public static Object abstimesend(byte[] bytes) { return null; } - public static Object reltimesend(byte[] bytes, Integer start, Integer end) { + public static Object reltimesend(byte[] bytes) { return null; } - public static Object tintervalsend(byte[] bytes, Integer start, Integer end) { + public static Object tintervalsend(byte[] bytes) { return null; } - public static Object unknownsend(byte[] bytes, Integer start, Integer end) { + public static Object unknownsend(byte[] bytes) { return null; } - public static Object circle_send(byte[] bytes, Integer start, Integer end) { + public static Object circle_send(byte[] bytes) { return null; } - public static Object cash_send(byte[] bytes, Integer start, Integer end) { + public static Object cash_send(byte[] bytes) { return null; } - public static Object macaddr_send(byte[] bytes, Integer start, Integer end) { + public static Object macaddr_send(byte[] bytes) { return null; } - public static Object inet_send(byte[] bytes, Integer start, Integer end) { + public static Object inet_send(byte[] bytes) { return null; } - public static Object bpcharsend(byte[] bytes, Integer start, Integer end) { + public static Object bpcharsend(byte[] bytes) { return null; } - public static Object varcharsend(byte[] bytes, Integer start, Integer end) { + public static Object varcharsend(byte[] bytes) { return null; } - public static Object date_send(byte[] bytes, Integer start, Integer end) { + public static Object date_send(byte[] bytes) { return null; } - public static Object time_send(byte[] bytes, Integer start, Integer end) { + public static Object time_send(byte[] bytes) { return null; } - public static Object timestamp_send(byte[] bytes, Integer start, Integer end) { + public static Object timestamp_send(byte[] bytes) { return null; } - public static Object timestamptz_send(byte[] bytes, Integer start, Integer end) { + public static Object timestamptz_send(byte[] bytes) { return null; } - public static Object interval_send(byte[] bytes, Integer start, Integer end) { + public static Object interval_send(byte[] bytes) { return null; } - public static Object timetz_send(byte[] bytes, Integer start, Integer end) { + public static Object timetz_send(byte[] bytes) { return null; } - public static Object bit_send(byte[] bytes, Integer start, Integer end) { + public static Object bit_send(byte[] bytes) { return null; } - public static Object varbit_send(byte[] bytes, Integer start, Integer end) { + public static Object varbit_send(byte[] bytes) { return null; } - public static Object numeric_send(byte[] bytes, Integer start, Integer end) { + public static Object numeric_send(byte[] bytes) { return null; } - public static Object textsend(byte[] bytes, Integer start, Integer end) { + public static Object textsend(byte[] bytes) { return null; } - public static Object regproceduresend(byte[] bytes, Integer start, Integer end) { + public static Object regproceduresend(byte[] bytes) { return null; } - public static Object regopersend(byte[] bytes, Integer start, Integer end) { + public static Object regopersend(byte[] bytes) { return null; } - public static Object regoperatorsend(byte[] bytes, Integer start, Integer end) { + public static Object regoperatorsend(byte[] bytes) { return null; } - public static Object regclasssend(byte[] bytes, Integer start, Integer end) { + public static Object regclasssend(byte[] bytes) { return null; } - public static Object regtypesend(byte[] bytes, Integer start, Integer end) { + public static Object regtypesend(byte[] bytes) { return null; } - public static Object cstring_send(byte[] bytes, Integer start, Integer end) { + public static Object cstring_send(byte[] bytes) { return null; } - public static Object anyarray_send(byte[] bytes, Integer start, Integer end) { + public static Object anyarray_send(byte[] bytes) { return null; } - public static Object void_send(byte[] bytes, Integer start, Integer end) { + public static Object void_send(byte[] bytes) { return null; } - public static Object uuid_send(byte[] bytes, Integer start, Integer end) { + public static Object uuid_send(byte[] bytes) { return null; } - public static Object txid_snapshot_send(byte[] bytes, Integer start, Integer end) { + public static Object txid_snapshot_send(byte[] bytes) { return null; } - public static Object pg_lsn_send(byte[] bytes, Integer start, Integer end) { + public static Object pg_lsn_send(byte[] bytes) { return null; } - public static Object tsvectorsend(byte[] bytes, Integer start, Integer end) { + public static Object tsvectorsend(byte[] bytes) { return null; } - public static Object tsquerysend(byte[] bytes, Integer start, Integer end) { + public static Object tsquerysend(byte[] bytes) { return null; } - public static Object regconfigsend(byte[] bytes, Integer start, Integer end) { + public static Object regconfigsend(byte[] bytes) { return null; } - public static Object regdictionarysend(byte[] bytes, Integer start, Integer end) { + public static Object regdictionarysend(byte[] bytes) { return null; } - public static Object jsonb_send(byte[] bytes, Integer start, Integer end) { + public static Object jsonb_send(byte[] bytes) { return null; } - public static Object range_send(byte[] bytes, Integer start, Integer end) { + public static Object range_send(byte[] bytes) { return null; } - public static Object regnamespacesend(byte[] bytes, Integer start, Integer end) { + public static Object regnamespacesend(byte[] bytes) { return null; } - public static Object regrolesend(byte[] bytes, Integer start, Integer end) { + public static Object regrolesend(byte[] bytes) { return null; } - public static Object array_send(byte[] bytes, Integer start, Integer end) { + public static Object array_send(byte[] bytes) { return null; } - public static Object record_send(byte[] bytes, Integer start, Integer end) { + public static Object record_send(byte[] bytes) { return null; } } diff --git a/src/main/java/org/postgresql/sql2/communication/packets/parts/ColumnTypes.java b/src/main/java/org/postgresql/sql2/communication/packets/parts/ColumnTypes.java index 18fc6de..1b3d07a 100644 --- a/src/main/java/org/postgresql/sql2/communication/packets/parts/ColumnTypes.java +++ b/src/main/java/org/postgresql/sql2/communication/packets/parts/ColumnTypes.java @@ -1,9 +1,5 @@ package org.postgresql.sql2.communication.packets.parts; -import org.postgresql.sql2.communication.packets.parsers.BinaryParser; -import org.postgresql.sql2.communication.packets.parsers.TextParser; -import org.postgresql.sql2.util.TriFunction; - import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; @@ -12,6 +8,9 @@ import java.time.OffsetTime; import java.util.function.Function; +import org.postgresql.sql2.communication.packets.parsers.BinaryParser; +import org.postgresql.sql2.communication.packets.parsers.TextParser; + public enum ColumnTypes { BOOL(16, TextParser::boolout, BinaryParser::boolsend, Boolean.class, PGAdbaType.BOOLEAN), BYTEA(17, TextParser::byteaout, BinaryParser::byteasend, byte[].class, PGAdbaType.BINARY), @@ -99,7 +98,8 @@ public enum ColumnTypes { _TIMESTAMP(1115, TextParser::array_out, BinaryParser::array_send, LocalDateTime[].class, PGAdbaType.ARRAY), _DATE(1182, TextParser::array_out, BinaryParser::array_send, LocalDate[].class, PGAdbaType.ARRAY), _TIME(1183, TextParser::array_out, BinaryParser::array_send, LocalTime.class, PGAdbaType.ARRAY), - TIMESTAMPTZ(1184, TextParser::timestamptz_out, BinaryParser::timestamptz_send, OffsetDateTime.class, PGAdbaType.TIMESTAMP_WITH_TIME_ZONE), + TIMESTAMPTZ(1184, TextParser::timestamptz_out, BinaryParser::timestamptz_send, OffsetDateTime.class, + PGAdbaType.TIMESTAMP_WITH_TIME_ZONE), _TIMESTAMPTZ(1185, TextParser::array_out, BinaryParser::array_send, OffsetDateTime[].class, PGAdbaType.ARRAY), INTERVAL(1186, TextParser::interval_out, BinaryParser::interval_send, null, null), _INTERVAL(1187, TextParser::array_out, BinaryParser::array_send, null, null), @@ -132,8 +132,7 @@ public enum ColumnTypes { VOID(2278, TextParser::void_out, BinaryParser::void_send, null, null), TRIGGER(2279, TextParser::trigger_out, null, null, null), LANGUAGE_HANDLER(2280, TextParser::language_handler_out, null, null, null), - INTERNAL(2281, TextParser::internal_out, null, null, null), - OPAQUE(2282, TextParser::opaque_out, null, null, null), + INTERNAL(2281, TextParser::internal_out, null, null, null), OPAQUE(2282, TextParser::opaque_out, null, null, null), ANYELEMENT(2283, TextParser::anyelement_out, null, null, null), _RECORD(2287, TextParser::array_out, BinaryParser::array_send, null, null), ANYNONARRAY(2776, TextParser::anynonarray_out, null, null, null), @@ -182,11 +181,12 @@ public enum ColumnTypes { private final int oid; private final Function textParser; - private final TriFunction binaryParser; + private final Function binaryParser; private final Class c; private final PGAdbaType type; - ColumnTypes(int oid, Function textParser, TriFunction binaryParser, Class c, PGAdbaType type) { + ColumnTypes(int oid, Function textParser, Function binaryParser, Class c, + PGAdbaType type) { this.oid = oid; this.textParser = textParser; this.binaryParser = binaryParser; @@ -195,8 +195,8 @@ public enum ColumnTypes { } public static ColumnTypes lookup(int oid) { - for(ColumnTypes ct : values()) { - if(ct.oid == oid) + for (ColumnTypes ct : values()) { + if (ct.oid == oid) return ct; } @@ -207,7 +207,7 @@ public Function getTextParser() { return textParser; } - public TriFunction getBinaryParser() { + public Function getBinaryParser() { return binaryParser; } diff --git a/src/test/java/org/postgresql/sql2/communication/BEFrameReaderTest.java b/src/test/java/org/postgresql/sql2/communication/BEFrameReaderTest.java index f576e83..ca57b61 100644 --- a/src/test/java/org/postgresql/sql2/communication/BEFrameReaderTest.java +++ b/src/test/java/org/postgresql/sql2/communication/BEFrameReaderTest.java @@ -1,13 +1,15 @@ package org.postgresql.sql2.communication; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.postgresql.sql2.buffer.PooledByteBuffer; public class BEFrameReaderTest { @@ -33,15 +35,27 @@ public static Collection data() { @ParameterizedTest @MethodSource("data") - public void parseNetworkPayload(String packetName, byte[] packet) { - BEFrameReader instance = new BEFrameReader(); + public void parseNetworkPayload(String packetName, byte[] packet) throws IOException { + BEFrameParser instance = new BEFrameParser(); + // Load the data ByteBuffer bb = ByteBuffer.allocate(1024); bb.put(packet); - instance.updateState(bb, packet.length); - - BEFrame sp = instance.popFrame(); - - assertNotNull(sp, packetName + " could not be parsed"); + NetworkInputStream input = new NetworkInputStream(); + input.appendBuffer(new PooledByteBuffer() { + @Override + public ByteBuffer getByteBuffer() { + return bb; + } + + @Override + public void release() { + } + + }, 0, packet.length, false); + + boolean isFrameAvailable = instance.parseBEFrame(input); + + assertTrue(packetName + " could not be parsed", isFrameAvailable); } } \ No newline at end of file diff --git a/src/test/java/org/postgresql/sql2/communication/packets/RowDescriptionTest.java b/src/test/java/org/postgresql/sql2/communication/packets/RowDescriptionTest.java index ca3d8eb..2df2628 100644 --- a/src/test/java/org/postgresql/sql2/communication/packets/RowDescriptionTest.java +++ b/src/test/java/org/postgresql/sql2/communication/packets/RowDescriptionTest.java @@ -1,19 +1,40 @@ package org.postgresql.sql2.communication.packets; import org.junit.jupiter.api.Test; +import org.postgresql.sql2.buffer.PooledByteBuffer; +import org.postgresql.sql2.communication.NetworkInputStream; import org.postgresql.sql2.communication.packets.parts.ColumnTypes; import org.postgresql.sql2.communication.packets.parts.FormatCodeTypes; import static org.junit.jupiter.api.Assertions.assertEquals; +import java.io.IOException; +import java.nio.ByteBuffer; + public class RowDescriptionTest { @Test - public void describeSelect1() { + public void describeSelect1() throws IOException { byte[] bytes = new byte[] {0x00, 0x01, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x04, (byte)0xff, (byte)0xff, (byte)0xff, (byte)0xff, 0x00, 0x00}; - RowDescription instance = new RowDescription(bytes); + // Load the data + ByteBuffer bb = ByteBuffer.allocate(1024); + bb.put(bytes); + NetworkInputStream input = new NetworkInputStream(); + input.appendBuffer(new PooledByteBuffer() { + @Override + public ByteBuffer getByteBuffer() { + return bb; + } + + @Override + public void release() { + } + + }, 0, bytes.length, false); + + RowDescription instance = new RowDescription(input); assertEquals(1, instance.getDescriptions().length); assertEquals("t", instance.getDescriptions()[0].getName()); diff --git a/src/test/java/org/postgresql/sql2/execution/NioLoopTest.java b/src/test/java/org/postgresql/sql2/execution/NioLoopTest.java index d2c0319..32736b4 100644 --- a/src/test/java/org/postgresql/sql2/execution/NioLoopTest.java +++ b/src/test/java/org/postgresql/sql2/execution/NioLoopTest.java @@ -29,6 +29,8 @@ public class NioLoopTest { public static PostgreSQLContainer postgres = DatabaseHolder.getCached(); + private static final int TIMEOUT = 10000; // + private static Builder createDataSource() { return DataSourceFactory .newFactory("org.postgresql.sql2.PGDataSourceFactory").builder().url("jdbc:postgresql://" @@ -42,7 +44,7 @@ public void ensureDefaultNioLoop() throws Exception { Connection connection = dataSource.getConnection(); Submission submission = connection.rowOperation("SELECT 1 as t") .collect(CollectorUtils.singleCollector(Integer.class)).submit(); - Integer result = submission.getCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + Integer result = submission.getCompletionStage().toCompletableFuture().get(TIMEOUT, TimeUnit.SECONDS); assertEquals("Incorrect result", Integer.valueOf(1), result); } } @@ -56,7 +58,7 @@ public void provideNioLoop() throws Exception { // Undertake single request Submission submission = connection.rowOperation("SELECT 1 as t") .collect(CollectorUtils.singleCollector(Integer.class)).submit(); - Integer result = submission.getCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + Integer result = submission.getCompletionStage().toCompletableFuture().get(TIMEOUT, TimeUnit.SECONDS); assertEquals("Incorrect result", Integer.valueOf(1), result); // Ensure provided NioLoop used @@ -79,7 +81,7 @@ public void pipelineQueries() throws Exception { // Ensure obtain all results for (int i = 0; i < QUERY_COUNT; i++) { - Integer result = submissions[i].getCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + Integer result = submissions[i].getCompletionStage().toCompletableFuture().get(TIMEOUT, TimeUnit.SECONDS); assertEquals("Incorrect result", Integer.valueOf(1), result); } } @@ -101,7 +103,7 @@ public void reuseNioLoopBetweenConnections() throws Exception { // Ensure obtain all results for (int i = 0; i < CONNECTION_COUNT; i++) { - Integer result = submissions[i].getCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + Integer result = submissions[i].getCompletionStage().toCompletableFuture().get(TIMEOUT, TimeUnit.SECONDS); assertEquals("Incorrect result", Integer.valueOf(1), result); } } @@ -126,7 +128,7 @@ public void reuseNioLoopBetweenDataSources() throws Exception { // Ensure obtain all results for (int i = 0; i < dataSources.length; i++) { - Integer result = submissions[i].getCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + Integer result = submissions[i].getCompletionStage().toCompletableFuture().get(TIMEOUT, TimeUnit.SECONDS); assertEquals("Incorrect result", Integer.valueOf(1), result); } } From 4fac651f555a4c868b2a02164114c05aa35d92c2 Mon Sep 17 00:00:00 2001 From: Daniel Sagenschneider Date: Mon, 27 Aug 2018 22:02:46 +0800 Subject: [PATCH 2/3] Providing deploy script for temporary use --- .gitignore | 1 + deploy-pom.xml | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 deploy-pom.xml diff --git a/.gitignore b/.gitignore index 5e1fe69..5a07279 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ out/ /.project /.settings/ /bin/ +/target/ diff --git a/deploy-pom.xml b/deploy-pom.xml new file mode 100644 index 0000000..a85fe7d --- /dev/null +++ b/deploy-pom.xml @@ -0,0 +1,58 @@ + + + 4.0.0 + org.postgresql + pgsql2 + 0.0.1 + PostgreSql ADBA + Deploys the PostgreSql pgsql2 to OfficeFloor sourceforge for use + + + + + UTF-8 + 1.0-SNAPSHOT + + + + repo.officefloor.sf.net + Maven repository on SourceForge - http://www.officefloor.net/maven2 + scp://sagenschneider@shell.sourceforge.net:/home/frs/project/officefloor/maven/maven2 + + + + + + org.apache.maven.wagon + wagon-ssh + 3.1.0 + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.8.2 + + + Upload pgsql2 + validate + + deploy-file + + + ${settings.localRepository}/org/postgresql/pgsql2/${pgsql2.version}/pgsql2-${pgsql2.version}.jar + repo.officefloor.sf.net + scp://sagenschneider@shell.sourceforge.net:/home/frs/project/officefloor/maven/maven2 + org.postgresql + pgsql2 + 0.0.1 + + + + + + + \ No newline at end of file From d501c4d4ce9240ed88a488880a9f98cca0db2b53 Mon Sep 17 00:00:00 2001 From: Daniel Sagenschneider Date: Sat, 8 Sep 2018 01:51:33 +0800 Subject: [PATCH 3/3] Tidy up for reducing number of requests --- .../org/postgresql/sql2/PgConnection.java | 230 ++++++++++-------- .../sql2/communication/NetworkConnection.java | 6 +- .../communication/NetworkReadContext.java | 6 +- .../communication/NetworkWriteContext.java | 6 +- ...dStatementCache.java => QueryFactory.java} | 25 +- .../network/AbstractPortalResponse.java | 31 --- .../network/AbstractQueryResponse.java | 72 ++++++ .../communication/network/BindRequest.java | 19 +- .../communication/network/BindResponse.java | 6 +- .../network/DescribeRequest.java | 42 +++- .../network/DescribeResponse.java | 9 +- .../communication/network/ExecuteRequest.java | 13 +- .../network/ExecuteResponse.java | 14 +- .../network/NetworkConnectRequest.java | 2 +- .../communication/network/ParseRequest.java | 56 ++--- .../communication/network/ParseResponse.java | 8 +- .../sql2/communication/network/Portal.java | 183 +------------- .../sql2/communication/network/Query.java | 126 +++++++--- .../communication/network/QueryReuse.java | 140 +++++++++++ .../communication/network/SyncRequest.java | 11 +- .../sql2/communication/QueryFactoryTest.java | 41 ++++ .../sql2/execution/NioLoopTest.java | 10 +- .../sql2/util/PreparedStatementCacheTest.java | 28 --- 23 files changed, 601 insertions(+), 483 deletions(-) rename src/main/java/org/postgresql/sql2/communication/{PreparedStatementCache.java => QueryFactory.java} (56%) delete mode 100644 src/main/java/org/postgresql/sql2/communication/network/AbstractPortalResponse.java create mode 100644 src/main/java/org/postgresql/sql2/communication/network/AbstractQueryResponse.java create mode 100644 src/main/java/org/postgresql/sql2/communication/network/QueryReuse.java create mode 100644 src/test/java/org/postgresql/sql2/communication/QueryFactoryTest.java delete mode 100644 src/test/java/org/postgresql/sql2/util/PreparedStatementCacheTest.java diff --git a/src/main/java/org/postgresql/sql2/PgConnection.java b/src/main/java/org/postgresql/sql2/PgConnection.java index a43db95..4f8e35f 100644 --- a/src/main/java/org/postgresql/sql2/PgConnection.java +++ b/src/main/java/org/postgresql/sql2/PgConnection.java @@ -64,14 +64,14 @@ public class PgConnection extends PgOperationGroup implements Co /** * Meant for internal usage, use the connection builder instead. + * * @param properties connection properties * @param dataSource datasource that this connection is a part of - * @param loop the nioLoop is what transports data + * @param loop the nioLoop is what transports data * @param bufferPool Pool of {@link PooledByteBuffer} instances. * @throws IOException if there is problems with opening a socket channel */ - public PgConnection(Map properties, PgDataSource dataSource, - NioLoop loop, + public PgConnection(Map properties, PgDataSource dataSource, NioLoop loop, ByteBufferPool bufferPool) throws IOException { this.properties = properties; this.dataSource = dataSource; @@ -82,74 +82,86 @@ public PgConnection(Map properties, PgDataSource dat } /** - * Returns an {@link Operation} that connects this {@link Connection} to a server. If the - * Operation completes successfully and the lifecycle is {@link Lifecycle#NEW} -> {@link - * Lifecycle#OPEN}. If lifecycle is {@link Lifecycle#NEW_INACTIVE} -> {@link - * Lifecycle#INACTIVE}. If the {@link Operation} completes exceptionally the lifecycle -> - * {@link Lifecycle#CLOSED}. The lifecycle must be {@link Lifecycle#NEW} or {@link - * Lifecycle#NEW_INACTIVE} when the {@link Operation} is executed. Otherwise the {@link Operation} - * will complete exceptionally with {@link SqlException}. + * Returns an {@link Operation} that connects this {@link Connection} to a + * server. If the Operation completes successfully and the lifecycle is + * {@link Lifecycle#NEW} -> {@link Lifecycle#OPEN}. If lifecycle is + * {@link Lifecycle#NEW_INACTIVE} -> {@link Lifecycle#INACTIVE}. If the + * {@link Operation} completes exceptionally the lifecycle -> + * {@link Lifecycle#CLOSED}. The lifecycle must be {@link Lifecycle#NEW} or + * {@link Lifecycle#NEW_INACTIVE} when the {@link Operation} is executed. + * Otherwise the {@link Operation} will complete exceptionally with + * {@link SqlException}. * - *

Note: It is highly recommended to use the {@link Connection#connect()} convenience method or to - * use {@link DataSource#getConnection} which itself calls {@link Connection#connect()}. Unless - * there is a specific need, do not call this method directly. + *

+ * Note: It is highly recommended to use the {@link Connection#connect()} + * convenience method or to use {@link DataSource#getConnection} which itself + * calls {@link Connection#connect()}. Unless there is a specific need, do not + * call this method directly. * - *

This method exists partially to clearly explain that while creating a {@link Connection} is - * non-blocking, the act of connecting to the server may block and so is executed asynchronously. - * We could write a bunch of text saying this but defining this method is more explicit. Given the - * {@link Connection#connect()} convenience methods there's probably not much reason to use this - * method, but on the other hand, who knows, so here it is. + *

+ * This method exists partially to clearly explain that while creating a + * {@link Connection} is non-blocking, the act of connecting to the server may + * block and so is executed asynchronously. We could write a bunch of text + * saying this but defining this method is more explicit. Given the + * {@link Connection#connect()} convenience methods there's probably not much + * reason to use this method, but on the other hand, who knows, so here it is. * - * @return an {@link Operation} that connects this {@link Connection} to a server. - * @throws IllegalStateException if this {@link Connection} is in a lifecycle state other than - * {@link Lifecycle#NEW}. + * @return an {@link Operation} that connects this {@link Connection} to a + * server. + * @throws IllegalStateException if this {@link Connection} is in a lifecycle + * state other than {@link Lifecycle#NEW}. */ @Override public Operation connectOperation() { if (lifecycle != Lifecycle.NEW) { - throw new IllegalStateException( - "only connections in state NEW are allowed to start connecting"); + throw new IllegalStateException("only connections in state NEW are allowed to start connecting"); } return new PgConnectOperation(this, groupSubmission); } /** - * Returns an {@link Operation} that verifies that the resources are available and operational. - * Successful completion of that {@link Operation} implies that at some point between the - * beginning and end of the {@link Operation} the Connection was working properly to the extent - * specified by {@code depth}. There is no guarantee that the {@link Connection} is still working - * after completion. + * Returns an {@link Operation} that verifies that the resources are available + * and operational. Successful completion of that {@link Operation} implies that + * at some point between the beginning and end of the {@link Operation} the + * Connection was working properly to the extent specified by {@code depth}. + * There is no guarantee that the {@link Connection} is still working after + * completion. * - * @param depth how completely to check that resources are available and operational. Not {@code - * null}. + * @param depth how completely to check that resources are available and + * operational. Not {@code + * null} . * @return an {@link Operation} that will validate this {@link Connection} * @throws IllegalStateException if this Connection is not active */ @Override public Operation validationOperation(Connection.Validation depth) { if (!lifecycle.isOpen() || !lifecycle.isActive()) { - throw new IllegalStateException( - "connection lifecycle in state: " + lifecycle + " and not open for new work"); + throw new IllegalStateException("connection lifecycle in state: " + lifecycle + " and not open for new work"); } return new PgValidationOperation(this, depth); } /** - * Create an {@link Operation} to close this {@link Connection}. When the {@link Operation} is - * executed, if this {@link Connection} is open -> {@link Lifecycle#CLOSING}. If this {@link - * Connection} is closed executing the returned {@link Operation} is a noop. When the queue is - * empty and all resources released -> {@link Lifecycle#CLOSED}. + * Create an {@link Operation} to close this {@link Connection}. When the + * {@link Operation} is executed, if this {@link Connection} is open -> + * {@link Lifecycle#CLOSING}. If this {@link Connection} is closed executing the + * returned {@link Operation} is a noop. When the queue is empty and all + * resources released -> {@link Lifecycle#CLOSED}. * - *

A close {@link Operation} is never skipped. Even when the {@link Connection} is dependent, the - * default, and an {@link Operation} completes exceptionally, a close {@link Operation} is still - * executed. If the {@link Connection} is parallel, a close {@link Operation} is not executed so - * long as there are other {@link Operation}s or the {@link Connection} is held; for more {@link - * Operation}s. + *

+ * A close {@link Operation} is never skipped. Even when the {@link Connection} + * is dependent, the default, and an {@link Operation} completes exceptionally, + * a close {@link Operation} is still executed. If the {@link Connection} is + * parallel, a close {@link Operation} is not executed so long as there are + * other {@link Operation}s or the {@link Connection} is held; for more + * {@link Operation}s. * - *

Note: It is highly recommended to use try with resources or the {@link Connection#close()} - * convenience method. Unless there is a specific need, do not call this method directly. + *

+ * Note: It is highly recommended to use try with resources or the + * {@link Connection#close()} convenience method. Unless there is a specific + * need, do not call this method directly. * * @return an {@link Operation} that will close this {@link Connection}. * @throws IllegalStateException if the Connection is not active @@ -169,17 +181,17 @@ public Operation closeOperation() { /** * Create a new {@link OperationGroup} for this {@link Connection}. * - * @param the result type of the member {@link Operation}s of the returned {@link - * OperationGroup} - * @param the result type of the collected results of the member {@link Operation}s + * @param the result type of the member {@link Operation}s of the returned + * {@link OperationGroup} + * @param the result type of the collected results of the member + * {@link Operation}s * @return a new {@link OperationGroup}. * @throws IllegalStateException if this Connection is not active */ @Override public OperationGroup operationGroup() { if (!lifecycle.isOpen() || !lifecycle.isActive()) { - throw new IllegalStateException( - "connection lifecycle in state: " + lifecycle + " and not open for new work"); + throw new IllegalStateException("connection lifecycle in state: " + lifecycle + " and not open for new work"); } if (logger.isLoggable(Level.CONFIG)) { @@ -190,11 +202,14 @@ public OperationGroup operationGroup() { } /** - * Returns a new {@link Transaction} that can be used as an argument to a commit Operation. + * Returns a new {@link Transaction} that can be used as an argument to a commit + * Operation. * - *

It is most likely an error to call this within an error handler, or any handler as it is very - * likely that when the handler is executed the next submitted endTransaction {@link Operation} - * will have been created with a different Transaction. + *

+ * It is most likely an error to call this within an error handler, or any + * handler as it is very likely that when the handler is executed the next + * submitted endTransaction {@link Operation} will have been created with a + * different Transaction. * * @return a new {@link Transaction}. Not retained. * @throws IllegalStateException if this Connection is not active @@ -205,8 +220,8 @@ public Transaction transaction() { } /** - * Register a listener that will be called whenever there is a change in the lifecycle of this - * {@link Connection}. + * Register a listener that will be called whenever there is a change in the + * lifecycle of this {@link Connection}. * * @param listener Can be {@code null}. * @throws IllegalStateException if this Connection is not active @@ -225,9 +240,10 @@ public Connection registerLifecycleListener(Connection.ConnectionLifecycleListen } /** - * Removes a listener that was registered by calling registerLifecycleListener.Sometime after this - * method is called the listener will stop receiving lifecycle events. If the listener is not - * registered, this is a no-op. + * Removes a listener that was registered by calling + * registerLifecycleListener.Sometime after this method is called the listener + * will stop receiving lifecycle events. If the listener is not registered, this + * is a no-op. * * @param listener Not {@code null}. * @return this Connection @@ -257,12 +273,13 @@ public Lifecycle getConnectionLifecycle() { } /** - * Terminate this {@link Connection}. If lifecycle is {@link Lifecycle#NEW}, {@link - * Lifecycle#OPEN}, {@link Lifecycle#INACTIVE} or {@link Lifecycle#CLOSING} -> {@link - * Lifecycle#ABORTING} If lifecycle is {@link Lifecycle#ABORTING} or {@link Lifecycle#CLOSED} this - * is a noop. If an {@link Operation} is currently executing, terminate it immediately. Remove all - * remaining {@link Operation}s from the queue. {@link Operation}s are not skipped. They are just - * removed from the queue. + * Terminate this {@link Connection}. If lifecycle is {@link Lifecycle#NEW}, + * {@link Lifecycle#OPEN}, {@link Lifecycle#INACTIVE} or + * {@link Lifecycle#CLOSING} -> {@link Lifecycle#ABORTING} If lifecycle is + * {@link Lifecycle#ABORTING} or {@link Lifecycle#CLOSED} this is a noop. If an + * {@link Operation} is currently executing, terminate it immediately. Remove + * all remaining {@link Operation}s from the queue. {@link Operation}s are not + * skipped. They are just removed from the queue. * * @return this {@link Connection} */ @@ -279,13 +296,15 @@ public Connection abort() { } /** - * Return the set of properties configured on this {@link Connection} excepting any sensitive - * properties. Neither the key nor the value for sensitive properties are included in the result. - * Properties (other than sensitive properties) that have default values are included even when - * not explicitly set. Properties that have no default value and are not set explicitly are not + * Return the set of properties configured on this {@link Connection} excepting + * any sensitive properties. Neither the key nor the value for sensitive + * properties are included in the result. Properties (other than sensitive + * properties) that have default values are included even when not explicitly + * set. Properties that have no default value and are not set explicitly are not * included. * - * @return a {@link Map} of property, value. Not modifiable. May be retained. Not {@code null}. + * @return a {@link Map} of property, value. Not modifiable. May be retained. + * Not {@code null}. * @throws IllegalStateException if this Connection is not active */ @Override @@ -294,7 +313,8 @@ public Map getProperties() { } /** - * Returns a {@link ShardingKey.Builder} that is valid for this {@link Connection}. + * Returns a {@link ShardingKey.Builder} that is valid for this + * {@link Connection}. * * @return a {@link ShardingKey.Builder} for this {@link Connection} */ @@ -309,10 +329,11 @@ public Connection requestHook(Consumer request) { } /** - * Make this {@link Connection} ready for use. A newly created {@link Connection} is active. - * Calling this method on a {@link Connection} that is active is a no-op. If the lifecycle is - * {@link Lifecycle#INACTIVE} -> {@link Lifecycle#OPEN}. If the lifecycle is {@link - * Lifecycle#NEW_INACTIVE} -> {@link Lifecycle#NEW}. + * Make this {@link Connection} ready for use. A newly created + * {@link Connection} is active. Calling this method on a {@link Connection} + * that is active is a no-op. If the lifecycle is {@link Lifecycle#INACTIVE} + * -> {@link Lifecycle#OPEN}. If the lifecycle is + * {@link Lifecycle#NEW_INACTIVE} -> {@link Lifecycle#NEW}. * * @return this {@link Connection} * @throws IllegalStateException if this {@link Connection} is closed. @@ -330,23 +351,28 @@ public Connection activate() { } /** - * Makes this {@link Connection} inactive. After a call to this method previously submitted - * Operations will be executed normally. If the lifecycle is {@link Lifecycle#NEW} -> {@link - * Lifecycle#NEW_INACTIVE}. if the lifecycle is {@link Lifecycle#OPEN} -> {@link - * Lifecycle#INACTIVE}. If the lifecycle is {@link Lifecycle#INACTIVE} or {@link - * Lifecycle#NEW_INACTIVE} this method is a no-op. After calling this method calling any method - * other than {@link Connection#deactivate}, {@link Connection#activate}, {@link - * Connection#abort}, or {@link Connection#getConnectionLifecycle} or submitting any member {@link - * Operation} will throw {@link IllegalStateException}. Local {@link Connection} state not created - * by {@link Connection.Builder} may not be preserved. + * Makes this {@link Connection} inactive. After a call to this method + * previously submitted Operations will be executed normally. If the lifecycle + * is {@link Lifecycle#NEW} -> {@link Lifecycle#NEW_INACTIVE}. if the + * lifecycle is {@link Lifecycle#OPEN} -> {@link Lifecycle#INACTIVE}. If the + * lifecycle is {@link Lifecycle#INACTIVE} or {@link Lifecycle#NEW_INACTIVE} + * this method is a no-op. After calling this method calling any method other + * than {@link Connection#deactivate}, {@link Connection#activate}, + * {@link Connection#abort}, or {@link Connection#getConnectionLifecycle} or + * submitting any member {@link Operation} will throw + * {@link IllegalStateException}. Local {@link Connection} state not created by + * {@link Connection.Builder} may not be preserved. * - *

Any implementation of a {@link Connection} pool is by default required to call {@code - * deactivate} when putting a {@link Connection} into a pool. The implementation is required to - * call {@code activate} when removing a {@link Connection} from a pool so the {@link Connection} - * can be used. An implementation of a {@link Connection} pool may have an optional mode where it - * does not call {@code deactivate}/{@code activate} as required above. The behavior of the pool - * and {@link Connection}s cached in the pool in such a mode is entirely implementation - * dependent. + *

+ * Any implementation of a {@link Connection} pool is by default required to + * call {@code + * deactivate} when putting a {@link Connection} into a pool. The implementation + * is required to call {@code activate} when removing a {@link Connection} from + * a pool so the {@link Connection} can be used. An implementation of a + * {@link Connection} pool may have an optional mode where it does not call + * {@code deactivate}/{@code activate} as required above. The behavior of the + * pool and {@link Connection}s cached in the pool in such a mode is entirely + * implementation dependent. * * @return this {@link Connection} * @throws IllegalStateException if this {@link Connection} is closed @@ -399,20 +425,18 @@ public void sendNetworkConnect(NetworkConnect connect) { */ public void submit(PgSubmission submission) { switch (submission.getCompletionType()) { - case LOCAL: - case CATCH: - sendNetworkRequest(new ImmediateComplete(submission)); - break; - case GROUP: - if (lastSubmission != null) { - ((CompletableFuture) lastSubmission.getCompletionStage()).thenApply(a -> - submission.finish(null)); - } - break; - - default: - Portal portal = new Portal(submission); - sendNetworkRequest(new ParseRequest<>(portal)); + case LOCAL: + case CATCH: + sendNetworkRequest(new ImmediateComplete(submission)); + break; + case GROUP: + if (lastSubmission != null) { + ((CompletableFuture) lastSubmission.getCompletionStage()).thenApply(a -> submission.finish(null)); + } + break; + + default: + sendNetworkRequest(new ParseRequest<>(submission)); } lastSubmission = submission; } diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java b/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java index df61a72..d796ea7 100644 --- a/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java +++ b/src/main/java/org/postgresql/sql2/communication/NetworkConnection.java @@ -47,7 +47,7 @@ public class NetworkConnection implements NioService, NetworkConnectContext, Net private final BEFrameParser parser = new BEFrameParser(); - private final PreparedStatementCache preparedStatementCache = new PreparedStatementCache(); + private final QueryFactory queryFactory = new QueryFactory(); private NetworkConnect connect = null; @@ -462,8 +462,8 @@ public NetworkOutputStream getOutputStream() { } @Override - public PreparedStatementCache getPreparedStatementCache() { - return this.preparedStatementCache; + public QueryFactory getQueryFactory() { + return this.queryFactory; } @Override diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java b/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java index bcc3cfe..63e16dd 100644 --- a/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java +++ b/src/main/java/org/postgresql/sql2/communication/NetworkReadContext.java @@ -31,11 +31,11 @@ public interface NetworkReadContext extends NetworkContext { NetworkInputStream getPayload(); /** - * Obtains the {@link PreparedStatementCache}. + * Obtains the {@link QueryFactory}. * - * @return {@link PreparedStatementCache}. + * @return {@link QueryFactory}. */ - PreparedStatementCache getPreparedStatementCache(); + QueryFactory getQueryFactory(); /** * Allows overriding {@link ConnectionProperty}. diff --git a/src/main/java/org/postgresql/sql2/communication/NetworkWriteContext.java b/src/main/java/org/postgresql/sql2/communication/NetworkWriteContext.java index cfeba26..50d733e 100644 --- a/src/main/java/org/postgresql/sql2/communication/NetworkWriteContext.java +++ b/src/main/java/org/postgresql/sql2/communication/NetworkWriteContext.java @@ -15,10 +15,10 @@ public interface NetworkWriteContext extends NetworkContext { NetworkOutputStream getOutputStream(); /** - * Obtains the {@link PreparedStatementCache}. + * Obtains the {@link QueryFactory}. * - * @return {@link PreparedStatementCache}. + * @return {@link QueryFactory}. */ - PreparedStatementCache getPreparedStatementCache(); + QueryFactory getQueryFactory(); } \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/PreparedStatementCache.java b/src/main/java/org/postgresql/sql2/communication/QueryFactory.java similarity index 56% rename from src/main/java/org/postgresql/sql2/communication/PreparedStatementCache.java rename to src/main/java/org/postgresql/sql2/communication/QueryFactory.java index 57e8d82..85ddf13 100644 --- a/src/main/java/org/postgresql/sql2/communication/PreparedStatementCache.java +++ b/src/main/java/org/postgresql/sql2/communication/QueryFactory.java @@ -4,15 +4,18 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutionException; +import org.postgresql.sql2.PgSubmission; import org.postgresql.sql2.communication.network.Query; +import org.postgresql.sql2.communication.network.QueryReuse; -public class PreparedStatementCache { +public class QueryFactory { /** * As only used on networking thread, is thread safe. */ - private Map sqlToQuery = new HashMap<>(); + private Map sqlToReuse = new HashMap<>(); /** * Obtains the {@link Query} for the SQL. @@ -21,13 +24,21 @@ public class PreparedStatementCache { * @param params Parameters. * @return {@link Query}. */ - public Query getQuery(String sql, List params) { + public Query createQuery(PgSubmission submission) throws InterruptedException, ExecutionException { + + // Obtain the details + String sql = submission.getSql(); + List parameters = submission.getParamTypes(); if (sql == null) { throw new IllegalArgumentException("No SQL provided"); } - // Obtain or create the query - return this.sqlToQuery.computeIfAbsent(new StatementKey(sql, params), key -> new Query()); + // Obtain or create the query re-use + StatementKey key = new StatementKey(sql, parameters); + QueryReuse reuse = this.sqlToReuse.computeIfAbsent(key, (absentKey) -> new QueryReuse()); + + // Return the query + return new Query(submission, reuse); } private class StatementKey { @@ -50,7 +61,7 @@ public boolean equals(Object o) { } StatementKey that = (StatementKey) o; - return Objects.equals(sql, that.sql) && Objects.equals(params, that.params); + return Objects.equals(sql, that.sql) ; //&& Objects.equals(params, that.params); } @Override @@ -59,4 +70,4 @@ public int hashCode() { } } -} +} \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/network/AbstractPortalResponse.java b/src/main/java/org/postgresql/sql2/communication/network/AbstractPortalResponse.java deleted file mode 100644 index f2947cc..0000000 --- a/src/main/java/org/postgresql/sql2/communication/network/AbstractPortalResponse.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.postgresql.sql2.communication.network; - -import org.postgresql.sql2.communication.NetworkResponse; - -/** - * Abstract {@link Portal} {@link NetworkResponse}. - * - * @author Daniel Sagenschneider - */ -public abstract class AbstractPortalResponse implements NetworkResponse { - - /** - * {@link Portal}. - */ - protected final Portal portal; - - /** - * Instantiate. - * - * @param portal {@link Portal}. - */ - public AbstractPortalResponse(Portal portal) { - this.portal = portal; - } - - @Override - public NetworkResponse handleException(Throwable ex) { - portal.handleException(ex); - return new ReadyForQueryResponse(); - } -} \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/network/AbstractQueryResponse.java b/src/main/java/org/postgresql/sql2/communication/network/AbstractQueryResponse.java new file mode 100644 index 0000000..d378a9b --- /dev/null +++ b/src/main/java/org/postgresql/sql2/communication/network/AbstractQueryResponse.java @@ -0,0 +1,72 @@ +package org.postgresql.sql2.communication.network; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import org.postgresql.sql2.PgSubmission; +import org.postgresql.sql2.communication.NetworkResponse; +import org.postgresql.sql2.communication.packets.ErrorPacket; +import org.postgresql.sql2.communication.packets.parts.ErrorResponseField; + +import jdk.incubator.sql2.SqlException; + +/** + * Abstract {@link PgSubmission} {@link NetworkResponse}. + * + * @author Daniel Sagenschneider + */ +public abstract class AbstractQueryResponse implements NetworkResponse { + + /** + * Handle exception that occurred. + * + * @param submission the submission that was active when the exception happened + * @param ex the exception + */ + public static void doHandleException(PgSubmission submission, Throwable ex) { + if (ex instanceof ErrorPacket) { + ErrorPacket e = (ErrorPacket) ex; + int code = 0; + if (e.getField(ErrorResponseField.Types.SQLSTATE_CODE) != null) { + try { + code = Integer.parseInt(e.getField(ErrorResponseField.Types.SQLSTATE_CODE)); + } catch (NumberFormatException ignore) { + // ignored for now + } + } + int position = 0; + if (e.getField(ErrorResponseField.Types.POSITION) != null) { + position = Integer.parseInt(e.getField(ErrorResponseField.Types.POSITION)); + } + ex = new SqlException(e.getMessage(), e, e.getField(ErrorResponseField.Types.SEVERITY), code, null, position); + } + if (!(ex instanceof SqlException)) { + ex = new SqlException(ex.getMessage(), ex, null, 0, null, 0); + } + Consumer errorHandler = submission.getErrorHandler(); + if (errorHandler != null) { + errorHandler.accept(ex); + } + ((CompletableFuture) submission.getCompletionStage()).completeExceptionally(ex); + } + + /** + * {@link Query}. + */ + protected final Query query; + + /** + * Instantiate. + * + * @param query {@link Query}. + */ + public AbstractQueryResponse(Query query) { + this.query = query; + } + + @Override + public NetworkResponse handleException(Throwable ex) { + doHandleException(this.query.getSubmission(), ex); + return new ReadyForQueryResponse(); + } +} \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/network/BindRequest.java b/src/main/java/org/postgresql/sql2/communication/network/BindRequest.java index 106797c..412b30d 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/BindRequest.java +++ b/src/main/java/org/postgresql/sql2/communication/network/BindRequest.java @@ -16,10 +16,10 @@ */ public class BindRequest implements NetworkRequest { - private final Portal portal; + private final Query query; - public BindRequest(Portal portal) { - this.portal = portal; + public BindRequest(Query query) { + this.query = query; } /* @@ -30,17 +30,14 @@ public BindRequest(Portal portal) { public NetworkRequest write(NetworkWriteContext context) throws Exception { // Obtain the query details - String portalName = this.portal.getPortalName(); - String queryName = this.portal.getQuery().getQueryName(); - String sql = this.portal.getSql(); - ParameterHolder holder = this.portal.getParameterHolder(); + ParameterHolder holder = this.query.getSubmission().getHolder(); // Write the packet NetworkOutputStream wire = context.getOutputStream(); wire.write(FeFrame.FrontendTag.BIND.getByte()); wire.initPacket(); - wire.write(this.portal.getPortalName()); - wire.write(this.portal.getQuery().getQueryName()); + wire.write(this.query.getQueryName()); + wire.write(this.query.getReuse().getPortalNameOrUnnamed()); wire.write(BinaryHelper.writeShort(holder.size())); for (QueryParameter qp : holder.parameters()) { wire.write(BinaryHelper.writeShort(qp.getParameterFormatCode())); @@ -61,12 +58,12 @@ public NetworkRequest write(NetworkWriteContext context) throws Exception { wire.completePacket(); // Next step to execute - return new ExecuteRequest<>(this.portal); + return new ExecuteRequest<>(this.query); } @Override public NetworkResponse getRequiredResponse() { - return new BindResponse(this.portal); + return new BindResponse(this.query); } } \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java b/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java index b60f42c..0e14f30 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/BindResponse.java @@ -11,10 +11,10 @@ * * @author Daniel Sagenschneider */ -public class BindResponse extends AbstractPortalResponse { +public class BindResponse extends AbstractQueryResponse { - public BindResponse(Portal portal) { - super(portal); + public BindResponse(Query query) { + super(query); } @Override diff --git a/src/main/java/org/postgresql/sql2/communication/network/DescribeRequest.java b/src/main/java/org/postgresql/sql2/communication/network/DescribeRequest.java index 5d6e0b9..b8caaf1 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/DescribeRequest.java +++ b/src/main/java/org/postgresql/sql2/communication/network/DescribeRequest.java @@ -13,15 +13,15 @@ */ public class DescribeRequest implements NetworkRequest { - private final Portal portal; + private final Query query; /** * Instantiate. * - * @param portal the portal this request connects to + * @param query {@link Query}. */ - public DescribeRequest(Portal portal) { - this.portal = portal; + public DescribeRequest(Query query) { + this.query = query; } /* @@ -31,21 +31,37 @@ public DescribeRequest(Portal portal) { @Override public NetworkRequest write(NetworkWriteContext context) throws Exception { - // Send describe packet - NetworkOutputStream wire = context.getOutputStream(); - wire.write(FeFrame.FrontendTag.DESCRIBE.getByte()); - wire.initPacket(); - wire.write('S'); - wire.write(this.portal.getQuery().getQueryName()); - wire.completePacket(); + // Obtain the reuse + QueryReuse reuse = this.query.getReuse(); + + // Determine if describe query + if ((reuse.getRowDescription() == null) && (!reuse.isWaitingDescribe())) { + + // Send describe packet + NetworkOutputStream wire = context.getOutputStream(); + wire.write(FeFrame.FrontendTag.DESCRIBE.getByte()); + wire.initPacket(); + wire.write('S'); + wire.write(this.query.getReuse().getPortalNameOrUnnamed()); + wire.completePacket(); + } // Next step to bind - return new BindRequest<>(this.portal); + return new BindRequest<>(this.query); } @Override public NetworkResponse getRequiredResponse() { - return new DescribeResponse(this.portal); + + // Determine if waiting on describe + QueryReuse reuse = this.query.getReuse(); + if (!reuse.isWaitingDescribe()) { + reuse.flagWaitingDescribe(); + return new DescribeResponse(this.query); + } + + // Already waiting on describe + return null; } } \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java b/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java index bdfd07e..6f1c71d 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/DescribeResponse.java @@ -12,10 +12,10 @@ * * @author Daniel Sagenschneider */ -public class DescribeResponse extends AbstractPortalResponse { +public class DescribeResponse extends AbstractQueryResponse { - public DescribeResponse(Portal portal) { - super(portal); + public DescribeResponse(Query query) { + super(query); } @Override @@ -30,7 +30,8 @@ public NetworkResponse read(NetworkReadContext context) throws IOException { case BEFrameParser.ROW_DESCRIPTION: RowDescription rowDescription = new RowDescription(context.getPayload()); - this.portal.getQuery().setRowDescription(rowDescription); + QueryReuse reuse = this.query.getReuse(); + reuse.setRowDescription(rowDescription); return null; // nothing further default: diff --git a/src/main/java/org/postgresql/sql2/communication/network/ExecuteRequest.java b/src/main/java/org/postgresql/sql2/communication/network/ExecuteRequest.java index e7c19f6..7b95fd0 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/ExecuteRequest.java +++ b/src/main/java/org/postgresql/sql2/communication/network/ExecuteRequest.java @@ -14,10 +14,10 @@ */ public class ExecuteRequest implements NetworkRequest { - private final Portal portal; + private final Query query; - public ExecuteRequest(Portal portal) { - this.portal = portal; + public ExecuteRequest(Query query) { + this.query = query; } /* @@ -31,17 +31,16 @@ public NetworkRequest write(NetworkWriteContext context) throws Exception { NetworkOutputStream wire = context.getOutputStream(); wire.write(FeFrame.FrontendTag.EXECUTE.getByte()); wire.initPacket(); - wire.write(this.portal.getPortalName()); + wire.write(this.query.getQueryName()); wire.write(BinaryHelper.writeInt(0)); // number of rows to return, 0 == all wire.completePacket(); - // TODO Auto-generated method stub - return new SyncRequest(portal); + return new SyncRequest(this.query); } @Override public NetworkResponse getRequiredResponse() { - return new ExecuteResponse(this.portal); + return new ExecuteResponse(this.query); } } \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java b/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java index 54a805c..d56d339 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/ExecuteResponse.java @@ -13,10 +13,10 @@ * * @author Daniel Sagenschneider */ -public class ExecuteResponse extends AbstractPortalResponse { +public class ExecuteResponse extends AbstractQueryResponse { - public ExecuteResponse(Portal portal) { - super(portal); + public ExecuteResponse(Query query) { + super(query); } @Override @@ -24,14 +24,14 @@ public NetworkResponse read(NetworkReadContext context) throws IOException { switch (context.getFrameTag()) { case BEFrameParser.DATA_ROW: - DataRow dataRow = new DataRow(context, this.portal.getQuery().getRowDescription().getDescriptions(), - this.portal.nextRowNumber()); - this.portal.addDataRow(dataRow); + DataRow dataRow = new DataRow(context, this.query.getReuse().getRowDescription().getDescriptions(), + this.query.nextRowNumber()); + this.query.addDataRow(dataRow); return this; case BEFrameParser.COMMAND_COMPLETE: CommandComplete complete = new CommandComplete(context); - this.portal.commandComplete(complete, context.getSocketChannel()); + this.query.commandComplete(complete, context.getSocketChannel()); return this; case BEFrameParser.READY_FOR_QUERY: diff --git a/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java b/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java index 32e6ed3..63e9451 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java +++ b/src/main/java/org/postgresql/sql2/communication/network/NetworkConnectRequest.java @@ -134,7 +134,7 @@ public NetworkResponse read(NetworkReadContext context) throws IOException { @Override public NetworkResponse handleException(Throwable ex) { - Portal.doHandleException(this.connectSubmission, ex); + AbstractQueryResponse.doHandleException(this.connectSubmission, ex); return null; } diff --git a/src/main/java/org/postgresql/sql2/communication/network/ParseRequest.java b/src/main/java/org/postgresql/sql2/communication/network/ParseRequest.java index 9624d8a..2f04d08 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/ParseRequest.java +++ b/src/main/java/org/postgresql/sql2/communication/network/ParseRequest.java @@ -1,11 +1,12 @@ package org.postgresql.sql2.communication.network; +import org.postgresql.sql2.PgSubmission; import org.postgresql.sql2.communication.FeFrame; import org.postgresql.sql2.communication.NetworkOutputStream; import org.postgresql.sql2.communication.NetworkRequest; import org.postgresql.sql2.communication.NetworkResponse; import org.postgresql.sql2.communication.NetworkWriteContext; -import org.postgresql.sql2.communication.PreparedStatementCache; +import org.postgresql.sql2.communication.QueryFactory; import org.postgresql.sql2.operations.helpers.ParameterHolder; import org.postgresql.sql2.operations.helpers.QueryParameter; import org.postgresql.sql2.util.BinaryHelper; @@ -17,10 +18,12 @@ */ public class ParseRequest implements NetworkRequest { - private final Portal portal; + private final PgSubmission submission; - public ParseRequest(Portal portal) { - this.portal = portal; + private Query query = null; + + public ParseRequest(PgSubmission submission) { + this.submission = submission; } /* @@ -30,55 +33,48 @@ public ParseRequest(Portal portal) { @Override public NetworkRequest write(NetworkWriteContext context) throws Exception { - // Determine if already query - Query query = portal.getQuery(); - if (query == null) { - - // Obtain the prepared statement cache - PreparedStatementCache cache = context.getPreparedStatementCache(); + // Obtain the query + this.query = context.getQueryFactory().createQuery(this.submission); - // Obtain the query - String sql = this.portal.getSql(); - ParameterHolder holder = this.portal.getParameterHolder(); - query = cache.getQuery(sql, holder.getParamTypes()); + // Obtain the details of query + String sql = this.query.getSubmission().getSql(); + ParameterHolder parameters = this.query.getSubmission().getHolder(); - // Associate query to portal - this.portal.setQuery(query); + // Determine if simple query (not yet executed enough) + QueryReuse reuse = this.query.getReuse(); + if (reuse.isSimpleQuery()) { + // TODO send simple query + throw new UnsupportedOperationException("TODO implement simple query"); } // Determine if prepare query - if ((!query.isParsed()) && (!query.isWaitingParse())) { - - // Obtain the query details - String sql = this.portal.getSql(); - ParameterHolder holder = this.portal.getParameterHolder(); + if ((!reuse.isParsed()) && (!reuse.isWaitingParse())) { // Send the prepare packet NetworkOutputStream wire = context.getOutputStream(); wire.write(FeFrame.FrontendTag.PARSE.getByte()); wire.initPacket(); - wire.write(query.getQueryName()); + wire.write(this.query.getReuse().getPortalNameOrUnnamed()); wire.write(sql); - wire.write(BinaryHelper.writeShort(holder.size())); - for (QueryParameter qp : holder.parameters()) { + wire.write(BinaryHelper.writeShort(parameters.size())); + for (QueryParameter qp : parameters.parameters()) { wire.write(BinaryHelper.writeInt(qp.getOid())); } wire.completePacket(); } // Determine if describe or bind - return new DescribeRequest<>(this.portal); - + return new DescribeRequest<>(this.query); } @Override public NetworkResponse getRequiredResponse() { - Query query = this.portal.getQuery(); // Determine if waiting on parse - if (!query.isWaitingParse()) { - query.flagWaitingParse(); - return new ParseResponse(this.portal); + QueryReuse reuse = this.query.getReuse(); + if (!reuse.isWaitingParse()) { + reuse.flagWaitingParse(); + return new ParseResponse(this.query); } // Already waiting on parse diff --git a/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java b/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java index 9bf4411..007e2ad 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java +++ b/src/main/java/org/postgresql/sql2/communication/network/ParseResponse.java @@ -11,10 +11,10 @@ * * @author Daniel Sagenschneider */ -public class ParseResponse extends AbstractPortalResponse { +public class ParseResponse extends AbstractQueryResponse { - public ParseResponse(Portal portal) { - super(portal); + public ParseResponse(Query query) { + super(query); } @Override @@ -22,7 +22,7 @@ public NetworkResponse read(NetworkReadContext context) throws IOException { switch (context.getFrameTag()) { case BEFrameParser.PARSE_COMPLETE: - this.portal.getQuery().flagParsed(); + this.query.getReuse().flagParsed(); return null; // nothing further default: diff --git a/src/main/java/org/postgresql/sql2/communication/network/Portal.java b/src/main/java/org/postgresql/sql2/communication/network/Portal.java index d40353d..8623f20 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/Portal.java +++ b/src/main/java/org/postgresql/sql2/communication/network/Portal.java @@ -1,21 +1,6 @@ package org.postgresql.sql2.communication.network; -import static org.postgresql.sql2.PgSubmission.Types.ARRAY_COUNT; - -import java.nio.channels.SocketChannel; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import jdk.incubator.sql2.SqlException; -import org.postgresql.sql2.PgSubmission; -import org.postgresql.sql2.communication.packets.CommandComplete; -import org.postgresql.sql2.communication.packets.DataRow; -import org.postgresql.sql2.communication.packets.ErrorPacket; -import org.postgresql.sql2.communication.packets.parts.ErrorResponseField; -import org.postgresql.sql2.operations.helpers.ParameterHolder; -import org.postgresql.sql2.submissions.ArrayCountSubmission; -import org.postgresql.sql2.util.PgCount; /** * Portal. @@ -24,78 +9,15 @@ */ public class Portal { - /** - * Handle exception that occurred. - * - * @param submission the submission that was active when the exception happened - * @param ex the exception - */ - public static void doHandleException(PgSubmission submission, Throwable ex) { - if (ex instanceof ErrorPacket) { - ErrorPacket e = (ErrorPacket)ex; - int code = 0; - if (e.getField(ErrorResponseField.Types.SQLSTATE_CODE) != null) { - try { - code = Integer.parseInt(e.getField(ErrorResponseField.Types.SQLSTATE_CODE)); - } catch (NumberFormatException ignore) { - // ignored for now - } - } - int position = 0; - if (e.getField(ErrorResponseField.Types.POSITION) != null) { - position = Integer.parseInt(e.getField(ErrorResponseField.Types.POSITION)); - } - ex = new SqlException(e.getMessage(), e, e.getField(ErrorResponseField.Types.SEVERITY), code, null, position); - } - if (!(ex instanceof SqlException)) { - ex = new SqlException(ex.getMessage(), ex, null, 0, null, 0); - } - Consumer errorHandler = submission.getErrorHandler(); - if (errorHandler != null) { - errorHandler.accept(ex); - } - ((CompletableFuture) submission.getCompletionStage()).completeExceptionally(ex); - } - private static AtomicLong nameIndex = new AtomicLong(0); - private final PgSubmission submission; - - private String name; - - private long nextRowNumber = 0; - - /** - * Thread safe as only accessed via network thread. - */ - private Query query = null; + private final String name; /** * Instantiate. - * - * @param submission {@link PgSubmission}. */ - public Portal(PgSubmission submission) { + public Portal() { this.name = "p" + nameIndex.incrementAndGet(); - this.submission = submission; - } - - /** - * Obtains the SQL. - * - * @return SQL. - */ - public String getSql() { - return this.submission.getSql(); - } - - /** - * Obtains the {@link ParameterHolder}. - * - * @return {@link ParameterHolder}. - */ - public ParameterHolder getParameterHolder() { - return this.submission.getHolder(); } /** @@ -107,105 +29,4 @@ public String getPortalName() { return this.name; } - /** - * Handles the {@link Throwable}. - * - * @param ex {@link Throwable}. - */ - public void handleException(Throwable ex) { - doHandleException(this.submission, ex); - } - - /** - * Obtains the possibly associated {@link Query}. - * - * @return {@link Query}. May be null. - */ - Query getQuery() { - return this.query; - } - - /** - * Specifies the {@link Query}. - * - * @param query {@link Query}. - */ - void setQuery(Query query) { - this.query = query; - } - - /** - * Obtains the next row number. - * - * @return Next row number. - */ - long nextRowNumber() { - return this.nextRowNumber++; - } - - /** - * Adds a data row. - * - * @param dataRow {@link DataRow}. - */ - void addDataRow(DataRow dataRow) { - this.submission.addRow(dataRow); - } - - /** - * Flags the command is complete. - * - * @param complete Command is complete. - * @param socketChannel {@link SocketChannel}. - */ - void commandComplete(CommandComplete complete, SocketChannel socketChannel) { - try { - switch (submission.getCompletionType()) { - case COUNT: - submission.finish(new PgCount(complete.getNumberOfRowsAffected())); - break; - case ROW: - submission.finish(null); - break; - case CLOSE: - submission.finish(socketChannel); - break; - case TRANSACTION: - submission.finish(complete.getType()); - break; - case ARRAY_COUNT: - submission.finish(complete.getNumberOfRowsAffected()); - break; - case VOID: - ((CompletableFuture) submission.getCompletionStage()).complete(null); - break; - case PROCESSOR: - submission.finish(null); - break; - case OUT_PARAMETER: - submission.finish(null); - break; - default: - throw new IllegalStateException("Invalid completion type '" + submission.getCompletionType() + "' for " - + this.getClass().getSimpleName()); - } - } catch (Throwable t) { - ((CompletableFuture)submission.getCompletionStage()).completeExceptionally(t); - } - } - - /** - * Some submission types needs multiple rounds of queries before the operation is finished. This function - * returns true if more is needed. - * - * @return true if another query should be sent to the database - * @throws ExecutionException if the bound variables are a future that fails - * @throws InterruptedException if the bound variables are a future that fails - */ - public boolean hasMoreToExecute() throws ExecutionException, InterruptedException { - if (submission.getCompletionType() == ARRAY_COUNT) { - return ((ArrayCountSubmission)submission).hasMoreToExecute(); - } - return false; - } } \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/network/Query.java b/src/main/java/org/postgresql/sql2/communication/network/Query.java index 1e55623..c7cb367 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/Query.java +++ b/src/main/java/org/postgresql/sql2/communication/network/Query.java @@ -1,8 +1,17 @@ package org.postgresql.sql2.communication.network; +import static org.postgresql.sql2.PgSubmission.Types.ARRAY_COUNT; + +import java.nio.channels.SocketChannel; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; -import org.postgresql.sql2.communication.packets.RowDescription; +import org.postgresql.sql2.PgSubmission; +import org.postgresql.sql2.communication.packets.CommandComplete; +import org.postgresql.sql2.communication.packets.DataRow; +import org.postgresql.sql2.submissions.ArrayCountSubmission; +import org.postgresql.sql2.util.PgCount; /** * Query. @@ -17,29 +26,34 @@ public class Query { private static AtomicInteger nameIndex = new AtomicInteger(0); /** - * Name for the {@link Query}. + * {@link PgSubmission}. */ - private final String name; + private final PgSubmission submission; /** - * Indicates whether parsed. + * {@link QueryReuse}. */ - private boolean isParsed = false; + private final QueryReuse reuse; /** - * Indicates if waiting parse. + * Name for the {@link Query}. */ - private boolean isAwaitingParse = false; + private final String name; /** - * {@link RowDescription}. + * Next row number. */ - private RowDescription rowDescription = null; + private long nextRowNumber = 0; /** * Instantiate. + * + * @param submission {@link PgSubmission}. + * @param reuse {@link QueryReuse}. */ - public Query() { + public Query(PgSubmission submission, QueryReuse reuse) { + this.submission = submission; + this.reuse = reuse; this.name = "q" + nameIndex.incrementAndGet(); } @@ -53,53 +67,95 @@ public String getQueryName() { } /** - * Indicates if parsed. + * Obtains the {@link QueryReuse}. * - * @return Parsed. + * @return {@link QueryReuse}. */ - public boolean isParsed() { - return this.isParsed; + public QueryReuse getReuse() { + return this.reuse; } /** - * Flags that the query has parsed. + * Obtains the {@link PgSubmission}. + * + * @return {@link PgSubmission}. */ - void flagParsed() { - this.isParsed = true; + public PgSubmission getSubmission() { + return this.submission; } /** - * Indicates if waiting on parse. + * Obtains the next row number. * - * @return Waiting on parse. + * @return Next row number. */ - public boolean isWaitingParse() { - return this.isAwaitingParse; + long nextRowNumber() { + return this.nextRowNumber++; } - + /** - * Flags that waiting on parse. + * Adds a data row. + * + * @param dataRow {@link DataRow}. */ - void flagWaitingParse() { - this.isAwaitingParse = true; + void addDataRow(DataRow dataRow) { + this.submission.addRow(dataRow); } /** - * Obtains the {@link RowDescription}. + * Flags the command is complete. * - * @return {@link RowDescription}. + * @param complete Command is complete. + * @param socketChannel {@link SocketChannel}. */ - RowDescription getRowDescription() { - return this.rowDescription; + void commandComplete(CommandComplete complete, SocketChannel socketChannel) { + try { + switch (submission.getCompletionType()) { + case COUNT: + submission.finish(new PgCount(complete.getNumberOfRowsAffected())); + break; + case ROW: + submission.finish(null); + break; + case CLOSE: + submission.finish(socketChannel); + break; + case TRANSACTION: + submission.finish(complete.getType()); + break; + case ARRAY_COUNT: + submission.finish(complete.getNumberOfRowsAffected()); + break; + case VOID: + ((CompletableFuture) submission.getCompletionStage()).complete(null); + break; + case PROCESSOR: + submission.finish(null); + break; + case OUT_PARAMETER: + submission.finish(null); + break; + default: + throw new IllegalStateException( + "Invalid completion type '" + submission.getCompletionType() + "' for " + this.getClass().getSimpleName()); + } + } catch (Throwable t) { + ((CompletableFuture) submission.getCompletionStage()).completeExceptionally(t); + } } /** - * Specifies the {@link RowDescription}. - * - * @param rowDescription {@link RowDescription}. + * Some submission types needs multiple rounds of queries before the operation + * is finished. This function returns true if more is needed. + * + * @return true if another query should be sent to the database + * @throws ExecutionException if the bound variables are a future that fails + * @throws InterruptedException if the bound variables are a future that fails */ - void setRowDescription(RowDescription rowDescription) { - this.rowDescription = rowDescription; + public boolean hasMoreToExecute() throws ExecutionException, InterruptedException { + if (submission.getCompletionType() == ARRAY_COUNT) { + return ((ArrayCountSubmission) submission).hasMoreToExecute(); + } + return false; } - } \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/network/QueryReuse.java b/src/main/java/org/postgresql/sql2/communication/network/QueryReuse.java new file mode 100644 index 0000000..9201d05 --- /dev/null +++ b/src/main/java/org/postgresql/sql2/communication/network/QueryReuse.java @@ -0,0 +1,140 @@ +package org.postgresql.sql2.communication.network; + +import org.postgresql.sql2.communication.packets.RowDescription; + +/** + * Re-use of an SQL query. + * + * @author Daniel Sagenschneider + */ +public class QueryReuse { + + /** + * {@link RowDescription}. + */ + private RowDescription rowDescription = null; + + /** + * Number of times this {@link Query} has been executed. + */ + private int executeCount = 10; // TODO reset to start at 0 + + /** + * {@link Portal}. + */ + private Portal portal = new Portal(); // TODO reset to create + + /** + * Indicates whether parsed. + */ + private boolean isParsed = false; + + /** + * Indicates if waiting parse. + */ + private boolean isAwaitingParse = false; + + /** + * Indicates whether described. + */ + private boolean isDescribed = false; + + /** + * Indicates if waiting describe. + */ + private boolean isAwaitingDescribe = false; + + /** + * Indicates if execute as simple query. + * + * @return true to execute as simple query. + */ + public boolean isSimpleQuery() { + return this.executeCount < 5; // TODO allow configuring + } + + /** + * Specifies the {@link RowDescription}. + * + * @param rowDescription {@link RowDescription}. + */ + public void setRowDescription(RowDescription rowDescription) { + this.rowDescription = rowDescription; + } + + /** + * Obtains the {@link RowDescription}. + * + * @return {@link RowDescription}. + */ + public RowDescription getRowDescription() { + return this.rowDescription; + } + + /** + * Indicates if parsed. + * + * @return Parsed. + */ + public boolean isParsed() { + return this.isParsed; + } + + /** + * Flags that the query has parsed. + */ + void flagParsed() { + this.isParsed = true; + } + + /** + * Indicates if waiting on parse. + * + * @return Waiting on parse. + */ + public boolean isWaitingParse() { + return this.isAwaitingParse; + } + + /** + * Flags that waiting on parse. + */ + void flagWaitingParse() { + this.isAwaitingParse = true; + } + + /** + * Indicates if waiting on describe. + * + * @return Waiting on describe. + */ + public boolean isWaitingDescribe() { + return this.isAwaitingDescribe; + } + + /** + * Flags that waiting on describe. + */ + void flagWaitingDescribe() { + this.isAwaitingDescribe = true; + } + + /** + * Obtains the {@link Portal}. + * + * @return {@link Portal}. + */ + public Portal getPortal() { + return this.portal; + } + + /** + * Convenience method to obtain the {@link Portal} name. + * + * @return {@link Portal} name or unnamed name if no {@link Portal}. + */ + public String getPortalNameOrUnnamed() { + return this.portal == null ? "" : this.portal.getPortalName(); + } + +} \ No newline at end of file diff --git a/src/main/java/org/postgresql/sql2/communication/network/SyncRequest.java b/src/main/java/org/postgresql/sql2/communication/network/SyncRequest.java index 4fb3de3..85b3665 100644 --- a/src/main/java/org/postgresql/sql2/communication/network/SyncRequest.java +++ b/src/main/java/org/postgresql/sql2/communication/network/SyncRequest.java @@ -11,10 +11,11 @@ * @author Daniel Sagenschneider */ public class SyncRequest implements NetworkRequest { - private final Portal portal; + + private final Query query; - public SyncRequest(Portal portal) { - this.portal = portal; + public SyncRequest(Query query) { + this.query = query; } /* @@ -29,8 +30,8 @@ public NetworkRequest write(NetworkWriteContext context) throws Exception { wire.initPacket(); wire.completePacket(); - if (portal.hasMoreToExecute()) { - return new BindRequest<>(portal); + if (this.query.hasMoreToExecute()) { + return new BindRequest<>(this.query); } // Nothing further diff --git a/src/test/java/org/postgresql/sql2/communication/QueryFactoryTest.java b/src/test/java/org/postgresql/sql2/communication/QueryFactoryTest.java new file mode 100644 index 0000000..dd2712f --- /dev/null +++ b/src/test/java/org/postgresql/sql2/communication/QueryFactoryTest.java @@ -0,0 +1,41 @@ +package org.postgresql.sql2.communication; + +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import org.junit.Test; +import org.postgresql.sql2.PgSubmission; +import org.postgresql.sql2.communication.network.Query; +import org.postgresql.sql2.operations.helpers.ParameterHolder; +import org.postgresql.sql2.submissions.RowSubmission; + +/** + * Tests the {@link QueryFactory}. + * + * @author Daniel Sagenschneider + */ +public class QueryFactoryTest { + + private final QueryFactory queryFactory = new QueryFactory(); + + @Test + public void ensureDifferentQueriesNotSameReuse() throws Exception { + Query queryOne = this.queryFactory.createQuery(createSubmission("SELECT 1 as t")); + Query queryTwo = this.queryFactory.createQuery(createSubmission("SELECT 2 as t")); + assertNotSame("Should not re-use as different queries", queryOne.getReuse(), queryTwo.getReuse()); + } + + @Test + public void ensureSameQueryReuse() throws Exception { + Query queryOne = this.queryFactory.createQuery(createSubmission("SELECT 1 as t")); + Query queryTwo = this.queryFactory.createQuery(createSubmission("SELECT 1 as t")); + assertSame("Should re-use as same queries", queryOne.getReuse(), queryTwo.getReuse()); + } + + private static PgSubmission createSubmission(String sql) { + ParameterHolder holder = new ParameterHolder(); + return new RowSubmission<>(() -> Boolean.FALSE, (ex) -> { + }, holder, null, sql); + } + +} \ No newline at end of file diff --git a/src/test/java/org/postgresql/sql2/execution/NioLoopTest.java b/src/test/java/org/postgresql/sql2/execution/NioLoopTest.java index 51047b6..1506c93 100644 --- a/src/test/java/org/postgresql/sql2/execution/NioLoopTest.java +++ b/src/test/java/org/postgresql/sql2/execution/NioLoopTest.java @@ -75,8 +75,12 @@ public void pipelineQueries() throws Exception { final int queryCount = 1000; Submission[] submissions = new Submission[queryCount]; for (int i = 0; i < queryCount; i++) { + final int index = i; submissions[i] = connection.rowOperation("SELECT 1 as t") .collect(CollectorUtils.singleCollector(Integer.class)).submit(); + + // TODO RMEOVE + submissions[i].getCompletionStage().thenRun(() -> System.out.println("-> " + index)); } // Ensure obtain all results @@ -112,10 +116,8 @@ public void reuseNioLoopBetweenConnections() throws Exception { @Test public void reuseNioLoopBetweenDataSources() throws Exception { MockNioLoop loop = new MockNioLoop(); - try ( - DataSource dataSourceOne = createDataSource().connectionProperty(PgConnectionProperty.NIO_LOOP, loop).build(); - DataSource dataSourceTwo = createDataSource().connectionProperty(PgConnectionProperty.NIO_LOOP, loop) - .build()) { + try (DataSource dataSourceOne = createDataSource().connectionProperty(PgConnectionProperty.NIO_LOOP, loop).build(); + DataSource dataSourceTwo = createDataSource().connectionProperty(PgConnectionProperty.NIO_LOOP, loop).build()) { // Run query via each data source DataSource[] dataSources = new DataSource[] { dataSourceOne, dataSourceTwo }; diff --git a/src/test/java/org/postgresql/sql2/util/PreparedStatementCacheTest.java b/src/test/java/org/postgresql/sql2/util/PreparedStatementCacheTest.java deleted file mode 100644 index 263feff..0000000 --- a/src/test/java/org/postgresql/sql2/util/PreparedStatementCacheTest.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.postgresql.sql2.util; - -import org.junit.jupiter.api.Test; -import org.postgresql.sql2.communication.PreparedStatementCache; - -import java.util.Arrays; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -public class PreparedStatementCacheTest { - - @Test - public void getNameForQuery() { - PreparedStatementCache cache = new PreparedStatementCache(); - - String name = cache.getQuery("select 1", Arrays.asList(1, 2)).getQueryName(); - assertEquals(name, cache.getQuery("select 1", Arrays.asList(1, 2)).getQueryName()); - assertEquals(name, cache.getQuery("select 1", Arrays.asList(1, 2)).getQueryName()); - } - - @Test - public void getNameForQueryNull() { - PreparedStatementCache cache = new PreparedStatementCache(); - - assertThrows(IllegalArgumentException.class, () -> cache.getQuery(null, Arrays.asList(1, 2))); - } -} \ No newline at end of file