Add api for Retrieving unused segments #15415

Merged 21 commits on Dec 11, 2023
Original file line number Diff line number Diff line change
@@ -233,7 +233,7 @@ public List<DataSegment> retrieveUnusedSegmentsForInterval(
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null)) {
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null, null)) {
return ImmutableList.copyOf(iterator);
}
}
@@ -141,7 +141,8 @@ Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
String datasource,
@Nullable Interval interval,
@Nullable Integer limit,
Contributor

I think it might be a good idea to go ahead and add a sort order now too

Contributor Author

Thanks! Good suggestion. Added
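
For context, the requested sort order ends up as an ORDER BY clause in the metadata query. Below is a minimal sketch of how a nullable integer flag like the one in this commit could map to such a clause. The class `OrderByClauseSketch`, the helper `buildOrderAndOffset`, and the generic `OFFSET n ROWS` syntax are assumptions for illustration; the actual code delegates offset syntax to `connector.getOffsetClause` and quoting to `connector.getQuoteString`. The sketch applies the direction to both columns. In standard SQL a trailing `DESC` after the last column affects only that column, so a clause of the form `ORDER BY start, "end" DESC` would leave `start` ascending.

```java
final class OrderByClauseSketch
{
  /**
   * Hypothetical helper (not part of Druid): builds the ORDER BY and OFFSET suffix.
   * A flag of null or greater than 0 means ascending; a flag less than or equal to 0 means descending.
   * No clause is emitted when neither an offset nor an order is requested.
   */
  static String buildOrderAndOffset(Integer offset, Integer orderByStartEnd, String quote)
  {
    final StringBuilder sb = new StringBuilder();
    if (offset != null || orderByStartEnd != null) {
      final String direction = orderByStartEnd != null && orderByStartEnd <= 0 ? "DESC" : "ASC";
      // "end" is a reserved word in several databases, so it is quoted.
      sb.append(String.format(" ORDER BY start %2$s, %1$send%1$s %2$s", quote, direction));
      if (offset != null) {
        // Assumed generic syntax; real connectors differ.
        sb.append(" OFFSET ").append(offset).append(" ROWS");
      }
    }
    return sb.toString();
  }
}
```

An offset without a stable ordering would page unpredictably, which is why the clause is emitted whenever an offset is present even if no order was requested.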

@Nullable Integer offset
@Nullable Integer offset,
@Nullable Integer orderByStartEnd
);

/**
@@ -687,7 +687,7 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable
}

try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null)) {
queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null)) {
while (iterator.hasNext()) {
final DataSegment dataSegment = iterator.next();
timeline.addSegments(Iterators.singletonIterator(dataSegment));
@@ -956,12 +956,30 @@ public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForD
.transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
}

/**
* Retrieves segments for a given datasource that are marked unused and that are *fully contained by* any interval
* in a particular collection of intervals. If the collection of intervals is empty, this method will retrieve all
Contributor

Hmm, is this copy-pasted here and below? This method takes only a single interval, not a collection, right?

Contributor Author

fixed

* unused segments.
*
* This call does not return any information about realtime segments.
*
* @param datasource The name of the datasource
* @param interval The intervals to search over
Contributor

Suggested change
* @param interval The intervals to search over
* @param interval an optional interval to search over.

Contributor Author

fixed

* @param limit The limit of segments to return
Contributor

Suggested change
* @param limit The limit of segments to return
* @param limit an optional maximum number of results to return. If none is specified, the results are not limited.

Contributor Author

fixed

* @param offset The offset to use when retrieving matching segments.
* @param orderByStartEnd Specifies the order with which to return the matching segments by start time, end time. A
Contributor

Would an enum type be a better choice for the sort direction, similar to OrderByColumnSpec#Direction?

Contributor Author

Thanks! Added new enum
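
As a rough illustration of the suggestion, an enum can replace the nullable integer flag so that callers cannot pass an ambiguous value such as 0. The name `SortOrderSketch` and its methods are hypothetical; the enum actually added to the PR may be named and shaped differently.

```java
/**
 * Hypothetical sort-direction enum (illustrative only, not the one from the PR).
 */
enum SortOrderSketch
{
  ASC("ASC"),
  DESC("DESC");

  private final String sql;

  SortOrderSketch(String sql)
  {
    this.sql = sql;
  }

  /** Returns the keyword to append after a column in an ORDER BY clause. */
  String toSql()
  {
    return sql;
  }

  /** Parses a case-insensitive value such as "asc" or "DESC"; rejects anything else. */
  static SortOrderSketch fromString(String value)
  {
    for (SortOrderSketch order : values()) {
      if (order.sql.equalsIgnoreCase(value)) {
        return order;
      }
    }
    throw new IllegalArgumentException("Unexpected sort order: " + value);
  }
}
```

With a type like this, a null argument can still mean "no particular order", while the two legal directions are spelled out rather than encoded in the sign of an integer.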

* value of less than or equal to 0, specifies a descending order, while a value of greater
* than 0 specifies an ascending order. A null value indicates that order does not matter.

* Returns an iterable.
*/
@Override
public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
String datasource,
@Nullable Interval interval,
@Nullable Integer limit,
@Nullable Integer offset
final String datasource,
@Nullable final Interval interval,
@Nullable final Integer limit,
@Nullable final Integer offset,
@Nullable final Integer orderByStartEnd
)
{
return connector.inReadOnlyTransaction(
@@ -974,7 +992,7 @@ public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
? Intervals.ONLY_ETERNITY
: Collections.singletonList(interval);
try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(datasource, intervals, limit, offset)) {
queryTool.retrieveUnusedSegments(datasource, intervals, limit, offset, orderByStartEnd)) {
return ImmutableList.copyOf(iterator);
}
}
@@ -117,7 +117,7 @@ public CloseableIterator<DataSegment> retrieveUsedSegments(
final Collection<Interval> intervals
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null);
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null, null);
}

/**
@@ -133,17 +133,21 @@ public CloseableIterator<DataSegment> retrieveUsedSegments(
* @param offset The offset to use when retrieving matching segments. Note that offset is only applied to a
* single batch - i.e., when the number of intervals is less than {@link #MAX_INTERVALS_PER_BATCH}.
* For multiple batches, the offset parameter is ignored.
* @param orderByStartEnd Specifies the order with which to return the matching segments by start time, end time. A
* value of less than or equal to 0, specifies a descending order, while a value of greater
* than 0 specifies an ascending order. A null value indicates that order does not matter.

* Returns a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegment> retrieveUnusedSegments(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final Integer limit,
@Nullable final Integer offset
@Nullable final Integer offset,
@Nullable final Integer orderByStartEnd
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, offset);
return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, offset, orderByStartEnd);
}

/**
@@ -231,7 +235,15 @@ public int markSegmentsUnused(final String dataSource, final Interval interval)
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
Iterators.transform(
retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true, null, null),
retrieveSegments(
dataSource,
Collections.singletonList(interval),
IntervalMode.CONTAINS,
true,
null,
null,
null
),
DataSegment::getId
)
);
@@ -367,12 +379,13 @@ private CloseableIterator<DataSegment> retrieveSegments(
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final Integer offset
@Nullable final Integer offset,
@Nullable final Integer orderByStartEnd
)
{
if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage(
retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, offset)
retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, offset, orderByStartEnd)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
@@ -386,7 +399,8 @@ private CloseableIterator<DataSegment> retrieveSegments(
matchMode,
used,
limitPerBatch,
null // don't use offset with multiple batches for now. Note added to javadoc.
null, // don't use offset with multiple batches for now. Note added to javadoc.
orderByStartEnd
);
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent batches or circuit break if
@@ -411,7 +425,8 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final Integer offset
@Nullable final Integer offset,
@Nullable final Integer orderByStartEnd
)
{
// Check if the intervals all support comparing as strings. If so, bake them into the SQL.
@@ -424,9 +439,13 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
}

abhishekrb19 marked this conversation as resolved.
if (offset != null) {
sb.append(StringUtils.format(" ORDER BY start, %1$send%1$s", connector.getQuoteString()));
sb.append(StringUtils.format(connector.getOffsetClause(offset)));
if (offset != null || orderByStartEnd != null) {
sb.append(StringUtils.format(" ORDER BY start, %1$send%1$s %2$s",
connector.getQuoteString(),
orderByStartEnd != null && orderByStartEnd <= 0L ? "DESC" : "ASC"));
if (offset != null) {
sb.append(StringUtils.format(connector.getOffsetClause(offset)));
}
}
final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
@@ -73,6 +73,7 @@
@Path("/druid/coordinator/v1/metadata")
public class MetadataResource
{
private static final int ASCENDING_ORDER_BY_START_END = 1;
private final SegmentsMetadataManager segmentsMetadataManager;
private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private final AuthorizerMapper authorizerMapper;
@@ -362,7 +363,8 @@ public Response getUnusedSegmentsInDataSource(
dataSource,
theInterval,
limit,
offset
offset,
ASCENDING_ORDER_BY_START_END
);

final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
@@ -1209,6 +1209,7 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
null,
null,
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1224,6 +1225,7 @@ public void testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitNoOffset() throws I
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
ImmutableList.of(),
null,
null,
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1237,16 +1239,38 @@ public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndOffset() thro
markAllSegmentsUnused(new HashSet<>(segments));

int offset = 10;
final List<DataSegment> expectedSegments = segments.stream()
final List<DataSegment> expectedSegmentsAscOrder = segments.stream()
.skip(offset)
.collect(Collectors.toList());
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
ImmutableList.of(),
null,
offset,
null
);
Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size());
Assert.assertTrue(expectedSegmentsAscOrder.containsAll(actualUnusedSegments));

actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
ImmutableList.of(),
null,
offset
offset,
1
);
Assert.assertEquals(expectedSegments.size(), actualUnusedSegments.size());
Assert.assertTrue(expectedSegments.containsAll(actualUnusedSegments));
Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size());
Assert.assertTrue(expectedSegmentsAscOrder.containsAll(actualUnusedSegments));

final List<DataSegment> expectedSegmentsDescOrder = new ArrayList<>(expectedSegmentsAscOrder);
Collections.reverse(expectedSegmentsDescOrder);

actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
ImmutableList.of(),
null,
offset,
-1
);
Assert.assertEquals(expectedSegmentsDescOrder.size(), actualUnusedSegments.size());
Assert.assertTrue(expectedSegmentsDescOrder.containsAll(actualUnusedSegments));
}

Contributor

For test coverage, can we also please include new or extend existing tests for the following scenarios:

  1. non-empty interval, non-null limit and non-null offset
  2. multiple intervals, non-null limit and non-null offset
  3. multiple intervals exceeding MAX_INTERVALS_PER_BATCH, non-null limit and non-null offset (the offset in this case would be ignored)

Contributor Author

added
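
The three scenarios differ mainly in whether the offset is honored. The javadoc for `retrieveUnusedSegments` in this diff states that the offset applies only when the intervals fit in a single batch and is ignored otherwise. A rough in-memory model of that behavior might look like the following. The class `BatchPagingSketch`, the method `page`, and the batch size of 100 are assumptions for illustration, not Druid code, and the model treats the matching segments as one pre-sorted list rather than running per-batch queries.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchPagingSketch
{
  // Assumed value for illustration; the real constant lives in SqlSegmentsMetadataQuery.
  static final int MAX_INTERVALS_PER_BATCH = 100;

  /**
   * Hypothetical model: returns the slice of already-sorted matches that a caller
   * would see for the given number of query intervals, limit, and offset.
   */
  static <T> List<T> page(List<T> sortedMatches, int numIntervals, Integer limit, Integer offset)
  {
    // The offset is only honored when all intervals fit in a single batch.
    final boolean singleBatch = numIntervals <= MAX_INTERVALS_PER_BATCH;
    final int skip = singleBatch && offset != null ? Math.min(offset, sortedMatches.size()) : 0;
    final int remaining = sortedMatches.size() - skip;
    // A null limit means the results are not limited.
    final int take = limit == null ? remaining : Math.min(limit, remaining);
    return new ArrayList<>(sortedMatches.subList(skip, skip + take));
  }
}
```

Under this model, scenarios 1 and 2 skip `offset` rows before applying the limit, while scenario 3 returns the first `limit` rows regardless of the offset.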

@Test
@@ -1258,6 +1282,7 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() th
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.size(),
null,
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1274,6 +1299,7 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() th
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()),
requestedLimit,
null,
null
);
Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
@@ -1291,7 +1317,8 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAn
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()),
requestedLimit,
offset
offset,
null
);
Assert.assertEquals(segments.size() - offset, actualUnusedSegments.size());
// offset used when number of intervals does not require multiple batches
@@ -1309,7 +1336,8 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffsetInRang
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()),
requestedLimit,
offset
offset,
null
);
Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
// offset not used when number of intervals requires multiple batches
@@ -1325,6 +1353,7 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitOutOfRange()
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.size() + 1,
null,
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1344,6 +1373,7 @@ public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws IOExcepti
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
ImmutableList.of(outOfRangeInterval),
null,
null,
null
);
Assert.assertEquals(0, actualUnusedSegments.size());
@@ -3126,7 +3156,8 @@ private List<DataSegment> createAndGetUsedYearSegments(final int startYear, fina
private ImmutableList<DataSegment> retrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffset(
Contributor

There's no offset anymore; maybe we can simply call this method retrieveUnusedSegments?

final List<Interval> intervals,
final Integer limit,
final Integer offset
final Integer offset,
final Integer orderByStartEnd
)
{
return derbyConnector.inReadOnlyTransaction(
@@ -3138,7 +3169,7 @@ private ImmutableList<DataSegment> retrieveUnusedSegmentsUsingMultipleIntervalsL
derbyConnectorRule.metadataTablesConfigSupplier().get(),
mapper
)
.retrieveUnusedSegments(DS.WIKI, intervals, limit, offset)) {
.retrieveUnusedSegments(DS.WIKI, intervals, limit, offset, orderByStartEnd)) {
return ImmutableList.copyOf(iterator);
}
}
@@ -197,7 +197,8 @@ public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
String datasource,
@Nullable Interval interval,
@Nullable Integer limit,
@Nullable Integer offset
@Nullable Integer offset,
@Nullable Integer orderByStartEnd
)
{
return null;
@@ -255,6 +255,7 @@ public void testGetUnusedSegmentsInDataSource()
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any());

// test with null datasource name - fails with expected bad datasource name error