diff --git a/document-store/build.gradle.kts b/document-store/build.gradle.kts index cc0028a4..0fafd769 100644 --- a/document-store/build.gradle.kts +++ b/document-store/build.gradle.kts @@ -19,6 +19,8 @@ dependencies { implementation("org.apache.commons:commons-lang3:3.10") implementation("net.jodah:failsafe:2.4.0") implementation("com.google.guava:guava:31.1-jre") + implementation("org.apache.commons:commons-dbcp2:2.9.0") + testImplementation("org.junit.jupiter:junit-jupiter:5.8.2") testImplementation("org.mockito:mockito-core:4.4.0") testImplementation("org.mockito:mockito-junit-jupiter:4.4.0") @@ -27,6 +29,7 @@ dependencies { integrationTestImplementation("com.github.java-json-tools:json-patch:1.13") integrationTestImplementation("org.testcontainers:testcontainers:1.15.2") integrationTestImplementation("org.testcontainers:junit-jupiter:1.15.2") + integrationTestImplementation("org.apache.logging.log4j:log4j-slf4j-impl:2.17.1") } tasks.test { diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java index 7ad951b3..5b5eba76 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreQueryV1Test.java @@ -33,7 +33,6 @@ import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Stream; @@ -154,9 +153,10 @@ public void testFindAll(String dataStoreName) throws IOException { Collection collection = datastore.getCollection(COLLECTION_NAME); Query query = Query.builder().build(); - Iterator resultDocs = collection.find(query); - Utils.assertSizeEqual(resultDocs, "mongo/collection_data.json"); + try (CloseableIterator resultDocs = collection.find(query)) { + Utils.assertSizeEqual(resultDocs, "mongo/collection_data.json"); + } testCountApi(dataStoreName, query, "mongo/collection_data.json"); } @@ -167,11 +167,11 @@ public void testHasNext(String dataStoreName) throws IOException { Collection collection = datastore.getCollection(COLLECTION_NAME); Query query = Query.builder().build(); - Iterator resultDocs = collection.find(query); - - Utils.assertSizeEqual(resultDocs, "mongo/collection_data.json"); - // hasNext should not throw error even after cursor is closed - assertFalse(resultDocs.hasNext()); + try (CloseableIterator resultDocs = collection.find(query)) { + Utils.assertSizeEqual(resultDocs, "mongo/collection_data.json"); + // hasNext should not throw error even after cursor is closed + assertFalse(resultDocs.hasNext()); + } } @ParameterizedTest @@ -198,9 +198,10 @@ public void testFindSimple(String dataStoreName) throws IOException { Query query = Query.builder().setSelection(selection).setFilter(filter).build(); - Iterator resultDocs = collection.find(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 5, "mongo/simple_filter_response.json"); + try (CloseableIterator resultDocs = collection.find(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 5, "mongo/simple_filter_response.json"); + } testCountApi(dataStoreName, query, "mongo/simple_filter_response.json"); } @@ -230,9 +231,10 @@ public void testFindWithDuplicateSelections(String dataStoreName) throws IOExcep .build(); Query query = Query.builder().setSelection(selection).setFilter(filter).build(); - Iterator resultDocs = collection.find(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 5, "mongo/simple_filter_response.json"); + try (CloseableIterator resultDocs = collection.find(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 5, "mongo/simple_filter_response.json"); + } testCountApi(dataStoreName, query, "mongo/simple_filter_response.json"); } @@ -277,9 +279,10 @@ public void testFindWithDuplicateSortingAndPagination(String dataStoreName) thro .setPagination(pagination) .build(); - Iterator resultDocs = collection.find(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 3, "mongo/filter_with_sorting_and_pagination_response.json"); + try (CloseableIterator resultDocs = collection.find(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 3, "mongo/filter_with_sorting_and_pagination_response.json"); + } testCountApi(dataStoreName, query, "mongo/filter_with_sorting_and_pagination_response.json"); } @@ -319,9 +322,10 @@ public void testFindWithNestedFields(String dataStoreName) throws IOException { .addSort(IdentifierExpression.of("props.seller.address.city"), ASC) .build(); - Iterator resultDocs = collection.find(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 6, "mongo/filter_on_nested_fields_response.json"); + try (CloseableIterator resultDocs = collection.find(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 6, "mongo/filter_on_nested_fields_response.json"); + } testCountApi(dataStoreName, query, "mongo/filter_on_nested_fields_response.json"); } @@ -333,8 +337,9 @@ public void testAggregateEmpty(String dataStoreName) throws IOException { Collection collection = datastore.getCollection(COLLECTION_NAME); Query query = Query.builder().build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertSizeEqual(resultDocs, "mongo/collection_data.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertSizeEqual(resultDocs, "mongo/collection_data.json"); + } testCountApi(dataStoreName, query, "mongo/collection_data.json"); } @@ -348,9 +353,10 @@ public void testAggregateSimple(String dataStoreName) throws IOException { .addSelection(AggregateExpression.of(COUNT, IdentifierExpression.of("item")), "count") .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 1, "mongo/count_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 1, "mongo/count_response.json"); + } testCountApi(dataStoreName, query, "mongo/count_response.json"); } @@ -366,9 +372,10 @@ public void testOptionalFieldCount(String dataStoreName) throws IOException { "count") .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 1, "mongo/optional_field_count_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 1, "mongo/optional_field_count_response.json"); + } testCountApi(dataStoreName, query, "mongo/optional_field_count_response.json"); } @@ -383,9 +390,10 @@ public void testAggregateWithDuplicateSelections(String dataStoreName) throws IO .addSelection(AggregateExpression.of(COUNT, IdentifierExpression.of("item")), "count") .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 1, "mongo/count_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 1, "mongo/count_response.json"); + } testCountApi(dataStoreName, query, "mongo/count_response.json"); } @@ -424,9 +432,10 @@ public void testAggregateWithFiltersAndOrdering(String dataStoreName) throws IOE .setPagination(Pagination.builder().limit(10).offset(0).build()) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 2, "mongo/sum_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 2, "mongo/sum_response.json"); + } testCountApi(dataStoreName, query, "mongo/sum_response.json"); } @@ -468,9 +477,10 @@ public void testAggregateWithFiltersAndDuplicateOrderingAndDuplicateAggregations .setPagination(Pagination.builder().limit(10).offset(0).build()) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 2, "mongo/sum_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 2, "mongo/sum_response.json"); + } testCountApi(dataStoreName, query, "mongo/sum_response.json"); } @@ -491,21 +501,24 @@ public void testAggregateWithNestedFields(String dataStoreName) throws IOExcepti IdentifierExpression.of("num_items.value"), GT, ConstantExpression.of(1))) .build(); - Iterator resultDocs = collection.aggregate(query); - - if (dataStoreName.equals(POSTGRES_STORE)) { - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 3, "mongo/pg_aggregate_on_nested_fields_response.json"); - testCountApi(dataStoreName, query, "mongo/pg_aggregate_on_nested_fields_response.json"); - } else { - // NOTE that as part of this query, mongo impl returns a null field in the response. However, - // in the rest of the other queries, it's not returning. So, we need to fix this inconsistency - // in mongo impl. we should always return the null field or not. In Postgres, for - // compatibility with the rest - // of the mongo response, it is excluded in {@link PostgresResultIteratorWithMetaData} - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 3, "mongo/aggregate_on_nested_fields_response.json"); - testCountApi(dataStoreName, query, "mongo/aggregate_on_nested_fields_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + + if (dataStoreName.equals(POSTGRES_STORE)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 3, "mongo/pg_aggregate_on_nested_fields_response.json"); + testCountApi(dataStoreName, query, "mongo/pg_aggregate_on_nested_fields_response.json"); + } else { + // NOTE that as part of this query, mongo impl returns a null field in the response. + // However, + // in the rest of the other queries, it's not returning. So, we need to fix this + // inconsistency + // in mongo impl. we should always return the null field or not. In Postgres, for + // compatibility with the rest + // of the mongo response, it is excluded in {@link PostgresResultIteratorWithMetaData} + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 3, "mongo/aggregate_on_nested_fields_response.json"); + testCountApi(dataStoreName, query, "mongo/aggregate_on_nested_fields_response.json"); + } } } @@ -580,9 +593,10 @@ public void testAggregateWithMultipleGroupingLevels(String dataStoreName) throws .addSort(IdentifierExpression.of("item"), DESC) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 2, "mongo/multi_level_grouping_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 2, "mongo/multi_level_grouping_response.json"); + } testCountApi(dataStoreName, query, "mongo/multi_level_grouping_response.json"); } @@ -613,9 +627,10 @@ public void testQueryQ1AggregationFilterAlongWithNonAliasFields(String dataStore .build()) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 4, "mongo/test_aggr_alias_distinct_count_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 4, "mongo/test_aggr_alias_distinct_count_response.json"); + } } @ParameterizedTest @@ -645,9 +660,13 @@ public void testQueryQ1AggregationFilterWithStringAlongWithNonAliasFields(String .build()) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 2, "mongo/test_string_aggr_alias_distinct_count_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, + resultDocs, + 2, + "mongo/test_string_aggr_alias_distinct_count_response.json"); + } } @ParameterizedTest @@ -680,12 +699,13 @@ public void testQueryQ1AggregationFilterWithStringInFilterAlongWithNonAliasField .build()) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, - resultDocs, - 3, - "mongo/test_string_in_filter_aggr_alias_distinct_count_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, + resultDocs, + 3, + "mongo/test_string_in_filter_aggr_alias_distinct_count_response.json"); + } } @ParameterizedTest @@ -701,9 +721,10 @@ public void testQueryV1ForSimpleWhereClause(String dataStoreName) throws IOExcep IdentifierExpression.of("quantity"), NEQ, ConstantExpression.of(10))) .build(); - Iterator iterator = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, iterator, 6, "mongo/simple_filter_quantity_neq_10.json"); + try (CloseableIterator iterator = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, iterator, 6, "mongo/simple_filter_quantity_neq_10.json"); + } } @ParameterizedTest @@ -729,9 +750,10 @@ public void testQueryV1FilterWithNestedFiled(String dataStoreName) throws IOExce .build()) .build(); - Iterator iterator = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, iterator, 1, "mongo/test_nest_field_filter_response.json"); + try (CloseableIterator iterator = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, iterator, 1, "mongo/test_nest_field_filter_response.json"); + } } @ParameterizedTest @@ -767,9 +789,10 @@ public void testQueryV1ForFilterWithLogicalExpressionAndOr(String dataStoreName) .build()) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 6, "mongo/filter_with_logical_and_or_operator.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 6, "mongo/filter_with_logical_and_or_operator.json"); + } } @ParameterizedTest @@ -796,9 +819,10 @@ public void testQueryV1ForSelectionExpression(String dataStoreName) throws IOExc "total") .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 2, "mongo/test_selection_expression_result.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 2, "mongo/test_selection_expression_result.json"); + } } @ParameterizedTest @@ -826,12 +850,13 @@ public void testQueryV1FunctionalSelectionExpressionWithNestedFieldWithAlias(Str "total") .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, - resultDocs, - 2, - "mongo/test_selection_expression_nested_fields_alias_result.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, + resultDocs, + 2, + "mongo/test_selection_expression_nested_fields_alias_result.json"); + } } @ParameterizedTest @@ -862,9 +887,10 @@ public void testQueryV1AggregationExpression(String dataStoreName) throws IOExce .addAggregation(IdentifierExpression.of("item")) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 3, "mongo/test_aggregation_expression_result.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 3, "mongo/test_aggregation_expression_result.json"); + } } @ParameterizedTest @@ -885,9 +911,10 @@ public void testQueryV1AggregationFilter(String dataStoreName) throws IOExceptio IdentifierExpression.of("qty_count"), LTE, ConstantExpression.of(10))) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 4, "mongo/distinct_count_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 4, "mongo/distinct_count_response.json"); + } } @ParameterizedTest @@ -911,9 +938,10 @@ public void testQueryV1AggregationFilterWithWhereClause(String dataStoreName) th IdentifierExpression.of("qty_count"), LTE, ConstantExpression.of(10))) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 2, "mongo/test_aggr_filter_and_where_filter_result.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 2, "mongo/test_aggr_filter_and_where_filter_result.json"); + } } @ParameterizedTest @@ -932,9 +960,10 @@ public void testUnnestWithoutPreserveNullAndEmptyArrays(String dataStoreName) th .addFromClause(UnnestExpression.of(IdentifierExpression.of("sales.medium"), false)) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 11, "mongo/unwind_not_preserving_selection_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 11, "mongo/unwind_not_preserving_selection_response.json"); + } } @ParameterizedTest @@ -965,9 +994,10 @@ public void testUnnestWithoutPreserveNullAndEmptyArraysWithFilters(String dataSt .addFromClause(UnnestExpression.of(IdentifierExpression.of("sales.medium"), false)) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 3, "mongo/unwind_not_preserving_filter_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 3, "mongo/unwind_not_preserving_filter_response.json"); + } } @ParameterizedTest @@ -986,9 +1016,10 @@ public void testUnnestWithPreserveNullAndEmptyArrays(String dataStoreName) throw .addFromClause(UnnestExpression.of(IdentifierExpression.of("sales.medium"), true)) .build(); - Iterator resultDocs = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, resultDocs, 17, "mongo/unwind_preserving_selection_response.json"); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, resultDocs, 17, "mongo/unwind_preserving_selection_response.json"); + } } @ParameterizedTest @@ -1010,9 +1041,10 @@ public void testUnnestAndAggregate(String dataStoreName) throws IOException { .addSort(IdentifierExpression.of("totalSales"), DESC) .build(); - Iterator iterator = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, iterator, 3, "mongo/aggregate_on_nested_array_reponse.json"); + try (CloseableIterator iterator = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, iterator, 3, "mongo/aggregate_on_nested_array_reponse.json"); + } } @ParameterizedTest @@ -1029,9 +1061,10 @@ public void testUnnestAndAggregate_preserveEmptyTrue(String dataStoreName) throw .addFromClause(UnnestExpression.of(IdentifierExpression.of("sales.medium"), true)) .build(); - Iterator iterator = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, iterator, 1, "mongo/unwind_preserving_empty_array_response.json"); + try (CloseableIterator iterator = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, iterator, 1, "mongo/unwind_preserving_empty_array_response.json"); + } } @ParameterizedTest @@ -1053,9 +1086,10 @@ public void testUnnest(String dataStoreName) throws IOException { .addSort(IdentifierExpression.of("sales.medium.type"), DESC) .build(); - Iterator iterator = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, iterator, 17, "mongo/unwind_response.json"); + try (CloseableIterator iterator = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, iterator, 17, "mongo/unwind_response.json"); + } } @ParameterizedTest @@ -1072,9 +1106,10 @@ public void testUnnestAndAggregate_preserveEmptyFalse(String dataStoreName) thro .addFromClause(UnnestExpression.of(IdentifierExpression.of("sales.medium"), true)) .build(); - Iterator iterator = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, iterator, 1, "mongo/unwind_not_preserving_empty_array_response.json"); + try (CloseableIterator iterator = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, iterator, 1, "mongo/unwind_not_preserving_empty_array_response.json"); + } } @ParameterizedTest @@ -1105,9 +1140,10 @@ public void testFilterAndUnnest(String dataStoreName) throws IOException { .addSort(IdentifierExpression.of("sales.medium.type"), DESC) .build(); - Iterator iterator = collection.aggregate(query); - Utils.assertDocsAndSizeEqualWithoutOrder( - dataStoreName, iterator, 7, "mongo/unwind_filter_response.json"); + try (CloseableIterator iterator = collection.aggregate(query)) { + Utils.assertDocsAndSizeEqualWithoutOrder( + dataStoreName, iterator, 7, "mongo/unwind_filter_response.json"); + } } @ParameterizedTest @@ -1130,8 +1166,9 @@ public void testQueryV1DistinctCountWithSortingSpecs(String dataStoreName) throw .addSort(IdentifierExpression.of("item"), DESC) .build(); - Iterator resultDocs = collection.aggregate(query); - assertDocsAndSizeEqual(resultDocs, "mongo/distinct_count_response.json", 4); + try (CloseableIterator resultDocs = collection.aggregate(query)) { + assertDocsAndSizeEqual(resultDocs, "mongo/distinct_count_response.json", 4); + } } @ParameterizedTest @@ -1173,9 +1210,10 @@ public void testFindWithSortingAndPagination(String datastoreName) throws IOExce .setPagination(pagination) .build(); - Iterator resultDocs = collection.find(query); - Utils.assertDocsAndSizeEqual( - resultDocs, "mongo/filter_with_sorting_and_pagination_response.json", 3); + try (CloseableIterator resultDocs = collection.find(query)) { + Utils.assertDocsAndSizeEqual( + resultDocs, "mongo/filter_with_sorting_and_pagination_response.json", 3); + } } private static void testCountApi( diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java index 872aa439..313b2ce8 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/DocStoreTest.java @@ -29,7 +29,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -152,11 +151,7 @@ public void testUpsert(String dataStoreName) throws Exception { Query query = new Query(); query.setFilter(Filter.eq("_id", "default:testKey")); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertFalse(documents.isEmpty()); String persistedDocument = documents.get(0).toJson(); verifyTimeRelatedFieldsPresent(persistedDocument, dataStoreName); @@ -166,10 +161,11 @@ public void testUpsert(String dataStoreName) throws Exception { // Upsert again and verify that created time does not change, while updated time // has changed collection.upsert(new SingleValueKey("default", "testKey"), document); - results = collection.search(query); - documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); + try (CloseableIterator results = collection.search(query)) { + documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } } Assertions.assertFalse(documents.isEmpty()); @@ -310,34 +306,32 @@ public void testWithDifferentFieldTypes(String dataStoreName) throws Exception { Query queryNumericField = new Query(); Filter filter = new Filter(Op.GT, "size", -30); queryNumericField.setFilter(filter); - Iterator results = collection.search(queryNumericField); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, queryNumericField); Assertions.assertEquals(4, documents.size()); // query field having boolean field Query queryBooleanField = new Query(); filter = new Filter(Op.GT, "isCostly", false); queryBooleanField.setFilter(filter); - results = collection.search(queryBooleanField); - documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); + try (CloseableIterator results = collection.search(queryBooleanField)) { + documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } + Assertions.assertEquals(2, documents.size()); } - Assertions.assertEquals(2, documents.size()); // query string field Query queryStringField = new Query(); filter = new Filter(Op.GT, "name", "abc1"); queryStringField.setFilter(filter); - results = collection.search(queryBooleanField); - documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); + try (CloseableIterator results = collection.search(queryBooleanField)) { + documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } + Assertions.assertEquals(2, documents.size()); } - Assertions.assertEquals(2, documents.size()); datastore.deleteCollection(COLLECTION_NAME); } @@ -379,11 +373,7 @@ public void testNotEquals(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Op.NEQ, "_id", "default:testKey3")); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(3, documents.size()); documents.forEach( @@ -400,11 +390,7 @@ public void testNotEquals(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Op.NEQ, "key1", "abc3")); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(3, documents.size()); documents.forEach( document -> { @@ -420,11 +406,7 @@ public void testNotEquals(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Op.NEQ, "key2", "xyz2")); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(3, documents.size()); documents.forEach( document -> { @@ -440,11 +422,7 @@ public void testNotEquals(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Op.NEQ, "subdoc.nestedkey1", "pqr2")); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(3, documents.size()); documents.forEach( document -> { @@ -457,6 +435,16 @@ public void testNotEquals(String dataStoreName) throws IOException { } } + private List getDocuments(Collection collection, Query query) throws IOException { + try (CloseableIterator results = collection.search(query)) { + List documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } + return documents; + } + } + @ParameterizedTest @MethodSource("databaseContextProvider") public void testNotInQueryWithNumberField(String dataStoreName) throws IOException { @@ -523,11 +511,7 @@ public void testNotInQueryWithNumberField(String dataStoreName) throws IOExcepti Query query = new Query(); query.setFilter(new Filter(Filter.Op.NOT_IN, "name", names)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(3, documents.size()); documents.forEach( document -> { @@ -551,10 +535,11 @@ public void testNotInQueryWithNumberField(String dataStoreName) throws IOExcepti f.setOp(Op.OR); f.setChildFilters(filters); query.setFilter(f); - results = collection.search(query); - documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); + try (CloseableIterator results = collection.search(query)) { + documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } } Assertions.assertEquals(4, documents.size()); documents.forEach( @@ -574,10 +559,11 @@ public void testNotInQueryWithNumberField(String dataStoreName) throws IOExcepti query = new Query(); query.setFilter(new Filter(Filter.Op.NOT_IN, "size", sizes)); - results = collection.search(query); - documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); + try (CloseableIterator results = collection.search(query)) { + documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } } Assertions.assertEquals(2, documents.size()); documents.forEach( @@ -600,10 +586,11 @@ public void testNotInQueryWithNumberField(String dataStoreName) throws IOExcepti f.setOp(Op.OR); f.setChildFilters(filters); query.setFilter(f); - results = collection.search(query); - documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); + try (CloseableIterator results = collection.search(query)) { + documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } } Assertions.assertEquals(3, documents.size()); documents.forEach( @@ -622,10 +609,11 @@ public void testNotInQueryWithNumberField(String dataStoreName) throws IOExcepti query = new Query(); query.setFilter(new Filter(Op.NOT_IN, "subdoc.nestedkey1", subDocs)); - results = collection.search(query); - documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); + try (CloseableIterator results = collection.search(query)) { + documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } } assertEquals(3, documents.size()); documents.forEach( @@ -654,11 +642,7 @@ public void testSubDocumentUpdate(String dataStoreName) throws IOException { Query query = new Query(); query.setFilter(Filter.eq(getId(dataStoreName), "default:testKey")); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertFalse(documents.isEmpty()); // mongo @@ -760,36 +744,41 @@ public void bulkUpdateSubDocOnlyForExistingDocuments(String dataStoreName) throw Query query = new Query(); query.setFilter(new Filter(Op.EQ, "_id", key1.toString())); - Iterator it = collection.search(query); - JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson()); - String nestedTimestamp = root.findValue("subDocPath1").toString(); - assertEquals("{\"nested1\":\"100\"}", nestedTimestamp); + try (CloseableIterator it = collection.search(query)) { + JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson()); + String nestedTimestamp = root.findValue("subDocPath1").toString(); + assertEquals("{\"nested1\":\"100\"}", nestedTimestamp); + } query = new Query(); query.setFilter(new Filter(Op.EQ, "_id", key3.toString())); - it = collection.search(query); - root = OBJECT_MAPPER.readTree(it.next().toJson()); - nestedTimestamp = root.findValue("foo3").toString(); - assertEquals("{\"nested3\":\"100\"}", nestedTimestamp); + try (CloseableIterator it = collection.search(query)) { + JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson()); + String nestedTimestamp = root.findValue("foo3").toString(); + assertEquals("{\"nested3\":\"100\"}", nestedTimestamp); + } query = new Query(); query.setFilter(new Filter(Op.EQ, "_id", key4.toString())); - it = collection.search(query); - root = OBJECT_MAPPER.readTree(it.next().toJson()); - String nestedValue = root.findValue("foo4").toString(); - assertEquals("{\"nested4\":{\"someKey\":{}}}", nestedValue); + try (CloseableIterator it = collection.search(query)) { + JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson()); + String nestedValue = root.findValue("foo4").toString(); + assertEquals("{\"nested4\":{\"someKey\":{}}}", nestedValue); + } query = new Query(); query.setFilter(new Filter(Op.EQ, "_id", key5.toString())); - it = collection.search(query); - root = OBJECT_MAPPER.readTree(it.next().toJson()); - nestedValue = root.findValue("foo5").toString(); - assertEquals("{\"nested5\":[]}", nestedValue); + try (CloseableIterator it = collection.search(query)) { + JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson()); + String nestedValue = root.findValue("foo5").toString(); + assertEquals("{\"nested5\":[]}", nestedValue); + } query = new Query(); query.setFilter(new Filter(Op.EQ, "_id", key2.toString())); - it = collection.search(query); - assertFalse(it.hasNext()); + try (CloseableIterator it = collection.search(query)) { + assertFalse(it.hasNext()); + } } @ParameterizedTest @@ -839,11 +828,7 @@ public void testIgnoreCaseLikeQuery(String dataStoreName) throws IOException { for (String searchValue : ignoreCaseSearchValues) { Query query = new Query(); query.setFilter(new Filter(Filter.Op.LIKE, "name", searchValue)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertFalse(documents.isEmpty()); String persistedDocument = documents.get(0).toJson(); JsonNode jsonNode = OBJECT_MAPPER.reader().readTree(persistedDocument); @@ -1008,11 +993,7 @@ public void test_bulkOperationOnArrayValue_setOperation(String dataStoreName) th // get all documents Query query = new Query(); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(4, documents.size()); @@ -1093,11 +1074,7 @@ public void test_bulkOperationOnArrayValue_setOperation_malformedDocs(String dat () -> collection.bulkOperationOnArrayValue(bulkArrayValueUpdateRequest)); // get all documents Query query = new Query(); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(1, documents.size()); @@ -1291,11 +1268,7 @@ public void test_bulkOperationOnArrayValue_addOperation(String dataStoreName) th // get all documents Query query = new Query(); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(4, documents.size()); @@ -1370,11 +1343,7 @@ public void test_bulkOperationOnArrayValue_addOperation_malformedDocs(String dat () -> collection.bulkOperationOnArrayValue(bulkArrayValueUpdateRequest)); // get all documents Query query = new Query(); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(1, documents.size()); @@ -1546,11 +1515,7 @@ public void test_bulkOperationOnArrayValue_removeOperation(String dataStoreName) // get all documents Query query = new Query(); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(4, documents.size()); @@ -1625,11 +1590,7 @@ public void test_bulkOperationOnArrayValue_removeOperation_malformedDocs(String () -> collection.bulkOperationOnArrayValue(bulkArrayValueUpdateRequest)); // get all documents Query query = new Query(); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(1, documents.size()); @@ -1721,11 +1682,7 @@ public void testExistsFilter(String dataStoreName) throws IOException { ImmutablePair.of("city", null))); Query query = new Query(); query.setFilter(new Filter(Op.EXISTS, "city", true)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(documents.size(), 2); } @@ -1766,12 +1723,13 @@ public void testNotExistsFilter(String dataStoreName) throws IOException { ImmutablePair.of("city", null))); Query query = new Query(); query.setFilter(new Filter(Op.EXISTS, "city", false)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); + try (CloseableIterator results = collection.search(query)) { + List documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } + Assertions.assertEquals(documents.size(), 2); } - Assertions.assertEquals(documents.size(), 2); } @ParameterizedTest @@ -1840,11 +1798,7 @@ public void testOffsetAndLimitOrderBy(String dataStoreName) throws IOException { query.addOrderBy(new OrderBy("foo2", true)); query.addOrderBy(new OrderBy("foo3", true)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); assertEquals(2, documents.size()); String persistedDocument1 = documents.get(0).toJson(); @@ -1915,11 +1869,7 @@ public void testInQuery(String dataStoreName) throws IOException { Query query = new Query(); query.setFilter(new Filter(Filter.Op.IN, "name", inArray)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(documents.size(), 2); } @@ -1944,11 +1894,7 @@ public void testSearchForNestedKey(String dataStoreName) throws IOException { Query query = new Query(); query.setFilter( new Filter(Filter.Op.EQ, "attributes.span_id.value.string", "6449f1f720c93a67")); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(documents.size(), 1); } @@ -2002,11 +1948,7 @@ public void testSearch(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Filter.Op.EQ, "amount", 1234)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(1, documents.size()); } @@ -2014,11 +1956,7 @@ public void testSearch(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Filter.Op.EQ, "amount", 1234.5)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(1, documents.size()); } @@ -2026,11 +1964,7 @@ public void testSearch(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Filter.Op.GTE, "amount", 123)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(2, documents.size()); } @@ -2038,11 +1972,7 @@ public void testSearch(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Filter.Op.EQ, "_id", key1.toString())); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(1, documents.size()); } @@ -2050,11 +1980,7 @@ public void testSearch(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Op.EXISTS, "testKeyExist", null)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(2, documents.size()); } @@ -2062,11 +1988,7 @@ public void testSearch(String dataStoreName) throws IOException { { Query query = new Query(); query.setFilter(new Filter(Op.EXISTS, "attributes.trace_id.value.testKeyExistNested", null)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(2, documents.size()); } @@ -2075,11 +1997,7 @@ public void testSearch(String dataStoreName) throws IOException { Query query = new Query(); query.setFilter( new Filter(Op.NOT_EXISTS, "attributes.trace_id.value.testKeyExistNested", null)); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(1, documents.size()); } @@ -2107,11 +2025,7 @@ public void testSearch(String dataStoreName) throws IOException { query.addSelection("entityId"); query.addSelection("entityType"); query.setFilter(new Filter(Filter.Op.EQ, "_id", key1.toString())); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertEquals(1, documents.size()); Map result = OBJECT_MAPPER.readValue(documents.get(0).toJson(), new TypeReference<>() {}); @@ -2137,11 +2051,7 @@ public void testCreateWithMultipleThreads(String dataStoreName) throws Exception // check the inserted document and thread result matches Query query = new Query(); query.setFilter(Filter.eq("_id", documentKey.toString())); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertTrue(documents.size() == 1); Map doc = OBJECT_MAPPER.readValue(documents.get(0).toJson(), Map.class); Assertions.assertEquals(resultMap.get(SUCCESS).get(0).getTestValue(), (int) doc.get("size")); @@ -2173,11 +2083,7 @@ public void testUpdateWithMultipleThreads(String dataStoreName) throws Exception // check the inserted document and thread result matches Query query = new Query(); query.setFilter(Filter.eq("_id", documentKey.toString())); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertTrue(documents.size() == 1); Map doc = OBJECT_MAPPER.readValue(documents.get(0).toJson(), Map.class); Assertions.assertEquals( @@ -2195,11 +2101,7 @@ public void testUpdateWithCondition(String dataStoreName) throws Exception { Filter condition = new Filter(Op.EQ, "isCostly", false); // test that document is inserted if its not exists - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + List documents = getDocuments(collection, query); Assertions.assertTrue(documents.size() == 0); CreateResult createResult = @@ -2226,11 +2128,7 @@ public void testUpdateWithCondition(String dataStoreName) throws Exception { Assertions.assertTrue(updateResult.getUpdatedCount() == 1); - results = collection.search(query); - documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + documents = getDocuments(collection, query); Assertions.assertTrue(documents.size() == 1); Map doc = OBJECT_MAPPER.readValue(documents.get(0).toJson(), Map.class); Assertions.assertEquals(10, (int) doc.get("size")); @@ -2253,11 +2151,7 @@ public void testUpdateWithCondition(String dataStoreName) throws Exception { Assertions.assertTrue(updateResult.getUpdatedCount() == 0); - results = collection.search(query); - documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); - } + documents = getDocuments(collection, query); Assertions.assertTrue(documents.size() == 1); doc = OBJECT_MAPPER.readValue(documents.get(0).toJson(), Map.class); Assertions.assertEquals(10, (int) doc.get("size")); @@ -2315,15 +2209,16 @@ public void testSearchIteratorInterface(String dataStoreName) throws IOException { Query query = new Query(); query.setFilter(new Filter(Filter.Op.EQ, "_id", key1.toString())); - Iterator results = collection.search(query); - if (!results.hasNext()) { - Assertions.fail(); - } - List documents = new ArrayList<>(); - while (results.hasNext()) { - documents.add(results.next()); + try (CloseableIterator results = collection.search(query)) { + if (!results.hasNext()) { + Assertions.fail(); + } + List documents = new ArrayList<>(); + while (results.hasNext()) { + documents.add(results.next()); + } + Assertions.assertEquals(1, documents.size()); } - Assertions.assertEquals(1, documents.size()); } // Search _id field in the document @@ -2331,15 +2226,16 @@ public void testSearchIteratorInterface(String dataStoreName) throws IOException { Query query = new Query(); query.setFilter(new Filter(Filter.Op.EQ, "_id", key1.toString())); - Iterator results = collection.search(query); - List documents = new ArrayList<>(); - while (true) { - documents.add(results.next()); - if (!results.hasNext()) { - break; + try (CloseableIterator results = collection.search(query)) { + List documents = new ArrayList<>(); + while (true) { + documents.add(results.next()); + if (!results.hasNext()) { + break; + } } + Assertions.assertEquals(1, documents.size()); } - Assertions.assertEquals(1, documents.size()); } } @@ -2371,8 +2267,9 @@ public void whenBulkUpdatingNonExistentRecords_thenExpectNothingToBeUpdatedOrCre Query query = new Query(); query.setFilter( new Filter(Op.EQ, "_id", new SingleValueKey("tenant-1", "testKey1").toString())); - Iterator it = collection.search(query); - assertFalse(it.hasNext()); + try (CloseableIterator it = collection.search(query)) { + assertFalse(it.hasNext()); + } } @ParameterizedTest @@ -2411,10 +2308,11 @@ public void whenBulkUpdatingExistingRecords_thenExpectOnlyRecordsWhoseConditions Query query = new Query(); query.setFilter( new Filter(Op.EQ, "_id", new SingleValueKey("tenant-1", "testKey1").toString())); - Iterator it = collection.search(query); - JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson()); - Long timestamp = root.findValue("timestamp").asLong(); - Assertions.assertEquals(110, timestamp); + try (CloseableIterator it = collection.search(query)) { + JsonNode root = OBJECT_MAPPER.readTree(it.next().toJson()); + Long timestamp = root.findValue("timestamp").asLong(); + Assertions.assertEquals(110, timestamp); + } } private Map> executeCreateUpdateThreads( @@ -2497,13 +2395,4 @@ static String getId(String dataStoreName) { return "id"; } } - - private static void assertSizeEqual(Iterator documents, int expectedSize) { - int actualSize = 0; - while (documents.hasNext()) { - documents.next(); - actualSize++; - } - assertEquals(expectedSize, actualSize); - } } diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java index e3388390..69d8adf1 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/postgres/PostgresDocStoreTest.java @@ -11,10 +11,10 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; +import org.hypertrace.core.documentstore.CloseableIterator; import org.hypertrace.core.documentstore.Collection; import org.hypertrace.core.documentstore.Datastore; import org.hypertrace.core.documentstore.DatastoreProvider; @@ -93,7 +93,7 @@ public void testInitWithDatabase() { datastore.init(config); try { - DatabaseMetaData metaData = datastore.getPostgresClient().getMetaData(); + DatabaseMetaData metaData = datastore.getPostgresClient().getConnection().getMetaData(); Assertions.assertEquals(metaData.getURL(), connectionUrl + database); Assertions.assertEquals(metaData.getUserName(), user); } catch (SQLException e) { @@ -178,17 +178,21 @@ public void testBulkUpsertAndReturn() throws IOException { new SingleValueKey("default", "testKey6"), Utils.createDocument("email", "bob@example.com")); - Iterator iterator = collection.bulkUpsertAndReturnOlderDocuments(bulkMap); - // Initially there shouldn't be any documents. - Assertions.assertFalse(iterator.hasNext()); + try (CloseableIterator iterator = + collection.bulkUpsertAndReturnOlderDocuments(bulkMap)) { + // Initially there shouldn't be any documents. + Assertions.assertFalse(iterator.hasNext()); + } // The operation should be idempotent, so go ahead and try again. - iterator = collection.bulkUpsertAndReturnOlderDocuments(bulkMap); - List documents = new ArrayList<>(); - while (iterator.hasNext()) { - documents.add(iterator.next()); + try (CloseableIterator iterator = + collection.bulkUpsertAndReturnOlderDocuments(bulkMap)) { + List documents = new ArrayList<>(); + while (iterator.hasNext()) { + documents.add(iterator.next()); + } + Assertions.assertEquals(6, documents.size()); } - Assertions.assertEquals(6, documents.size()); { // empty query returns all the documents diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java new file mode 100644 index 00000000..666900eb --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java @@ -0,0 +1,78 @@ +package org.hypertrace.core.documentstore.postgres; + +import java.sql.Connection; +import java.sql.SQLException; +import java.time.Duration; +import javax.sql.DataSource; +import org.apache.commons.dbcp2.ConnectionFactory; +import org.apache.commons.dbcp2.DriverManagerConnectionFactory; +import org.apache.commons.dbcp2.PoolableConnection; +import org.apache.commons.dbcp2.PoolableConnectionFactory; +import org.apache.commons.dbcp2.PoolingDataSource; +import org.apache.commons.pool2.impl.AbandonedConfig; +import org.apache.commons.pool2.impl.GenericObjectPool; + +class PostgresClient { + + private final DataSource dataSource; + + PostgresClient( + String url, + String user, + String password, + int maxConnections, + Duration maxWaitTime, + Duration removeAbandonedTimeout) { + this.dataSource = + createPooledDataSource( + url, user, password, maxConnections, maxWaitTime, removeAbandonedTimeout); + } + + private DataSource createPooledDataSource( + String url, + String user, + String password, + int maxConnections, + Duration maxWaitTime, + Duration removeAbandonedTimeout) { + ConnectionFactory connectionFactory = new DriverManagerConnectionFactory(url, user, password); + PoolableConnectionFactory poolableConnectionFactory = + new PoolableConnectionFactory(connectionFactory, null); + GenericObjectPool connectionPool = + new GenericObjectPool<>(poolableConnectionFactory); + connectionPool.setMaxTotal(maxConnections); + // max idle connections are 20% of max connections + connectionPool.setMaxIdle(getPercentOf(maxConnections, 20)); + // min idle connections are 10% of max connections + connectionPool.setMinIdle(getPercentOf(maxConnections, 10)); + connectionPool.setBlockWhenExhausted(true); + connectionPool.setMaxWaitMillis(maxWaitTime.toMillis()); + + // set the abandoned config for connection pool + AbandonedConfig abandonedConfig = new AbandonedConfig(); + abandonedConfig.setLogAbandoned(true); + abandonedConfig.setRemoveAbandonedOnBorrow(true); + abandonedConfig.setRequireFullStackTrace(true); + abandonedConfig.setRemoveAbandonedTimeout(removeAbandonedTimeout); + connectionPool.setAbandonedConfig(abandonedConfig); + + poolableConnectionFactory.setPool(connectionPool); + poolableConnectionFactory.setValidationQuery("SELECT 1"); + poolableConnectionFactory.setValidationQueryTimeout(5); + poolableConnectionFactory.setDefaultReadOnly(false); + poolableConnectionFactory.setDefaultAutoCommit(true); + poolableConnectionFactory.setDefaultTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + poolableConnectionFactory.setPoolStatements(false); + return new PoolingDataSource<>(connectionPool); + } + + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + private int getPercentOf(int maxConnections, int percent) { + int value = (maxConnections * percent) / 100; + // minimum value should be 1 + return Math.max(value, 1); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java index be86284c..c785ca6c 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresCollection.java @@ -28,7 +28,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest; import org.hypertrace.core.documentstore.BulkDeleteResult; @@ -63,18 +62,19 @@ public class PostgresCollection implements Collection { private static final ObjectMapper MAPPER = new ObjectMapper(); private static final CloseableIterator EMPTY_ITERATOR = createEmptyIterator(); - private final Connection client; + private final PostgresClient client; private final String collectionName; - public PostgresCollection(Connection client, String collectionName) { + public PostgresCollection(PostgresClient client, String collectionName) { this.client = client; this.collectionName = collectionName; } @Override public boolean upsert(Key key, Document document) throws IOException { - try (PreparedStatement preparedStatement = - client.prepareStatement(getUpsertSQL(), Statement.RETURN_GENERATED_KEYS)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + connection.prepareStatement(getUpsertSQL(), Statement.RETURN_GENERATED_KEYS)) { String jsonString = prepareDocument(key, document); preparedStatement.setString(1, key.toString()); preparedStatement.setString(2, jsonString); @@ -110,8 +110,10 @@ public UpdateResult update(Key key, Document document, Filter condition) throws } } - try (PreparedStatement preparedStatement = - buildPreparedStatement(upsertQueryBuilder.toString(), paramsBuilder.build())) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + buildPreparedStatement( + connection, upsertQueryBuilder.toString(), paramsBuilder.build())) { int result = preparedStatement.executeUpdate(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Write result: {}", result); @@ -126,8 +128,9 @@ public UpdateResult update(Key key, Document document, Filter condition) throws /** create a new document if one doesn't exists with key */ @Override public CreateResult create(Key key, Document document) throws IOException { - try (PreparedStatement preparedStatement = - client.prepareStatement(getInsertSQL(), Statement.RETURN_GENERATED_KEYS)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + connection.prepareStatement(getInsertSQL(), Statement.RETURN_GENERATED_KEYS)) { String jsonString = prepareDocument(key, document); preparedStatement.setString(1, key.toString()); preparedStatement.setString(2, jsonString); @@ -225,8 +228,9 @@ private boolean updateSubDocInternal(Key key, String subDocPath, Document subDoc String jsonSubDocPath = getJsonSubDocPath(subDocPath); String jsonString = subDocument.toJson(); - try (PreparedStatement preparedStatement = - client.prepareStatement(updateSubDocSQL, Statement.RETURN_GENERATED_KEYS)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + connection.prepareStatement(updateSubDocSQL, Statement.RETURN_GENERATED_KEYS)) { preparedStatement.setString(1, jsonSubDocPath); preparedStatement.setString(2, jsonString); preparedStatement.setString(3, key.toString()); @@ -271,8 +275,8 @@ private BulkUpdateSubDocsInternalResult bulkUpdateSubDocsInternal( String.format( "UPDATE %s SET %s=jsonb_set(%s, ?::text[], ?::jsonb) WHERE %s = ?", collectionName, DOCUMENT, DOCUMENT, ID); - try { - PreparedStatement preparedStatement = client.prepareStatement(updateSubDocSQL); + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(updateSubDocSQL)) { for (Key key : documents.keySet()) { orderList.add(key); Map subDocuments = documents.get(key); @@ -310,16 +314,18 @@ public BulkUpdateResult bulkOperationOnArrayValue(BulkArrayValueUpdateRequest re subDocs.add(getDocAsJSON(subDoc)); } Map idToTenantIdMap = getDocIdToTenantIdMap(request); - CloseableIterator docs = searchDocsForKeys(request.getKeys()); - switch (request.getOperation()) { - case ADD: - return bulkAddOnArrayValue(request.getSubDocPath(), idToTenantIdMap, subDocs, docs); - case SET: - return bulkSetOnArrayValue(request.getSubDocPath(), idToTenantIdMap, subDocs, docs); - case REMOVE: - return bulkRemoveOnArrayValue(request.getSubDocPath(), idToTenantIdMap, subDocs, docs); - default: - throw new UnsupportedOperationException("Unsupported operation: " + request.getOperation()); + try (CloseableIterator docs = searchDocsForKeys(request.getKeys())) { + switch (request.getOperation()) { + case ADD: + return bulkAddOnArrayValue(request.getSubDocPath(), idToTenantIdMap, subDocs, docs); + case SET: + return bulkSetOnArrayValue(request.getSubDocPath(), idToTenantIdMap, subDocs, docs); + case REMOVE: + return bulkRemoveOnArrayValue(request.getSubDocPath(), idToTenantIdMap, subDocs, docs); + default: + throw new UnsupportedOperationException( + "Unsupported operation: " + request.getOperation()); + } } } @@ -357,19 +363,22 @@ public CloseableIterator search(Query query) { } String pgSqlQuery = sqlBuilder.toString(); + Connection connection = null; + PreparedStatement preparedStatement = null; + ResultSet resultSet = null; try { - PreparedStatement preparedStatement = - buildPreparedStatement(pgSqlQuery, paramsBuilder.build()); - LOGGER.debug("Executing search query to PostgresSQL:{}", preparedStatement.toString()); - ResultSet resultSet = preparedStatement.executeQuery(); + connection = client.getConnection(); + preparedStatement = buildPreparedStatement(connection, pgSqlQuery, paramsBuilder.build()); + resultSet = preparedStatement.executeQuery(); CloseableIterator closeableIterator = query.getSelections().size() > 0 - ? new PostgresResultIteratorWithMetaData(resultSet) - : new PostgresResultIterator(resultSet); + ? new PostgresResultIteratorWithMetaData(connection, preparedStatement, resultSet) + : new PostgresResultIterator(connection, preparedStatement, resultSet); return closeableIterator; } catch (SQLException e) { LOGGER.error( "SQLException in querying documents - query: {}, sqlQuery:{}", query, pgSqlQuery, e); + closeAll(connection, preparedStatement, resultSet); } return EMPTY_ITERATOR; @@ -394,15 +403,15 @@ public long count(org.hypertrace.core.documentstore.query.Query query) { collectionName, query); String subQuery = queryParser.parse(); String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) p(count)", subQuery); - try { - PreparedStatement preparedStatement = - buildPreparedStatement(sqlQuery, queryParser.getParamsBuilder().build()); - ResultSet resultSet = preparedStatement.executeQuery(); + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + buildPreparedStatement(connection, sqlQuery, queryParser.getParamsBuilder().build()); + ResultSet resultSet = preparedStatement.executeQuery()) { resultSet.next(); return resultSet.getLong(1); } catch (SQLException e) { LOGGER.error( - "SQLException querying documents. original query: {}, sql query:", query, sqlQuery, e); + "SQLException querying documents. original query: {}, sql query: {}", query, sqlQuery, e); throw new RuntimeException(e); } } @@ -410,7 +419,8 @@ public long count(org.hypertrace.core.documentstore.query.Query query) { @Override public boolean delete(Key key) { String deleteSQL = String.format("DELETE FROM %s WHERE %s = ?", collectionName, ID); - try (PreparedStatement preparedStatement = client.prepareStatement(deleteSQL)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) { preparedStatement.setString(1, key.toString()); preparedStatement.executeUpdate(); return true; @@ -433,9 +443,9 @@ public boolean delete(Filter filter) { throw new UnsupportedOperationException("Parsed filter is invalid"); } sqlBuilder.append(" WHERE ").append(filters); - try { - PreparedStatement preparedStatement = - buildPreparedStatement(sqlBuilder.toString(), paramsBuilder.build()); + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + buildPreparedStatement(connection, sqlBuilder.toString(), paramsBuilder.build())) { int deletedCount = preparedStatement.executeUpdate(); return deletedCount > 0; } catch (SQLException e) { @@ -460,7 +470,8 @@ public BulkDeleteResult delete(Set keys) { .append(ids) .append(")") .toString(); - try (PreparedStatement preparedStatement = client.prepareStatement(deleteSQL)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) { int deletedCount = preparedStatement.executeUpdate(); return new BulkDeleteResult(deletedCount); } catch (SQLException e) { @@ -476,8 +487,9 @@ public boolean deleteSubDoc(Key key, String subDocPath) { "UPDATE %s SET %s=%s #- ?::text[] WHERE %s=?", collectionName, DOCUMENT, DOCUMENT, ID); String jsonSubDocPath = getJsonSubDocPath(subDocPath); - try (PreparedStatement preparedStatement = - client.prepareStatement(deleteSubDocSQL, Statement.RETURN_GENERATED_KEYS)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + connection.prepareStatement(deleteSubDocSQL, Statement.RETURN_GENERATED_KEYS)) { preparedStatement.setString(1, jsonSubDocPath); preparedStatement.setString(2, key.toString()); int resultSet = preparedStatement.executeUpdate(); @@ -497,7 +509,8 @@ public boolean deleteSubDoc(Key key, String subDocPath) { @Override public boolean deleteAll() { String deleteSQL = String.format("DELETE FROM %s", collectionName); - try (PreparedStatement preparedStatement = client.prepareStatement(deleteSQL)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) { preparedStatement.executeUpdate(); return true; } catch (SQLException e) { @@ -510,8 +523,9 @@ public boolean deleteAll() { public long count() { String countSQL = String.format("SELECT COUNT(*) FROM %s", collectionName); long count = -1; - try (PreparedStatement preparedStatement = client.prepareStatement(countSQL)) { - ResultSet resultSet = preparedStatement.executeQuery(); + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(countSQL); + ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { count = resultSet.getLong(1); } @@ -536,9 +550,10 @@ public long total(Query query) { } } - try (PreparedStatement preparedStatement = - buildPreparedStatement(totalSQLBuilder.toString(), paramsBuilder.build())) { - ResultSet resultSet = preparedStatement.executeQuery(); + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + buildPreparedStatement(connection, totalSQLBuilder.toString(), paramsBuilder.build()); + ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { count = resultSet.getLong(1); } @@ -577,6 +592,9 @@ public boolean bulkUpsert(Map documents) { public CloseableIterator bulkUpsertAndReturnOlderDocuments(Map documents) throws IOException { String query = null; + Connection connection = null; + PreparedStatement preparedStatement = null; + ResultSet resultSet = null; try { String collect = documents.keySet().stream() @@ -595,9 +613,9 @@ public CloseableIterator bulkUpsertAndReturnOlderDocuments(Map bulkUpsertAndReturnOlderDocuments(Map getCreatedTime(Key key) throws IOException { - CloseableIterator iterator = searchDocsForKeys(Set.of(key)); - if (iterator.hasNext()) { - JsonNode existingDocument = getDocAsJSON(iterator.next()); - if (existingDocument.has(DocStoreConstants.CREATED_TIME)) { - return Optional.of(existingDocument.get(DocStoreConstants.CREATED_TIME).asLong()); + try (CloseableIterator iterator = searchDocsForKeys(Set.of(key))) { + if (iterator.hasNext()) { + JsonNode existingDocument = getDocAsJSON(iterator.next()); + if (existingDocument.has(DocStoreConstants.CREATED_TIME)) { + return Optional.of(existingDocument.get(DocStoreConstants.CREATED_TIME).asLong()); + } } } return Optional.empty(); @@ -783,19 +805,25 @@ private CloseableIterator executeQueryV1( new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser( collectionName, query); String sqlQuery = queryParser.parse(); + + Connection connection = null; + PreparedStatement preparedStatement = null; + ResultSet resultSet = null; try { - PreparedStatement preparedStatement = - buildPreparedStatement(sqlQuery, queryParser.getParamsBuilder().build()); + connection = client.getConnection(); + preparedStatement = + buildPreparedStatement(connection, sqlQuery, queryParser.getParamsBuilder().build()); LOGGER.debug("Executing executeQueryV1 sqlQuery:{}", preparedStatement.toString()); - ResultSet resultSet = preparedStatement.executeQuery(); + resultSet = preparedStatement.executeQuery(); CloseableIterator closeableIterator = query.getSelections().size() > 0 - ? new PostgresResultIteratorWithMetaData(resultSet) - : new PostgresResultIterator(resultSet); + ? new PostgresResultIteratorWithMetaData(connection, preparedStatement, resultSet) + : new PostgresResultIterator(connection, preparedStatement, resultSet); return closeableIterator; } catch (SQLException e) { LOGGER.error( "SQLException querying documents. original query: {}, sql query:", query, sqlQuery, e); + closeAll(connection, preparedStatement, resultSet); throw new RuntimeException(e); } } @@ -822,8 +850,9 @@ private String getJsonSubDocPath(String subDocPath) { } private int[] bulkUpsertImpl(Map documents) throws SQLException, IOException { - try (PreparedStatement preparedStatement = - client.prepareStatement(getUpsertSQL(), Statement.RETURN_GENERATED_KEYS)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + connection.prepareStatement(getUpsertSQL(), Statement.RETURN_GENERATED_KEYS)) { for (Map.Entry entry : documents.entrySet()) { Key key = entry.getKey(); @@ -900,9 +929,8 @@ private long bulkUpdateRequestsWithoutFilter(List requestsWit throws IOException { // We can batch all requests here since the query is the same. long totalRowsUpdated = 0; - try { - - PreparedStatement ps = client.prepareStatement(getUpdateSQL()); + try (Connection connection = client.getConnection(); + PreparedStatement ps = connection.prepareStatement(getUpdateSQL())) { for (BulkUpdateRequest req : requestsWithoutFilter) { Key key = req.getKey(); @@ -972,9 +1000,9 @@ private long updateLastModifiedTime(Set keys) { "UPDATE %s SET %s=jsonb_set(%s, '{lastUpdatedTime}'::text[], ?::jsonb) WHERE %s=?", collectionName, DOCUMENT, DOCUMENT, ID); long now = System.currentTimeMillis(); - try { - PreparedStatement preparedStatement = - client.prepareStatement(updateSubDocSQL, Statement.RETURN_GENERATED_KEYS); + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = + connection.prepareStatement(updateSubDocSQL, Statement.RETURN_GENERATED_KEYS)) { for (Key key : keys) { preparedStatement.setString(1, String.valueOf(now)); preparedStatement.setString(2, key.toString()); @@ -1012,14 +1040,44 @@ private String getUpsertSQL() { collectionName, ID, DOCUMENT, ID, DOCUMENT); } + private static void closeAll( + Connection connection, PreparedStatement preparedStatement, ResultSet resultSet) { + if (resultSet != null) { + try { + resultSet.close(); + } catch (SQLException ex) { + LOGGER.warn("Error closing result set", ex); + } + } + if (preparedStatement != null) { + try { + preparedStatement.close(); + } catch (SQLException ex) { + LOGGER.warn("Error closing prepared statement", ex); + } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException ex) { + LOGGER.warn("Error closing connection", ex); + } + } + } + static class PostgresResultIterator implements CloseableIterator { protected final ObjectMapper MAPPER = new ObjectMapper(); + private final Connection connection; + private final PreparedStatement preparedStatement; protected ResultSet resultSet; protected boolean cursorMovedForward = false; protected boolean hasNext = false; - public PostgresResultIterator(ResultSet resultSet) { + public PostgresResultIterator( + Connection connection, PreparedStatement preparedStatement, ResultSet resultSet) { + this.connection = connection; + this.preparedStatement = preparedStatement; this.resultSet = resultSet; } @@ -1064,17 +1122,17 @@ protected Document prepareDocument() throws SQLException, IOException { return new JSONDocument(MAPPER.writeValueAsString(jsonNode)); } - @SneakyThrows @Override public void close() { - resultSet.close(); + closeAll(connection, preparedStatement, resultSet); } } static class PostgresResultIteratorWithMetaData extends PostgresResultIterator { - public PostgresResultIteratorWithMetaData(ResultSet resultSet) { - super(resultSet); + public PostgresResultIteratorWithMetaData( + Connection connection, PreparedStatement preparedStatement, ResultSet resultSet) { + super(connection, preparedStatement, resultSet); } @Override diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java index 79d54a83..abd5f344 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresDatastore.java @@ -12,6 +12,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.time.Duration; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -28,8 +29,11 @@ public class PostgresDatastore implements Datastore { private static final String DEFAULT_USER = "postgres"; private static final String DEFAULT_PASSWORD = "postgres"; private static final String DEFAULT_DB_NAME = "postgres"; + private static final int DEFAULT_MAX_CONNECTIONS = 16; + private static final Duration DEFAULT_MAX_WAIT_TIME = Duration.ofSeconds(10); + private static final Duration DEFAULT_REMOVE_ABANDONED_TIMEOUT = Duration.ofSeconds(60); - private Connection client; + private PostgresClient client; private String database; @Override @@ -47,13 +51,24 @@ public boolean init(Config config) { url = String.format("jdbc:postgresql://%s:%s/", hostName, port); } - String DEFAULT_USER = "postgres"; String user = config.hasPath("user") ? config.getString("user") : DEFAULT_USER; String password = config.hasPath("password") ? config.getString("password") : DEFAULT_PASSWORD; + int maxConnections = + config.hasPath("maxConnections") + ? config.getInt("maxConnections") + : DEFAULT_MAX_CONNECTIONS; + Duration maxWaitTime = + config.hasPath("maxWaitTime") ? config.getDuration("maxWaitTime") : DEFAULT_MAX_WAIT_TIME; + Duration removeAbandonedTimeout = + config.hasPath("removeAbandonedTimeout") + ? config.getDuration("removeAbandonedTimeout") + : DEFAULT_REMOVE_ABANDONED_TIMEOUT; String finalUrl = url + this.database; - client = DriverManager.getConnection(finalUrl, user, password); + client = + new PostgresClient( + finalUrl, user, password, maxConnections, maxWaitTime, removeAbandonedTimeout); } catch (IllegalArgumentException e) { throw new IllegalArgumentException( @@ -68,8 +83,8 @@ public boolean init(Config config) { @Override public Set listCollections() { Set collections = new HashSet<>(); - try { - DatabaseMetaData metaData = client.getMetaData(); + try (Connection connection = client.getConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); ResultSet tables = metaData.getTables(null, null, "%", new String[] {"TABLE"}); while (tables.next()) { collections.add(database + "." + tables.getString("TABLE_NAME")); @@ -91,7 +106,8 @@ public boolean createCollection(String collectionName, Map optio + "%s TIMESTAMPTZ NOT NULL DEFAULT NOW()" + ");", collectionName, ID, DOCUMENT, CREATED_AT, UPDATED_AT); - try (PreparedStatement preparedStatement = client.prepareStatement(createTableSQL)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(createTableSQL)) { preparedStatement.executeUpdate(); } catch (SQLException e) { LOGGER.error("Exception creating table name: {}", collectionName); @@ -103,7 +119,8 @@ public boolean createCollection(String collectionName, Map optio @Override public boolean deleteCollection(String collectionName) { String dropTableSQL = String.format("DROP TABLE IF EXISTS %s", collectionName); - try (PreparedStatement preparedStatement = client.prepareStatement(dropTableSQL)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(dropTableSQL)) { int result = preparedStatement.executeUpdate(); return result >= 0; } catch (SQLException e) { @@ -124,7 +141,8 @@ public Collection getCollection(String collectionName) { @Override public boolean healthCheck() { String healtchCheckSQL = "SELECT 1;"; - try (PreparedStatement preparedStatement = client.prepareStatement(healtchCheckSQL)) { + try (Connection connection = client.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(healtchCheckSQL)) { return preparedStatement.execute(); } catch (SQLException e) { LOGGER.error("Exception executing health check"); @@ -132,7 +150,7 @@ public boolean healthCheck() { return false; } - public Connection getPostgresClient() { + public PostgresClient getPostgresClient() { return client; } }