Make numCorePartitions as 0 for tombstones (apache#15379)
* Make numCorePartitions as 0 in the TombstoneShardSpec.

* fix up test

* Add tombstone core partition tests

* review comment

* Need to register the test shard type to make jackson happy
abhishekrb19 authored Nov 20, 2023
1 parent ba1b6fa commit 470c8ed
Showing 3 changed files with 129 additions and 7 deletions.
@@ -29,7 +29,9 @@
import java.util.Objects;

/**
- * A shard spec to represent tombstones. Its partition number is always zero and contains 1 core partitions.
+ * A shard spec to represent tombstones. Its partition number is always zero, and it has zero core partitions as it
+ * contains no data. This allows segments of other shard types appended to an existing {@link TombstoneShardSpec} to
+ * exist independently in the timeline even if the {@link TombstoneShardSpec} is dropped.
*/
public class TombstoneShardSpec implements ShardSpec
{
@@ -69,7 +71,7 @@ public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
@JsonProperty("partitions")
public int getNumCorePartitions()
{
- return 1;
+ return 0;
}

@Override
@@ -88,8 +90,8 @@ public String getType()
public String toString()
{
return "TombstoneShardSpec{" +
"partitionNum=" + 0 +
", partitions=" + 1 +
"partitionNum=" + getPartitionNum() +
", partitions=" + getNumCorePartitions() +
'}';
}

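Why zero core partitions matters: the timeline treats a version of an interval as complete, and therefore visible, only when all of its core partitions (partition ids below numCorePartitions) are present. The sketch below is a minimal model of that rule, assuming a toy VisibilityModel helper rather than Druid's actual timeline/overshadowing classes:

import java.util.Set;

// Toy model for illustration only: a version of an interval is visible
// only if every core partition id in [0, numCorePartitions) is present.
final class VisibilityModel
{
  static boolean isVersionVisible(Set<Integer> presentPartitionIds, int numCorePartitions)
  {
    if (presentPartitionIds.isEmpty()) {
      return false; // nothing committed for this version
    }
    for (int id = 0; id < numCorePartitions; id++) {
      if (!presentPartitionIds.contains(id)) {
        return false; // a core partition is missing, so the whole version is hidden
      }
    }
    return true;
  }
}

Under this rule, a tombstone that reports zero core partitions leaves nothing for later appended segments to depend on, which is the independence the new javadoc describes; a legacy tombstone reporting one core partition makes the version incomplete, and hides any appended segments, as soon as the tombstone (partition 0) is dropped.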
@@ -60,7 +60,7 @@ public void possibleInDomain()
@Test
public void getNumCorePartitions()
{
- assertEquals(1, tombstoneShardSpec.getNumCorePartitions());
+ assertEquals(0, tombstoneShardSpec.getNumCorePartitions());
}

@Test
@@ -19,6 +19,7 @@

package org.apache.druid.metadata;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -42,6 +43,7 @@
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -55,6 +57,7 @@
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -1965,7 +1968,7 @@ public void testAllocatePendingSegment()
* - Later, after the above was dropped, another segment on same interval was created by the stream but this
* time there was an integrity violation in the pending segments table because the
* {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)}
- * method returned an segment id that already existed in the pending segments table
+ * method returned a segment id that already existed in the pending segments table
*/
@Test
public void testAllocatePendingSegmentAfterDroppingExistingSegment()
@@ -2907,6 +2910,110 @@ public void testRetrieveUsedSegmentsAndCreatedDatesFetchesEternityForAnyInterval
Assert.assertEquals(3, resultForEternity.size());
}

@Test
public void testTimelineVisibilityWith0CorePartitionTombstone() throws IOException
{
final Interval interval = Intervals.of("2020/2021");
// Create and commit a tombstone segment
final DataSegment tombstoneSegment = createSegment(
interval,
"version",
new TombstoneShardSpec()
);

final Set<DataSegment> tombstones = new HashSet<>(Collections.singleton(tombstoneSegment));
Assert.assertTrue(coordinator.commitSegments(tombstones).containsAll(tombstones));

// Allocate and commit a data segment by appending to the same interval
final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
DS.WIKI,
"seq",
tombstoneSegment.getVersion(),
interval,
NumberedPartialShardSpec.instance(),
"version",
false
);

Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());
Assert.assertEquals(0, identifier.getShardSpec().getNumCorePartitions());

final DataSegment dataSegment = createSegment(
interval,
"version",
identifier.getShardSpec()
);
final Set<DataSegment> dataSegments = new HashSet<>(Collections.singleton(dataSegment));
Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));

// Mark the tombstone as unused
markAllSegmentsUnused(tombstones);

final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
DS.WIKI,
Segments.ONLY_VISIBLE
);

// The appended data segment will still be visible in the timeline since the
// tombstone contains 0 core partitions
SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(allUsedSegments);
Assert.assertEquals(1, segmentTimeline.lookup(interval).size());
Assert.assertEquals(dataSegment, segmentTimeline.lookup(interval).get(0).getObject().getChunk(1).getObject());
}

@Test
public void testTimelineWith1CorePartitionTombstone() throws IOException
{
// Register the old generation tombstone spec for this test.
mapper.registerSubtypes(TombstoneShardSpecWith1CorePartition.class);

final Interval interval = Intervals.of("2020/2021");
// Create and commit an old generation tombstone with 1 core partition
final DataSegment tombstoneSegment = createSegment(
interval,
"version",
new TombstoneShardSpecWith1CorePartition()
);

final Set<DataSegment> tombstones = new HashSet<>(Collections.singleton(tombstoneSegment));
Assert.assertTrue(coordinator.commitSegments(tombstones).containsAll(tombstones));

// Allocate and commit a data segment by appending to the same interval
final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
DS.WIKI,
"seq",
tombstoneSegment.getVersion(),
interval,
NumberedPartialShardSpec.instance(),
"version",
false
);

Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());
Assert.assertEquals(1, identifier.getShardSpec().getNumCorePartitions());

final DataSegment dataSegment = createSegment(
interval,
"version",
identifier.getShardSpec()
);
final Set<DataSegment> dataSegments = new HashSet<>(Collections.singleton(dataSegment));
Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));

// Mark the tombstone as unused
markAllSegmentsUnused(tombstones);

final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
DS.WIKI,
Segments.ONLY_VISIBLE
);

// The appended data segment will not be visible in the timeline since the old generation
// tombstone contains 1 core partition
SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(allUsedSegments);
Assert.assertEquals(0, segmentTimeline.lookup(interval).size());
}

private static class DS
{
static final String WIKI = "wiki";
@@ -2936,7 +3043,7 @@ private List<DataSegment> createAndGetUsedYearSegments(final int startYear, fina
}
final Set<DataSegment> segmentsSet = new HashSet<>(segments);
final Set<DataSegment> committedSegments = coordinator.commitSegments(segmentsSet);
- Assert.assertTrue(committedSegments.containsAll(new HashSet<>(segments)));
+ Assert.assertTrue(committedSegments.containsAll(segmentsSet));

return segments;
}
@@ -2961,4 +3068,17 @@ private ImmutableList<DataSegment> retrieveUnusedSegmentsUsingMultipleIntervalsA
}
);
}

/**
* This test-only shard type exists to test the behavior of "old generation" tombstones with 1 core partition.
*/
private static class TombstoneShardSpecWith1CorePartition extends TombstoneShardSpec
{
@Override
@JsonProperty("partitions")
public int getNumCorePartitions()
{
return 1;
}
}
}
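Assuming the VisibilityModel sketch shown after the TombstoneShardSpec changes above, the expectations of the two new coordinator tests reduce to the following (Set.of requires Java 9+):

// The appended data segment occupies partition id 1; the dropped tombstone held partition id 0.
VisibilityModel.isVersionVisible(Set.of(1), 0);  // true  -> segment stays visible (new zero-core-partition tombstone)
VisibilityModel.isVersionVisible(Set.of(1), 1);  // false -> segment is hidden (legacy one-core-partition tombstone)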
