diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java index 03885ba3..2803c546 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/mongo/MongoConnectionConfig.java @@ -59,7 +59,10 @@ public MongoConnectionConfig( public MongoClientSettings toSettings() { final MongoClientSettings.Builder settingsBuilder = - MongoClientSettings.builder().applicationName(applicationName()).retryWrites(true); + MongoClientSettings.builder() + .applicationName(applicationName()) + .retryWrites(true) + .retryReads(true); applyClusterSettings(settingsBuilder); applyConnectionPoolSettings(settingsBuilder); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java index 79c41168..b93ae76f 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java @@ -731,7 +731,15 @@ public void close() { @Override public boolean hasNext() { - boolean hasNext = !closed && cursor.hasNext(); + boolean hasNext; + + try { + hasNext = !closed && cursor.hasNext(); + } catch (final Exception e) { + close(); + throw e; + } + if (!hasNext) { close(); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index 6bc5075b..ef3fee68 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.annotations.VisibleForTesting; +import com.sun.jdi.InternalException; import java.io.IOException; import java.math.BigInteger; import java.sql.BatchUpdateException; @@ -51,7 +52,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest; import org.hypertrace.core.documentstore.BulkDeleteResult; @@ -1277,9 +1277,15 @@ public boolean hasNext() { hasNext = resultSet.next(); cursorMovedForward = true; } + + if (!hasNext) { + closeResultSet(); + } + return hasNext; } catch (SQLException e) { LOGGER.error("SQLException iterating documents.", e); + closeResultSet(); } return false; } @@ -1294,6 +1300,7 @@ public Document next() { cursorMovedForward = false; return prepareDocument(); } catch (IOException | SQLException e) { + closeResultSet(); return JSONDocument.errorDocument(e.getMessage()); } } @@ -1314,10 +1321,20 @@ protected Document prepareDocument() throws SQLException, IOException { return new JSONDocument(MAPPER.writeValueAsString(jsonNode)); } - @SneakyThrows + protected void closeResultSet() { + try { + if (resultSet != null && !resultSet.isClosed()) { + resultSet.close(); + } + } catch (SQLException ex) { + LOGGER.error("Unable to close connection", ex); + throw new InternalException(ex.getMessage()); + } + } + @Override public void close() { - resultSet.close(); + closeResultSet(); } } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java index ceeb3068..e94f6009 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/PostgresCollectionTest.java @@ -338,6 +338,7 @@ void testUpdateAtomicWithFilter_emptyResults() throws IOException, SQLException when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(false); + when(mockResultSet.isClosed()).thenReturn(false, true); final Optional oldDocument = postgresCollection.update( @@ -492,6 +493,7 @@ void testUpdateBulkWithFilter() throws IOException, SQLException { when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(true).thenReturn(false); when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData); + when(mockResultSet.isClosed()).thenReturn(false, true); when(mockResultSetMetaData.getColumnCount()).thenReturn(4); mockResultSetMetadata(); @@ -660,6 +662,7 @@ void testUpdateBulkWithFilter_emptyResults() throws IOException, SQLException { when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement); when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet); when(mockResultSet.next()).thenReturn(false); + when(mockResultSet.isClosed()).thenReturn(false, true); final String updateQuery = String.format(