Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into pagination-browser-…
Browse files Browse the repository at this point in the history
…back-btn-issue
  • Loading branch information
shrushti2000 committed Jan 22, 2025
2 parents 3b09893 + 68c3246 commit 95d0144
Show file tree
Hide file tree
Showing 14 changed files with 325 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4650,6 +4650,37 @@ default List<String> listAfter(ListFilter filter, int limit, String afterName, S
return listAfter(
getTableName(), mySqlCondition, postgresCondition, limit, afterName, afterId, groupBy);
}

@SqlQuery(
"SELECT json FROM <table> tn\n"
+ "INNER JOIN (SELECT DISTINCT fromId FROM entity_relationship er\n"
+ "<cond> AND toEntity = 'testSuite' and fromEntity = :entityType) er ON fromId = tn.id\n"
+ "LIMIT :limit OFFSET :offset;")
List<String> listEntitiesWithTestSuite(
@Define("table") String table,
@BindMap Map<String, ?> params,
@Define("cond") String cond,
@Bind("entityType") String entityType,
@Bind("limit") int limit,
@Bind("offset") int offset);

default List<String> listEntitiesWithTestsuite(
ListFilter filter, String table, String entityType, int limit, int offset) {
return listEntitiesWithTestSuite(
table, filter.getQueryParams(), filter.getCondition(), entityType, limit, offset);
}

@SqlQuery(
"SELECT COUNT(DISTINCT fromId) FROM entity_relationship er\n"
+ "<cond> AND toEntity = 'testSuite' and fromEntity = :entityType;")
Integer countEntitiesWithTestSuite(
@BindMap Map<String, ?> params,
@Define("cond") String cond,
@Bind("entityType") String entityType);

default Integer countEntitiesWithTestsuite(ListFilter filter, String entityType) {
return countEntitiesWithTestSuite(filter.getQueryParams(), filter.getCondition(), entityType);
}
}

interface TestCaseDAO extends EntityDAO<TestCase> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
import static org.openmetadata.service.util.EntityUtil.nextVersion;
import static org.openmetadata.service.util.EntityUtil.objectMatch;
import static org.openmetadata.service.util.EntityUtil.tagLabelMatch;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getAfterOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getBeforeOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getOffset;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -180,11 +183,13 @@
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ListWithOffsetFunction;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.RestUtil.DeleteResponse;
import org.openmetadata.service.util.RestUtil.PatchResponse;
import org.openmetadata.service.util.RestUtil.PutResponse;
import org.openmetadata.service.util.ResultList;
import software.amazon.awssdk.utils.Either;

