Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIR-32016, FIR-32018: implemented blob, clob, stream in PS #394

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ test {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
testLogging {
events 'PASSED', 'FAILED', 'SKIPPED'
exceptionFormat = 'full'
showStackTraces = true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import javax.sql.rowset.serial.SerialBlob;
import javax.sql.rowset.serial.SerialClob;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -25,6 +31,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static java.sql.Statement.SUCCESS_NO_INFO;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
Expand Down Expand Up @@ -201,6 +208,76 @@ void shouldInsertAndSelectByteArray() throws SQLException {
}
}

@Test
void shouldInsertAndSelectBlobClob() throws SQLException, IOException {
Car car1 = Car.builder().make("Ford").sales(12345).signature("Henry Ford".getBytes()).build();
Car car2 = Car.builder().make("Tesla").sales(54321).signature("Elon Musk".getBytes()).build();
try (Connection connection = createConnection()) {

try (PreparedStatement statement = connection
.prepareStatement("INSERT INTO prepared_statement_test (sales, make, signature) VALUES (?,?,?)")) {
statement.setLong(1, car1.getSales());
statement.setClob(2, new SerialClob(car1.getMake().toCharArray()));
statement.setBlob(3, new SerialBlob(car1.getSignature()));
statement.addBatch();
statement.setLong(1, car2.getSales());
statement.setClob(2, new SerialClob(car2.getMake().toCharArray()));
statement.setBlob(3, new SerialBlob(car2.getSignature()));
statement.addBatch();
int[] result = statement.executeBatch();
assertArrayEquals(new int[] { SUCCESS_NO_INFO, SUCCESS_NO_INFO }, result);
}

Set<Car> actual = new HashSet<>();
try (Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery("SELECT sales, make, signature FROM prepared_statement_test")) {
while(rs.next()) {
actual.add(Car.builder()
.sales(rs.getInt(1))
.make(new String(new BufferedReader(rs.getClob(2).getCharacterStream()).lines().collect(Collectors.joining(System.lineSeparator()))))
.signature(rs.getBlob(3).getBinaryStream().readAllBytes())
.build());
}
}
assertEquals(Set.of(car1, car2), actual);
}
}

@Test
void shouldInsertAndSelectStreams() throws SQLException, IOException {
Car car1 = Car.builder().make("Ford").sales(12345).signature("Henry Ford".getBytes()).build();
Car car2 = Car.builder().make("Tesla").sales(54321).signature("Elon Musk".getBytes()).build();
try (Connection connection = createConnection()) {

try (PreparedStatement statement = connection
.prepareStatement("INSERT INTO prepared_statement_test (sales, make, signature) VALUES (?,?,?)")) {
statement.setLong(1, car1.getSales());
statement.setCharacterStream(2, new StringReader(car1.getMake()));
statement.setBinaryStream(3, new ByteArrayInputStream(car1.getSignature()));
statement.addBatch();
statement.setLong(1, car2.getSales());
statement.setCharacterStream(2, new StringReader(car2.getMake()));
statement.setBinaryStream(3, new ByteArrayInputStream(car2.getSignature()));
statement.addBatch();
int[] result = statement.executeBatch();
assertArrayEquals(new int[] { SUCCESS_NO_INFO, SUCCESS_NO_INFO }, result);
}

Set<Car> actual = new HashSet<>();
try (Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery("SELECT sales, make, signature FROM prepared_statement_test")) {
while(rs.next()) {
actual.add(Car.builder()
.sales(rs.getInt(1))
.make(new String(rs.getAsciiStream(2).readAllBytes()))
.signature(rs.getBinaryStream(3).readAllBytes())
.build());
}
}
assertEquals(Set.of(car1, car2), actual);
}
}

private QueryResult createExpectedResult(List<List<?>> expectedRows) {
return QueryResult.builder().databaseName(ConnectionInfo.getInstance().getDatabase())
.tableName("prepared_statement_test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.firebolt.jdbc.exception.ExceptionType;
import com.firebolt.jdbc.exception.FireboltException;
import com.firebolt.jdbc.exception.FireboltSQLFeatureNotSupportedException;
import com.firebolt.jdbc.exception.FireboltUnsupportedOperationException;
import com.firebolt.jdbc.metadata.FireboltDatabaseMetadata;
import com.firebolt.jdbc.metadata.FireboltSystemEngineDatabaseMetadata;
import com.firebolt.jdbc.service.FireboltAuthenticationService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.firebolt.jdbc.exception.ExceptionType;
import com.firebolt.jdbc.exception.FireboltException;
import com.firebolt.jdbc.exception.FireboltSQLFeatureNotSupportedException;
import com.firebolt.jdbc.exception.FireboltUnsupportedOperationException;
import com.firebolt.jdbc.resultset.column.Column;
import com.firebolt.jdbc.resultset.column.ColumnType;
import com.firebolt.jdbc.resultset.compress.LZ4InputStream;
Expand Down Expand Up @@ -53,7 +52,6 @@
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -616,17 +614,17 @@ private InputStream getTextStream(int columnIndex, Charset charset) throws SQLEx

@Override
public InputStream getBinaryStream(int columnIndex) throws SQLException {
return ofNullable(getString(columnIndex)).map(String::getBytes).map(ByteArrayInputStream::new).orElse(null);
return ofNullable(getBytes(columnIndex)).map(ByteArrayInputStream::new).orElse(null);
}

@Override
public InputStream getAsciiStream(String columnLabel) throws SQLException {
return getBinaryStream(columnLabel);
return getAsciiStream(findColumn(columnLabel));
}

@Override
public InputStream getUnicodeStream(String columnLabel) throws SQLException {
return getBinaryStream(columnLabel);
return getUnicodeStream(findColumn(columnLabel));
}

@Override
Expand All @@ -641,7 +639,7 @@ public String getCursorName() throws SQLException {

@Override
public Reader getCharacterStream(int columnIndex) throws SQLException {
return ofNullable(getBinaryStream(columnIndex)).map(InputStreamReader::new).orElse(null);
return ofNullable(getUnicodeStream(columnIndex)).map(InputStreamReader::new).orElse(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import com.firebolt.jdbc.statement.StatementUtil;
import com.firebolt.jdbc.statement.rawstatement.RawStatementWrapper;
import com.firebolt.jdbc.type.JavaTypeToFireboltSQLString;
import com.firebolt.jdbc.util.InputStreamUtil;
import lombok.CustomLog;
import lombok.NonNull;

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
Expand Down Expand Up @@ -291,9 +293,12 @@ public boolean execute(String sql) throws SQLException {
}

@Override
@NotImplemented
public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
try {
setString(parameterIndex, reader == null ? null : InputStreamUtil.read(reader, length));
} catch (IOException e) {
throw new SQLException(e);
}
}

@Override
Expand All @@ -303,15 +308,13 @@ public void setRef(int parameterIndex, Ref x) throws SQLException {
}

@Override
@NotImplemented
public void setBlob(int parameterIndex, Blob x) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
public void setBlob(int parameterIndex, Blob blob) throws SQLException {
setBytes(parameterIndex, blob == null ? null : blob.getBytes(1, (int)blob.length()));
}

@Override
@NotImplemented
public void setClob(int parameterIndex, Clob x) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
public void setClob(int parameterIndex, Clob clob) throws SQLException {
setString(parameterIndex, clob == null ? null : clob.getSubString(1, (int)clob.length()));
}

@Override
Expand Down Expand Up @@ -345,33 +348,28 @@ public void setRowId(int parameterIndex, RowId x) throws SQLException {
}

@Override
@NotImplemented
public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
setCharacterStream(parameterIndex, value, length);
}

@Override
@NotImplemented
public void setNClob(int parameterIndex, NClob value) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
setClob(parameterIndex, value);
}

@Override
@NotImplemented
public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
}
setCharacterStream(parameterIndex, reader, length);
}

@Override
@NotImplemented
public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
setBinaryStream(parameterIndex, inputStream, length);
}

