Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add api for Retrieving unused segments #15415

Merged
merged 21 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/api-reference/legacy-metadata-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@ Returns full segment metadata for a specific segment in the cluster.

Return the tiers that a datasource exists in.

`GET /druid/coordinator/v1/datasources/{dataSourceName}/unusedSegments?interval={interval}&limit={limit}&lastSegmentId={lastSegmentId}&sortOrder={sortOrder}`

Returns a list of unused segments for a datasource in the cluster contained within an optionally specified interval.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the API behavior clear in the absence of all the optional parameters, maybe specify what the defaults for the no interval, no limit, etc. Also, it'd be good to include an example for these params, so a user knows what to specify for params like sortOrder (wish we had an open API spec in Druid :)).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Optional parameters for limit and lastSegmentId can be given as well, to limit results and enable paginated results.
The results may be sorted in either ASC, or DESC order concerning their id, start, and end time, depending on
specifying the sortOrder parameter. The default behavior in the absence of all optional parameters is to return all
unused segments for the given datasource in no guaranteed order.

Example usage: `GET /druid/coordinator/v1/datasources/inline_data/unusedSegments?interval=2023-12-01_2023-12-10&limit=10&lastSegmentId=inline_data_2023-12-03T00%3A00%3A00.000Z_2023-12-04T00%3A00%3A00.000Z_2023-12-09T14%3A16%3A53.738Z&sortOrder=ASC}`

