From bf0548d53b0d3091a69f098f621a1fd2d72a53ac Mon Sep 17 00:00:00 2001 From: alexradzin Date: Mon, 27 May 2024 18:41:03 +0300 Subject: [PATCH] incremental flush --- .../jdbc/resultset/FireboltResultSet.java | 7 +---- .../firebolt/jdbc/type/lob/FireboltBlob.java | 25 +++++++++++++---- .../firebolt/jdbc/type/lob/FireboltClob.java | 24 ++++++++++++---- .../jdbc/type/lob/FireboltBlobTest.java | 28 +++++++++++++++++++ .../jdbc/type/lob/FireboltClobTest.java | 26 +++++++++++++++++ 5 files changed, 94 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/firebolt/jdbc/resultset/FireboltResultSet.java b/src/main/java/com/firebolt/jdbc/resultset/FireboltResultSet.java index d18d62626..c7c4b44f0 100644 --- a/src/main/java/com/firebolt/jdbc/resultset/FireboltResultSet.java +++ b/src/main/java/com/firebolt/jdbc/resultset/FireboltResultSet.java @@ -1134,12 +1134,7 @@ public void updateNClob(String columnLabel, NClob nClob) throws SQLException { @Override public NClob getNClob(int columnIndex) throws SQLException { String str = getString(columnIndex); - class FireboltNClob extends SerialClob implements NClob { - public FireboltNClob(char[] ch) throws SQLException { - super(ch); - } - } - return str == null ? null : new FireboltNClob(str.toCharArray()); + return str == null ? null : new FireboltClob(str.toCharArray()); } @Override diff --git a/src/main/java/com/firebolt/jdbc/type/lob/FireboltBlob.java b/src/main/java/com/firebolt/jdbc/type/lob/FireboltBlob.java index 6a8957632..02f08e1f8 100644 --- a/src/main/java/com/firebolt/jdbc/type/lob/FireboltBlob.java +++ b/src/main/java/com/firebolt/jdbc/type/lob/FireboltBlob.java @@ -1,6 +1,7 @@ package com.firebolt.jdbc.type.lob; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.sql.Blob; @@ -98,22 +99,36 @@ public OutputStream setBinaryStream(long pos) throws SQLException { isValid(buf); return new OutputStream() { private final List bytes = new LinkedList<>(); + private int from = (int)(pos - 1); + private volatile boolean closed = false; + @Override - public void write(int b) { + public void write(int b) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } bytes.add((byte)b); } - public void close() { + + @Override + public void flush() { int length = bytes.size(); - int newLength = Math.max(buf.length, length + (int)pos - 1); + int newLength = Math.max(buf.length, length + from); if (newLength > buf.length) { byte[] newBuf = new byte[newLength]; System.arraycopy(buf, 0, newBuf, 0, buf.length); buf = newBuf; } - int i = (int)(pos - 1); for (byte b : bytes) { - buf[i++] = b; + buf[from++] = b; } + bytes.clear(); + } + + @Override + public void close() { + flush(); + closed = true; } }; } diff --git a/src/main/java/com/firebolt/jdbc/type/lob/FireboltClob.java b/src/main/java/com/firebolt/jdbc/type/lob/FireboltClob.java index 86ff9f564..73ae3595c 100644 --- a/src/main/java/com/firebolt/jdbc/type/lob/FireboltClob.java +++ b/src/main/java/com/firebolt/jdbc/type/lob/FireboltClob.java @@ -1,6 +1,7 @@ package com.firebolt.jdbc.type.lob; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.OutputStreamWriter; @@ -115,22 +116,35 @@ public OutputStream setAsciiStream(long pos) throws SQLException { isValid(buf); return new OutputStream() { private final List characters = new LinkedList<>(); + private int from = (int)(pos - 1); + private volatile boolean closed = false; + @Override - public void write(int b) { + public void write(int b) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } characters.add((char)b); } - public void close() { + + @Override + public void flush() { int length = characters.size(); - int newLength = Math.max(buf.length, length + (int)pos - 1); + int newLength = Math.max(buf.length, length + from); if (newLength > buf.length) { char[] newBuf = new char[newLength]; System.arraycopy(buf, 0, newBuf, 0, buf.length); buf = newBuf; } - int i = (int)(pos - 1); for (char b : characters) { - buf[i++] = b; + buf[from++] = b; } + characters.clear(); + } + + public void close() { + flush(); + closed = true; } }; } diff --git a/src/test/java/com/firebolt/jdbc/type/lob/FireboltBlobTest.java b/src/test/java/com/firebolt/jdbc/type/lob/FireboltBlobTest.java index 7c11650ac..f5a81911c 100644 --- a/src/test/java/com/firebolt/jdbc/type/lob/FireboltBlobTest.java +++ b/src/test/java/com/firebolt/jdbc/type/lob/FireboltBlobTest.java @@ -7,7 +7,9 @@ import java.io.IOException; import java.io.OutputStream; +import java.io.Writer; import java.sql.Blob; +import java.sql.Clob; import java.sql.SQLException; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -67,6 +69,32 @@ void binaryStreamToEmptyBlob() throws SQLException, IOException { assertEquals(str, new String(blob.getBinaryStream().readAllBytes())); } + @Test + void characterStreamWithFlush() throws SQLException, IOException { + String str = "hello, world!"; + Blob blob = new FireboltBlob(str.getBytes()); + try (OutputStream os = blob.setBinaryStream(8)) { + os.write("all".getBytes()); + assertEquals(str, new String(blob.getBinaryStream().readAllBytes())); + os.flush(); + assertEquals("hello, allld!", new String(blob.getBinaryStream().readAllBytes())); + os.write(" ".getBytes()); + os.write("people".getBytes()); + os.write("!".getBytes()); + assertEquals("hello, allld!", new String(blob.getBinaryStream().readAllBytes())); + } + // the rest is flushed automatically when writer is closed + assertEquals("hello, all people!", new String(blob.getBinaryStream().readAllBytes())); + } + + @Test + void failedToWriteToClosedWriter() throws SQLException, IOException { + Blob blob = new FireboltBlob(); + OutputStream os = blob.setBinaryStream(1); + os.close(); + assertThrows(IOException.class, () -> os.write(1)); + } + @ParameterizedTest @MethodSource("replace") void binaryStreamReplace(String initial, String replacement, int pos, String expected) throws SQLException, IOException { diff --git a/src/test/java/com/firebolt/jdbc/type/lob/FireboltClobTest.java b/src/test/java/com/firebolt/jdbc/type/lob/FireboltClobTest.java index b383eceb9..9ebdf1280 100644 --- a/src/test/java/com/firebolt/jdbc/type/lob/FireboltClobTest.java +++ b/src/test/java/com/firebolt/jdbc/type/lob/FireboltClobTest.java @@ -82,6 +82,32 @@ void characterStream() throws SQLException, IOException { assertEquals(str, readAll(clob.getCharacterStream())); } + @Test + void characterStreamWithFlush() throws SQLException, IOException { + String str = "hello, world!"; + Clob clob = new FireboltClob(str.toCharArray()); + try (Writer writer = clob.setCharacterStream(8)) { + writer.write("all"); + assertEquals(str, readAll(clob.getCharacterStream())); + writer.flush(); + assertEquals("hello, allld!", readAll(clob.getCharacterStream())); + writer.write(" "); + writer.write("people"); + writer.write("!"); + assertEquals("hello, allld!", readAll(clob.getCharacterStream())); + } + // the rest is flushed automatically when writer is closed + assertEquals("hello, all people!", readAll(clob.getCharacterStream())); + } + + @Test + void failedToWriteToClosedWriter() throws SQLException, IOException { + Clob clob = new FireboltClob(); + Writer writer = clob.setCharacterStream(1); + writer.close(); + assertThrows(IOException.class, () -> writer.write("x")); + } + @ParameterizedTest @MethodSource("replace") void binaryStreamReplace(String initial, String replacement, int pos, String expected) throws SQLException, IOException {