-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Marks end criteria reached for the segment if the Index cannot consume more rows #14479
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #14479 +/- ##
============================================
+ Coverage 61.75% 64.05% +2.30%
- Complexity 207 1574 +1367
============================================
Files 2436 2687 +251
Lines 133233 147699 +14466
Branches 20636 22634 +1998
============================================
+ Hits 82274 94614 +12340
- Misses 44911 46146 +1235
- Partials 6048 6939 +891
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
@@ -362,6 +363,13 @@ private boolean endCriteriaReached() { | |||
_numRowsConsumed, _numRowsIndexed); | |||
_stopReason = SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED; | |||
return true; | |||
} else if (_thresholdForNumOfColValuesEnabled && _realtimeSegment.isNumOfColValuesAboveThreshold()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, so it is fairly easy to stop consumption and commit
@@ -149,6 +149,7 @@ public enum ControllerResponseStatus { | |||
public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup"; | |||
// Stop reason sent by server as force commit message received | |||
public static final String REASON_FORCE_COMMIT_MESSAGE_RECEIVED = "forceCommitMessageReceived"; | |||
public static final String REASON_NUM_OF_COL_VALUES_ABOVE_THRESHOLD = "numColValuesAboveThreshold"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be always on. We may introduce a config to turn it off if we are not confident about this new logic, but if it is not very complicated we can remove this config
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering new impln now where we are keeping conservative size estimation check as well, I guess it makes sense to enable this change only behind a flag?
|
||
if (_thresholdForNumOfColValuesEnabled) { | ||
int prevCount = indexContainer._valuesInfo.getNumValues(); | ||
long newCount = prevCount + 1L + values.length; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Total values itself is not enough. We should perform a per-index check (add an api to the MutableIndex
and let it return if it can take more values).
E.g. for MV forward index, if we get 1B values, but each value takes more than 2 bytes, we will run into the same exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mutable index is unbounded right (No code enforced limit)? From the code I see Realtime Mutable index is always created with dictionary (even for MV VarByte col with noDict enabled in config).
Hence from numOfValues
we know that size of mutable index is approx: numOfValues * 4 Bytes
.
But we are more interested in the size of immutable index since that's where exception is being thrown. However Immutable index can be larger or even smaller than mutable index as implementation is completely diff. So while building mutable index we need to keep some state to estimate the approx size of the immutable version of the index (Like while building mutable fwd index, we need to keep an estimation of bitmap, numBitsPerValue, header size, etc).
We should perform a per-index check (add an api to the MutableIndex and let it return if it can take more values).
So this might be adding too much complexity to every mutable index since now after every row consumption we need to update estimated size of the corresponding immutable index and it tight couples code with immutable index logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Essentially what we want to ensure is that whatever accepted in mutable segment won't cause problem when the mutable segment is sealed.
I'm okay if we only limit total values here, but we need to enhance immutable index so that it can hold 2B values regardless of the bit length per value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm. Okay, let me figure out if we can even do this on ingestion side considering we have 5 different implementations of immutable forward index for MV columns each having diff size limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can keep threshold near 500 million for numOfValues which will be good enough for few immutable indexes but not for varByte fwd index having 4GB Limit (2GB wasted). However this will ensure we never encounter numOfValues overflow and index limit size reached exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now we can figure out a conservative value that will never cause over 2GB buffer. Once we support larger index, we can increate this value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Will pick the task to keep the indexes unbounded post this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly good
@@ -1265,6 +1281,10 @@ private boolean isAggregateMetricsEnabled() { | |||
return _recordIdMap != null; | |||
} | |||
|
|||
public boolean isIndexCapacityThresholdBreached() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest name it canAddMore()
to be consistent with the method in each index
public boolean isIndexCapacityThresholdCheckEnabled() { | ||
return _indexCapacityThresholdCheckEnabled; | ||
} | ||
|
||
public void setIndexCapacityThresholdCheckEnabled(boolean indexCapacityThresholdCheckEnabled) { | ||
_indexCapacityThresholdCheckEnabled = indexCapacityThresholdCheckEnabled; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the overhead is very low, I'd suggest making this on by default, or simply remove this flag.
We didn't run into this issue often because it is extremely rare to have more than 450M values in a segment. Before this PR, we usually manually reduce the commit threshold to workaround this, which is not ideal, and the user experience is very bad.
segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING); | ||
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached()); | ||
|
||
Field realtimeSegmentField = RealtimeSegmentDataManager.class.getDeclaredField("_realtimeSegment"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using reflection is not reliable. Consider adding a package private method canAddMore()
into RealtimeSegmentDataManager
and we can override it here
@@ -700,6 +707,10 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee | |||
return prematureExit; | |||
} | |||
|
|||
boolean canAddMore() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) Annotate it with @VisibleForTesting
// Hence, The below check is a temporary measure to avoid such scenarios until immutable index | ||
// implementations are changed. | ||
if (!mutableIndex.canAddMore()) { | ||
_logger.debug( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log it as info, and capitalize the first letter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this log floods logs (logs: numColum x numOfIndexes), I update the condition to if (!_indexCapacityThresholdBreached && !mutableIndex.canAddMore())
condition
aggregationConfigs, false); | ||
} | ||
|
||
public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can revert this since the flag is removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missed it, my bad.
Problem: Mutable indexes fails to convert to immutable indexes because implementation of immutable index varies (Like some immutable indexes have size limit or can only handle cardinality upto Integer.MAX, etc).
This can cause Runtime Exception when consuming segment is trying to transition to online.
Example: Num of values for a multi valued column exceeds Integer.MAX_VALUE and overflows resulting in RuntimeException during indexing in forward index. Or Immutable fwd index can only hold 2GB data whereas mutable fwd index is unbounded resulting in RuntimeException when immutable index is built.
Proposed Solution: [Short Term] Stop segment consumption if any problematic index cannot consume more rows.