Skip to content

Commit

Permalink
MongoDB read path enhancements
Browse files Browse the repository at this point in the history
1. Enable read-retries for Mongo
2. Close Mongo connection on exceptions in cursor.hasNext()
3. Close Postgres connections on reaching the end of the result set or an exception occurs
  • Loading branch information
suresh-prakash committed Jan 2, 2024
1 parent 782fddb commit 6ceef74
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public MongoConnectionConfig(

public MongoClientSettings toSettings() {
final MongoClientSettings.Builder settingsBuilder =
MongoClientSettings.builder().applicationName(applicationName()).retryWrites(true);
MongoClientSettings.builder()
.applicationName(applicationName())
.retryWrites(true)
.retryReads(true);

applyClusterSettings(settingsBuilder);
applyConnectionPoolSettings(settingsBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,15 @@ public void close() {

@Override
public boolean hasNext() {
boolean hasNext = !closed && cursor.hasNext();
boolean hasNext;

try {
hasNext = !closed && cursor.hasNext();
} catch (final Exception e) {
close();
throw e;

Check warning on line 740 in document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java

View check run for this annotation

Codecov / codecov/patch

document-store/src/main/java/org/hypertrace/core/documentstore/mongo/MongoCollection.java#L738-L740

Added lines #L738 - L740 were not covered by tests
}

if (!hasNext) {
close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jdi.InternalException;
import java.io.IOException;
import java.math.BigInteger;
import java.sql.BatchUpdateException;
Expand All @@ -51,7 +52,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest;
import org.hypertrace.core.documentstore.BulkDeleteResult;
Expand Down Expand Up @@ -1277,9 +1277,15 @@ public boolean hasNext() {
hasNext = resultSet.next();
cursorMovedForward = true;
}

if (!hasNext) {
closeResultSet();
}

return hasNext;
} catch (SQLException e) {
LOGGER.error("SQLException iterating documents.", e);
closeResultSet();

Check warning on line 1288 in document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

View check run for this annotation

Codecov / codecov/patch

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java#L1288

Added line #L1288 was not covered by tests
}
return false;
}
Expand All @@ -1294,6 +1300,7 @@ public Document next() {
cursorMovedForward = false;
return prepareDocument();
} catch (IOException | SQLException e) {
closeResultSet();

Check warning on line 1303 in document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

View check run for this annotation

Codecov / codecov/patch

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java#L1303

Added line #L1303 was not covered by tests
return JSONDocument.errorDocument(e.getMessage());
}
}
Expand All @@ -1314,10 +1321,20 @@ protected Document prepareDocument() throws SQLException, IOException {
return new JSONDocument(MAPPER.writeValueAsString(jsonNode));
}

@SneakyThrows
protected void closeResultSet() {
try {
if (resultSet != null && !resultSet.isClosed()) {
resultSet.close();
}
} catch (SQLException ex) {
LOGGER.error("Unable to close connection", ex);
throw new InternalException(ex.getMessage());

Check warning on line 1331 in document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java

View check run for this annotation

Codecov / codecov/patch

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java#L1329-L1331

Added lines #L1329 - L1331 were not covered by tests
}
}

@Override
public void close() {
resultSet.close();
closeResultSet();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ void testUpdateAtomicWithFilter_emptyResults() throws IOException, SQLException
when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement);
when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet);
when(mockResultSet.next()).thenReturn(false);
when(mockResultSet.isClosed()).thenReturn(false, true);

final Optional<Document> oldDocument =
postgresCollection.update(
Expand Down Expand Up @@ -492,6 +493,7 @@ void testUpdateBulkWithFilter() throws IOException, SQLException {
when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet);
when(mockResultSet.next()).thenReturn(true).thenReturn(false);
when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData);
when(mockResultSet.isClosed()).thenReturn(false, true);
when(mockResultSetMetaData.getColumnCount()).thenReturn(4);
mockResultSetMetadata();

Expand Down Expand Up @@ -660,6 +662,7 @@ void testUpdateBulkWithFilter_emptyResults() throws IOException, SQLException {
when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement);
when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet);
when(mockResultSet.next()).thenReturn(false);
when(mockResultSet.isClosed()).thenReturn(false, true);

final String updateQuery =
String.format(
Expand Down

0 comments on commit 6ceef74

Please sign in to comment.