/**
* This is the base class used by Entity Resources to perform READ and WRITE operations to the backend database to
Expand Down Expand Up @@ -767,40 +772,6 @@ public ResultList<T> listAfter(
}
}

public final ResultList<T> listAfterWithSkipFailure(
UriInfo uriInfo, Fields fields, ListFilter filter, int limitParam, String after) {
List<EntityError> errors = new ArrayList<>();
List<T> entities = new ArrayList<>();
int beforeOffset = Integer.parseInt(RestUtil.decodeCursor(after));
int currentOffset = beforeOffset;
int total = dao.listCount(filter);
if (limitParam > 0) {
List<String> jsons = dao.listAfter(filter, limitParam, currentOffset);

for (String json : jsons) {
T parsedEntity = JsonUtils.readValue(json, entityClass);
try {
T entity = setFieldsInternal(parsedEntity, fields);
setInheritedFields(entity, fields);
clearFieldsInternal(entity, fields);
entities.add(withHref(uriInfo, entity));
} catch (Exception e) {
clearFieldsInternal(parsedEntity, fields);
EntityError entityError =
new EntityError().withMessage(e.getMessage()).withEntity(parsedEntity);
errors.add(entityError);
LOG.error("[ListForIndexing] Failed for Entity : {}", entityError);
}
}
currentOffset = currentOffset + limitParam;
String newAfter = currentOffset > total ? null : String.valueOf(currentOffset);
return getResultList(entities, errors, String.valueOf(beforeOffset), newAfter, total);
} else {
// limit == 0 , return total count of entity.
return getResultList(entities, errors, null, null, total);
}
}

@SuppressWarnings("unchecked")
Map<String, String> parseCursorMap(String param) {
Map<String, String> cursorMap;
Expand Down Expand Up @@ -897,6 +868,45 @@ public final EntityHistoryWithOffset listVersionsWithOffset(UUID id, int limit,
new EntityHistory().withEntityType(entityType).withVersions(versions), offset + limit);
}

public final ResultList<T> listWithOffset(
ListWithOffsetFunction<ListFilter, Integer, Integer, List<String>> callable,
Function<ListFilter, Integer> countCallable,
ListFilter filter,
Integer limitParam,
String offset,
boolean skipErrors,
Fields fields,
UriInfo uriInfo) {
List<T> entities = new ArrayList<>();
List<EntityError> errors = new ArrayList<>();

Integer total = countCallable.apply(filter);

int offsetInt = getOffset(offset);
String afterOffset = getAfterOffset(offsetInt, limitParam, total);
String beforeOffset = getBeforeOffset(offsetInt, limitParam);
if (limitParam > 0) {
List<String> jsons = callable.apply(filter, limitParam, offsetInt);
Iterator<Either<T, EntityError>> iterator = serializeJsons(jsons, fields, uriInfo);
while (iterator.hasNext()) {
Either<T, EntityError> either = iterator.next();
if (either.right().isPresent()) {
if (!skipErrors) {
throw new RuntimeException(either.right().get().getMessage());
} else {
errors.add(either.right().get());
LOG.error("[List] Failed for Entity : {}", either.right().get());
}
} else {
entities.add(either.left().get());
}
}
return getResultList(entities, errors, beforeOffset, afterOffset, total);
} else {
return getResultList(entities, errors, null, null, total);
}
}

public final EntityHistory listVersions(UUID id) {
T latest = setFieldsInternal(find(id, ALL), putFields);
setInheritedFields(latest, putFields);
Expand Down Expand Up @@ -1580,6 +1590,23 @@ public final List<String> getResultsFromAndToTimestamps(
.listBetweenTimestampsByOrder(fqn, extension, startTs, endTs, orderBy);
}

@Transaction
public ResultList<T> getEntitiesWithTestSuite(
ListFilter filter, Integer limit, String offset, EntityUtil.Fields fields) {
CollectionDAO.TestSuiteDAO testSuiteDAO = daoCollection.testSuiteDAO();
return listWithOffset(
(filterParam, limitParam, offsetParam) ->
testSuiteDAO.listEntitiesWithTestsuite(
filterParam, dao.getTableName(), entityType, limitParam, offsetParam),
(filterParam) -> testSuiteDAO.countEntitiesWithTestsuite(filterParam, entityType),
filter,
limit,
offset,
false,
fields,
null);
}

@Transaction
public final void deleteExtensionAtTimestamp(String fqn, String extension, Long timestamp) {
daoCollection.entityExtensionTimeSeriesDao().deleteAtTimestamp(fqn, extension, timestamp);
Expand Down Expand Up @@ -3925,4 +3952,36 @@ private Map<UUID, Object> batchFetchExtensions(List<T> entities) {
private List<String> entityListToStrings(List<T> entities) {
return entities.stream().map(EntityInterface::getId).map(UUID::toString).toList();
}

private Iterator<Either<T, EntityError>> serializeJsons(
List<String> jsons, Fields fields, UriInfo uriInfo) {
return new Iterator<>() {
private final Iterator<String> iterator = jsons.iterator();

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public Either<T, EntityError> next() {
String json = iterator.next();
T entity = JsonUtils.readValue(json, entityClass);
try {
setFieldsInternal(entity, fields);
setInheritedFields(entity, fields);
clearFieldsInternal(entity, fields);
if (!nullOrEmpty(uriInfo)) {
entity = withHref(uriInfo, entity);
}
return Either.left(entity);
} catch (Exception e) {
clearFieldsInternal(entity, fields);
EntityError entityError =
new EntityError().withMessage(e.getMessage()).withEntity(entity);
return Either.right(entityError);
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.getEntityFields;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getAfterOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getBeforeOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getOffset;

import java.beans.IntrospectionException;
import java.io.IOException;
Expand Down Expand Up @@ -316,26 +319,6 @@ public void deleteById(UUID id, boolean hardDelete) {
timeSeriesDao.deleteById(id);
}

private String getAfterOffset(int offsetInt, int limit, int total) {
int afterOffset = offsetInt + limit;
// If afterOffset is greater than total, then set it to null to indicate end of list
return afterOffset >= total ? null : String.valueOf(afterOffset);
}

private String getBeforeOffset(int offsetInt, int limit) {
int beforeOffsetInt = offsetInt - limit;
// If offset is negative, then set it to 0 if you pass offset 4 and limit 10, then the previous
// page will be at offset 0
if (beforeOffsetInt < 0) beforeOffsetInt = 0;
// if offsetInt is 0 (i.e. either no offset or offset is 0), then set it to null as there is no
// previous page
return (offsetInt == 0) ? null : String.valueOf(beforeOffsetInt);
}

private int getOffset(String offset) {
return offset != null ? Integer.parseInt(RestUtil.decodeCursor(offset)) : 0;
}

private Map<String, List<?>> getEntityList(List<String> jsons, boolean skipErrors) {
List<T> entityList = new ArrayList<>();
List<EntityError> errors = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.openmetadata.service.util;

import java.util.Objects;
import java.util.function.Function;

@FunctionalInterface
public interface ListWithOffsetFunction<A, B, C, R> {
R apply(A a, B b, C c);

default <V> ListWithOffsetFunction<A, B, C, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (A a, B b, C c) -> after.apply(apply(a, b, c));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
import org.jdbi.v3.sqlobject.SqlObjects;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
import org.openmetadata.service.util.RestUtil;

public class JdbiUtils {

Expand Down Expand Up @@ -52,4 +53,24 @@ public static Jdbi createAndSetupJDBI(DataSourceFactory dbFactory) {

return jdbiInstance;
}

public static int getOffset(String offset) {
return offset != null ? Integer.parseInt(RestUtil.decodeCursor(offset)) : 0;
}

public static String getAfterOffset(int offsetInt, int limit, int total) {
int afterOffset = offsetInt + limit;
// If afterOffset is greater than total, then set it to null to indicate end of list
return afterOffset >= total ? null : String.valueOf(afterOffset);
}

public static String getBeforeOffset(int offsetInt, int limit) {
int beforeOffsetInt = offsetInt - limit;
// If offset is negative, then set it to 0 if you pass offset 4 and limit 10, then the previous
// page will be at offset 0
if (beforeOffsetInt < 0) beforeOffsetInt = 0;
// if offsetInt is 0 (i.e. either no offset or offset is 0), then set it to null as there is no
// previous page
return (offsetInt == 0) ? null : String.valueOf(beforeOffsetInt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.EntityDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.RestUtil;
Expand Down Expand Up @@ -96,9 +97,17 @@ private ResultList<? extends EntityInterface> read(String cursor) throws SearchI
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
ResultList<? extends EntityInterface> result;
try {
EntityDAO<?> entityDAO = entityRepository.getDao();
result =
entityRepository.listAfterWithSkipFailure(
null, Entity.getFields(entityType, fields), filter, batchSize, cursor);
entityRepository.listWithOffset(
entityDAO::listAfter,
entityDAO::listCount,
filter,
batchSize,
cursor,
true,
Entity.getFields(entityType, fields),
null);
if (!result.getErrors().isEmpty()) {
lastFailedCursor = this.cursor.get();
if (result.getPaging().getAfter() == null) {
Expand Down Expand Up @@ -154,9 +163,17 @@ public ResultList<? extends EntityInterface> readWithCursor(String currentCursor
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
ResultList<? extends EntityInterface> result;
try {
EntityDAO<?> entityDAO = entityRepository.getDao();
result =
entityRepository.listAfterWithSkipFailure(
null, Entity.getFields(entityType, fields), filter, batchSize, currentCursor);
entityRepository.listWithOffset(
entityDAO::listAfter,
entityDAO::listCount,
filter,
batchSize,
currentCursor,
true,
Entity.getFields(entityType, fields),
null);
LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, result.getData().size(), result.getErrors().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@
"mappings": {
"properties": {
"id": {
"type": "text"
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
Expand Down
Loading

0 comments on commit 95d0144

Please sign in to comment.