## Intervals

Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` as in `2016-06-27_2016-06-28`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public List<DataSegment> retrieveUnusedSegmentsForInterval(
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) {
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null, null)) {
return ImmutableList.copyOf(iterator);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,30 @@ Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasour
boolean requiresLatest
);

/**
* Returns an iterable to go over un-used segments for a given datasource over an optional interval.
* The order in which segments are iterated is from earliest start-time, with ties being broken with earliest end-time
* first. Note: the iteration may not be as trivially cheap as,
* for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it
* iterates the returned iterable only once rather than several times.
*
* @param datasource the name of the datasource.
* @param interval an optional interval to search over. If none is specified, {@link org.apache.druid.java.util.common.Intervals#ETERNITY}
* @param limit an optional maximum number of results to return. If none is specified, the results are not limited.
* @param lastSegmentId an optional last segment id from which to search for results. All segments returned are >
* this segment lexigraphically if sortOrder is null or {@link SortOrder#ASC}, or < this segment
* lexigraphically if sortOrder is {@link SortOrder#DESC}. If none is specified, no such filter is used.
* @param sortOrder an optional order with which to return the matching segments by id, start time, end time.
* If none is specified, the order of the results is not guarenteed.
*/
Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
String datasource,
@Nullable Interval interval,
@Nullable Integer limit,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be a good idea to go ahead and add a sort order now too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Good suggestion. Added

@Nullable String lastSegmentId,
@Nullable SortOrder sortOrder
);

/**
* Retrieves all data source names for which there are segment in the database, regardless of whether those segments
* are used or not. If there are no segments in the database, returns an empty set.
Expand Down
66 changes: 66 additions & 0 deletions server/src/main/java/org/apache/druid/metadata/SortOrder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.StringUtils;

import java.util.Arrays;
import java.util.stream.Collectors;

/**
* Specifies the sort order when doing metadata store queries.
*/
public enum SortOrder
{
ASC("ASC"),

DESC("DESC");

private String value;

SortOrder(String value)
{
this.value = value;
}

@Override
@JsonValue
public String toString()
{
return String.valueOf(value);
}

@JsonCreator
public static SortOrder fromValue(String value)
{
for (SortOrder b : SortOrder.values()) {
if (String.valueOf(b.value).equalsIgnoreCase(String.valueOf(value))) {
return b;
}
}
throw InvalidInput.exception(StringUtils.format(
"Unexpected value[%s] for SortOrder. Possible values are: %s",
value, Arrays.stream(SortOrder.values()).map(SortOrder::toString).collect(Collectors.toList())
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -686,7 +687,7 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable
}

try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(dataSourceName, intervals, null)) {
queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null)) {
while (iterator.hasNext()) {
final DataSegment dataSegment = iterator.next();
timeline.addSegments(Iterators.singletonIterator(dataSegment));
Expand Down Expand Up @@ -955,6 +956,50 @@ public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForD
.transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
}

/**
* Retrieves segments for a given datasource that are marked unused and that are *fully contained by* an optionally
* specified interval. If the interval specified is null, this method will retrieve all unused segments.
*
* This call does not return any information about realtime segments.
*
* @param datasource The name of the datasource
* @param interval The intervals to search over
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param interval The intervals to search over
* @param interval an optional interval to search over.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

* @param limit The limit of segments to return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param limit The limit of segments to return
* @param limit an optional maximum number of results to return. If none is specified, the results are not limited.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

* @param lastSegmentId an optional last segment id from which to search for results. All segments returned are >
* this segment lexigraphically if sortOrder is null or {@link SortOrder#ASC}, or < this
* segment lexigraphically if sortOrder is {@link SortOrder#DESC}. If none is specified, no
* such filter is used.
* @param sortOrder an optional order with which to return the matching segments by id, start time, end time. If
* none is specified, the order of the results is not guarenteed.

* Returns an iterable.
*/
@Override
public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
final String datasource,
@Nullable final Interval interval,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder
)
{
return connector.inReadOnlyTransaction(
(handle, status) -> {
final SqlSegmentsMetadataQuery queryTool =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper);

final List<Interval> intervals =
interval == null
? Intervals.ONLY_ETERNITY
: Collections.singletonList(interval);
try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder)) {
return ImmutableList.copyOf(iterator);
}
}
);
}

@Override
public Set<String> retrieveAllDataSourceNames()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public CloseableIterator<DataSegment> retrieveUsedSegments(
final Collection<Interval> intervals
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null);
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null, null);
}

/**
Expand All @@ -127,15 +127,26 @@ public CloseableIterator<DataSegment> retrieveUsedSegments(
*
* This call does not return any information about realtime segments.
*
* @param dataSource The name of the datasource
* @param intervals The intervals to search over
* @param limit The limit of segments to return
* @param lastSegmentId the last segment id from which to search for results. All segments returned are >
* this segment lexigraphically if sortOrder is null or ASC, or < this segment
* lexigraphically if sortOrder is DESC.
* @param sortOrder Specifies the order with which to return the matching segments by start time, end time.
* A null value indicates that order does not matter.

* Returns a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegment> retrieveUnusedSegments(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final Integer limit
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit);
return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, lastSegmentId, sortOrder);
}

/**
Expand Down Expand Up @@ -223,7 +234,15 @@ public int markSegmentsUnused(final String dataSource, final Interval interval)
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
Iterators.transform(
retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true, null),
retrieveSegments(
dataSource,
Collections.singletonList(interval),
IntervalMode.CONTAINS,
true,
null,
null,
null
),
DataSegment::getId
)
);
Expand Down Expand Up @@ -358,20 +377,30 @@ private CloseableIterator<DataSegment> retrieveSegments(
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder
)
{
if (intervals.isEmpty()) {
if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage(
retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit)
retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
final List<Iterator<DataSegment>> resultingIterators = new ArrayList<>();
Integer limitPerBatch = limit;

for (final List<Interval> intervalList : intervalsLists) {
final UnmodifiableIterator<DataSegment> iterator = retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used, limitPerBatch);
final UnmodifiableIterator<DataSegment> iterator = retrieveSegmentsInIntervalsBatch(
dataSource,
intervalList,
matchMode,
used,
limitPerBatch,
lastSegmentId,
sortOrder
);
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent batches or circuit break if
// we have reached what was requested for.
Expand All @@ -394,7 +423,9 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder
)
{
// Check if the intervals all support comparing as strings. If so, bake them into the SQL.
Expand All @@ -407,11 +438,33 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
}

abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved
if (lastSegmentId != null) {
sb.append(
StringUtils.format(
" AND id %s :id",
(sortOrder == null || sortOrder == SortOrder.ASC)
? ">"
: "<"
)
);
}

if (sortOrder != null) {
sb.append(StringUtils.format(" ORDER BY id %2$s, start %2$s, %1$send%1$s %2$s",
connector.getQuoteString(),
sortOrder.toString()));
}
final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
.createQuery(StringUtils.format(
sb.toString(),
dbTables.getSegmentsTable()
))
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", used)
.bind("dataSource", dataSource);
if (lastSegmentId != null) {
sql.bind("id", lastSegmentId);
}
if (null != limit) {
sql.setMaxRows(limit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SortOrder;
import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.segment.metadata.DataSourceInformation;
Expand Down Expand Up @@ -334,6 +337,49 @@ public Response getUsedSegmentsInDataSourceForIntervals(
return builder.entity(Collections2.transform(segments, DataSegment::getId)).build();
}

@GET
@Path("/datasources/{dataSourceName}/unusedSegments")
@Produces(MediaType.APPLICATION_JSON)
public Response getUnusedSegmentsInDataSource(
@Context final HttpServletRequest req,
@PathParam("dataSourceName") final String dataSource,
@QueryParam("interval") @Nullable String interval,
@QueryParam("limit") @Nullable Integer limit,
@QueryParam("lastSegmentId") @Nullable final String lastSegmentId,
@QueryParam("sortOrder") @Nullable final String sortOrder
)
{
if (dataSource == null || dataSource.isEmpty()) {
throw InvalidInput.exception("dataSourceName must be non-empty");
}
if (limit != null && limit < 0) {
throw InvalidInput.exception("Invalid limit[%s] specified. Limit must be > 0", limit);
}

SortOrder theSortOrder = sortOrder == null ? null : SortOrder.fromValue(sortOrder);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also add a validation check for lastSegmentId if it's non-nul? We could use org.apache.druid.timeline.SegmentId#tryParse utility to validate the id and throw an invalid input exception if the result of the tryParse is null (the id failed to parse)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

final Interval theInterval = interval != null ? Intervals.of(interval.replace('_', '/')) : null;
Iterable<DataSegment> unusedSegments = segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource(
dataSource,
theInterval,
limit,
lastSegmentId,
theSortOrder
);

final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));

final Iterable<DataSegment> authorizedSegments =
AuthorizationUtils.filterAuthorizedResources(req, unusedSegments, raGenerator, authorizerMapper);
Fixed Show fixed Hide fixed
Dismissed Show dismissed Hide dismissed

// sort by earliest start interval first, then end interval. DataSegment are sorted in this same order due to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this comment here since the sorting actually happens inside iterateAllUnusedSegmentsForDatasource? Also, this is missing last segment id in the sort by criteria

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

// how the segment id is generated.
final List<DataSegment> retVal = new ArrayList<>();
authorizedSegments.iterator().forEachRemaining(retVal::add);
return Response.status(Response.Status.OK).entity(retVal).build();
}

@GET
@Path("/datasources/{dataSourceName}/segments/{segmentId}")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Loading