Skip to content

Commit

Permalink
incremental flush
Browse files Browse the repository at this point in the history
  • Loading branch information
alexradzin committed May 27, 2024
1 parent 14fd61a commit bf0548d
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1134,12 +1134,7 @@ public void updateNClob(String columnLabel, NClob nClob) throws SQLException {
@Override
public NClob getNClob(int columnIndex) throws SQLException {
String str = getString(columnIndex);
class FireboltNClob extends SerialClob implements NClob {
public FireboltNClob(char[] ch) throws SQLException {
super(ch);
}
}
return str == null ? null : new FireboltNClob(str.toCharArray());
return str == null ? null : new FireboltClob(str.toCharArray());
}

@Override
Expand Down
25 changes: 20 additions & 5 deletions src/main/java/com/firebolt/jdbc/type/lob/FireboltBlob.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.firebolt.jdbc.type.lob;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
Expand Down Expand Up @@ -98,22 +99,36 @@ public OutputStream setBinaryStream(long pos) throws SQLException {
isValid(buf);
return new OutputStream() {
private final List<Byte> bytes = new LinkedList<>();
private int from = (int)(pos - 1);
private volatile boolean closed = false;

@Override
public void write(int b) {
public void write(int b) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
bytes.add((byte)b);
}
public void close() {

@Override
public void flush() {
int length = bytes.size();
int newLength = Math.max(buf.length, length + (int)pos - 1);
int newLength = Math.max(buf.length, length + from);
if (newLength > buf.length) {
byte[] newBuf = new byte[newLength];
System.arraycopy(buf, 0, newBuf, 0, buf.length);
buf = newBuf;
}
int i = (int)(pos - 1);
for (byte b : bytes) {
buf[i++] = b;
buf[from++] = b;
}
bytes.clear();
}

@Override
public void close() {
flush();
closed = true;
}
};
}
Expand Down
24 changes: 19 additions & 5 deletions src/main/java/com/firebolt/jdbc/type/lob/FireboltClob.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.firebolt.jdbc.type.lob;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -115,22 +116,35 @@ public OutputStream setAsciiStream(long pos) throws SQLException {
isValid(buf);
return new OutputStream() {
private final List<Character> characters = new LinkedList<>();
private int from = (int)(pos - 1);
private volatile boolean closed = false;

@Override
public void write(int b) {
public void write(int b) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
characters.add((char)b);
}
public void close() {

@Override
public void flush() {
int length = characters.size();
int newLength = Math.max(buf.length, length + (int)pos - 1);
int newLength = Math.max(buf.length, length + from);
if (newLength > buf.length) {
char[] newBuf = new char[newLength];
System.arraycopy(buf, 0, newBuf, 0, buf.length);
buf = newBuf;
}
int i = (int)(pos - 1);
for (char b : characters) {
buf[i++] = b;
buf[from++] = b;
}
characters.clear();
}

public void close() {
flush();
closed = true;
}
};
}
Expand Down
28 changes: 28 additions & 0 deletions src/test/java/com/firebolt/jdbc/type/lob/FireboltBlobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.SQLException;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
Expand Down Expand Up @@ -67,6 +69,32 @@ void binaryStreamToEmptyBlob() throws SQLException, IOException {
assertEquals(str, new String(blob.getBinaryStream().readAllBytes()));
}

@Test
void characterStreamWithFlush() throws SQLException, IOException {
String str = "hello, world!";
Blob blob = new FireboltBlob(str.getBytes());
try (OutputStream os = blob.setBinaryStream(8)) {
os.write("all".getBytes());
assertEquals(str, new String(blob.getBinaryStream().readAllBytes()));
os.flush();
assertEquals("hello, allld!", new String(blob.getBinaryStream().readAllBytes()));
os.write(" ".getBytes());
os.write("people".getBytes());
os.write("!".getBytes());
assertEquals("hello, allld!", new String(blob.getBinaryStream().readAllBytes()));
}
// the rest is flushed automatically when writer is closed
assertEquals("hello, all people!", new String(blob.getBinaryStream().readAllBytes()));
}

@Test
void failedToWriteToClosedWriter() throws SQLException, IOException {
Blob blob = new FireboltBlob();
OutputStream os = blob.setBinaryStream(1);
os.close();
assertThrows(IOException.class, () -> os.write(1));
}

@ParameterizedTest
@MethodSource("replace")
void binaryStreamReplace(String initial, String replacement, int pos, String expected) throws SQLException, IOException {
Expand Down
26 changes: 26 additions & 0 deletions src/test/java/com/firebolt/jdbc/type/lob/FireboltClobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,32 @@ void characterStream() throws SQLException, IOException {
assertEquals(str, readAll(clob.getCharacterStream()));
}

@Test
void characterStreamWithFlush() throws SQLException, IOException {
String str = "hello, world!";
Clob clob = new FireboltClob(str.toCharArray());
try (Writer writer = clob.setCharacterStream(8)) {
writer.write("all");
assertEquals(str, readAll(clob.getCharacterStream()));
writer.flush();
assertEquals("hello, allld!", readAll(clob.getCharacterStream()));
writer.write(" ");
writer.write("people");
writer.write("!");
assertEquals("hello, allld!", readAll(clob.getCharacterStream()));
}
// the rest is flushed automatically when writer is closed
assertEquals("hello, all people!", readAll(clob.getCharacterStream()));
}

@Test
void failedToWriteToClosedWriter() throws SQLException, IOException {
Clob clob = new FireboltClob();
Writer writer = clob.setCharacterStream(1);
writer.close();
assertThrows(IOException.class, () -> writer.write("x"));
}

@ParameterizedTest
@MethodSource("replace")
void binaryStreamReplace(String initial, String replacement, int pos, String expected) throws SQLException, IOException {
Expand Down

0 comments on commit bf0548d

Please sign in to comment.