@Override
@NotImplemented
public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
setClob(parameterIndex, reader, length);
}

@Override
Expand Down Expand Up @@ -402,80 +400,75 @@ private String formatDecimalNumber(Object x, int scaleOrLength) {
}

@Override
@NotImplemented
public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
throw new FireboltUnsupportedOperationException();
setBinaryStream(parameterIndex, x, length);
}

@Override
@NotImplemented
public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
public void setBinaryStream(int parameterIndex, InputStream inputStream, long length) throws SQLException {
setBinaryStream(parameterIndex, inputStream, (int)length);
}

@Override
@NotImplemented
public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
throw new FireboltUnsupportedOperationException();
setCharacterStream(parameterIndex, reader, (int)length);
}

@Override
@NotImplemented
public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
throw new FireboltUnsupportedOperationException();
setBinaryStream(parameterIndex, x);
}

@Override
@NotImplemented
public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
throw new FireboltUnsupportedOperationException();
try {
setBytes(parameterIndex, x == null ? null : x.readAllBytes());
} catch (IOException e) {
throw new SQLException(e);
}
}

@Override
@NotImplemented
public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
setCharacterStream(parameterIndex, reader, Integer.MAX_VALUE);
}

@Override
@NotImplemented
public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
setCharacterStream(parameterIndex, value);
}

@Override
@NotImplemented
public void setClob(int parameterIndex, Reader reader) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
setClob(parameterIndex, reader, Integer.MAX_VALUE);
}

@Override
@NotImplemented
public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
}
setBinaryStream(parameterIndex, inputStream);
}

@Override
@NotImplemented
public void setNClob(int parameterIndex, Reader reader) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
setClob(parameterIndex, reader);
}

@Override
@NotImplemented
public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new FireboltUnsupportedOperationException();
setBinaryStream(parameterIndex, x, length);
}

@Override
@NotImplemented
public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new FireboltSQLFeatureNotSupportedException();
setBinaryStream(parameterIndex, x, length);
}

@Override
@NotImplemented
public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new FireboltUnsupportedOperationException();
public void setBinaryStream(int parameterIndex, InputStream inputStream, int length) throws SQLException {
try {
setBytes(parameterIndex, inputStream == null ? null : inputStream.readNBytes(length));
} catch (IOException e) {
throw new SQLException(e);
}
}
}
16 changes: 16 additions & 0 deletions src/main/java/com/firebolt/jdbc/util/InputStreamUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;

@UtilityClass
@CustomLog
public class InputStreamUtil {
private static final int K_BYTE = 1024;
private static final int BUFFER_SIZE = 8 * K_BYTE;

/**
* Read all bytes from the input stream if the stream is not null
Expand All @@ -27,4 +30,17 @@ public void readAllBytes(@Nullable InputStream is) {
}
}
}

public String read(Reader initialReader, int limit) throws IOException {
char[] arr = new char[BUFFER_SIZE];
StringBuilder buffer = new StringBuilder();
int numCharsRead;
while ((numCharsRead = initialReader.read(arr, 0, arr.length)) != -1) {
buffer.append(arr, 0, numCharsRead);
if (buffer.length() >= limit) {
break;
}
}
return buffer.length() > limit ? buffer.substring(0, limit) : buffer.toString();
}
}
Loading
Loading