
Make ingestion delay configurable: with concurrency fixes #14142

Open · wants to merge 3 commits into master

Conversation

@KKcorps (Contributor) commented Oct 2, 2024

We ran into some errors with the previous version (#14074), and it was reverted in #14127. This is a new PR with multiple fixes applied after the revert.

The ingestion offset lag metric currently makes a request to the Kafka broker to fetch the latest offset on every update.

However, this has led to an increasing amount of load on Kafka brokers for multiple users.

This PR adds a way to enable/disable the metric and to configure its update interval using the following properties in the server or cluster configs:

pinot.server.instance.offset.lag.tracking.enable: true
pinot.server.instance.offset.lag.tracking.update.interval: 60000
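As an illustration, a server could parse these properties roughly like the sketch below. The property names come from this PR; the class, the defaults, and the plain config map are hypothetical stand-ins, not Pinot's actual config API.

```java
import java.util.Map;

// Hypothetical sketch of reading the two properties added by this PR.
// The defaults below are assumptions for illustration only.
public class OffsetLagConfig {
  static final String ENABLE_KEY = "pinot.server.instance.offset.lag.tracking.enable";
  static final String INTERVAL_KEY = "pinot.server.instance.offset.lag.tracking.update.interval";
  static final boolean DEFAULT_ENABLED = false;      // assumed default
  static final long DEFAULT_INTERVAL_MS = 60_000L;   // matches the example value above

  final boolean _enabled;
  final long _updateIntervalMs;

  OffsetLagConfig(Map<String, String> serverConfig) {
    _enabled = Boolean.parseBoolean(
        serverConfig.getOrDefault(ENABLE_KEY, String.valueOf(DEFAULT_ENABLED)));
    _updateIntervalMs = Long.parseLong(
        serverConfig.getOrDefault(INTERVAL_KEY, String.valueOf(DEFAULT_INTERVAL_MS)));
  }
}
```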

@codecov-commenter commented Oct 2, 2024

Codecov Report

Attention: Patch coverage is 0% with 68 lines in your changes missing coverage. Please review.

Project coverage is 34.18%. Comparing base (59551e4) to head (979c83c).
Report is 1264 commits behind head on master.

Files with missing lines Patch % Lines
...e/data/manager/realtime/IngestionDelayTracker.java 0.00% 67 Missing ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 0.00% 1 Missing ⚠️

❗ There is a different number of reports uploaded between BASE (59551e4) and HEAD (979c83c).

HEAD has 9 uploads less than BASE
Flag BASE (59551e4) HEAD (979c83c)
integration2 3 2
temurin 12 11
skip-bytebuffers-false 7 5
unittests 5 3
unittests1 2 0
java-11 5 4
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #14142       +/-   ##
=============================================
- Coverage     61.75%   34.18%   -27.57%     
- Complexity      207      765      +558     
=============================================
  Files          2436     2660      +224     
  Lines        133233   145739    +12506     
  Branches      20636    22297     +1661     
=============================================
- Hits          82274    49826    -32448     
- Misses        44911    91894    +46983     
+ Partials       6048     4019     -2029     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 34.18% <0.00%> (-27.53%) ⬇️
java-21 34.18% <0.00%> (-27.45%) ⬇️
skip-bytebuffers-false 34.18% <0.00%> (-27.56%) ⬇️
skip-bytebuffers-true 34.16% <0.00%> (+6.43%) ⬆️
temurin 34.18% <0.00%> (-27.57%) ⬇️
unittests 34.18% <0.00%> (-27.57%) ⬇️
unittests1 ?
unittests2 34.18% <0.00%> (+6.45%) ⬆️

Flags with carried forward coverage won't be shown.


@Jackie-Jiang Jackie-Jiang added enhancement documentation Configuration Config changes (addition/deletion/change in behavior) ingestion labels Oct 7, 2024
private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();

private final Cache<String, Boolean> _segmentsToIgnore =
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, TimeUnit.MINUTES).build();

// TODO: Make thread pool a server/cluster level config
Review comment (Contributor):

Same here, we should keep this TODO

volatile StreamPartitionMsgOffset _latestOffset;
final StreamMetadataProvider _streamMetadataProvider;

IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long firstStreamIngestionTimeMs,
Review comment (Contributor):

Are ingestionTimeMs or firstStreamIngestionTimeMs ever null?


IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long firstStreamIngestionTimeMs,
@Nullable StreamPartitionMsgOffset currentOffset,
@Nullable StreamMetadataProvider streamMetadataProvider) {
Review comment (Contributor):

streamMetadataProvider is also always non-null, and we should be able to remove several unnecessary null checks

volatile Long _firstStreamIngestionTimeMs;
volatile StreamPartitionMsgOffset _currentOffset;
volatile StreamPartitionMsgOffset _latestOffset;
final StreamMetadataProvider _streamMetadataProvider;
Review comment (Contributor):

(nit) Suggest moving this to the top since it is final

Comment on lines 532 to 533
StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
Review comment (Contributor):

(MAJOR) This is almost never accurate because the current offset and latest offset are updated separately. We should probably track the lag directly and update it when fetching the latest offset.
Since currentOffset is used only here, we can remove both offsets and keep only the lag in IngestionInfo.

Author reply (Contributor):

That's the issue: we can get the currentOffset only via the updateIngestionMetrics method call, while the latest offset is calculated in a separate periodic thread.

If I only store the lag, I'll have to calculate the latestOffset every time updateIngestionMetrics is called with the partition's currentOffset. I can't do this, since it would defeat the purpose of this PR.

throws RuntimeException {
_serverMetrics = serverMetrics;
_tableNameWithType = tableNameWithType;
_metricName = tableNameWithType;
_realTimeTableDataManager = realtimeTableDataManager;
_clock = Clock.systemUTC();
_isServerReadyToServeQueries = isServerReadyToServeQueries;

StreamConfig streamConfig =
Review comment (Contributor):

Ideally we should directly pass in StreamConfig to avoid parsing it multiple times

@Jackie-Jiang (Contributor):

@swaminathanmanish Please help review this

@KKcorps KKcorps force-pushed the ingestion_delay_configured_resolved branch from 85de7c9 to 6ccff15 Compare October 17, 2024 13:29
@Jackie-Jiang (Contributor):

Before the PR, the logic is:

  • For every message batch, we make a separate metadata fetch request to get the latest offset

This PR:

  • Move the latest offset fetching logic into IngestionDelayTracker and periodically refresh IngestionInfo._latestOffset
  • Still use _latestOffset - _currentOffset to compute the ingestion lag
  • The problem with this approach is that _latestOffset and _currentOffset are not updated together

To solve the above problem:

  • Move the latest offset fetching logic into IngestionDelayTracker
  • Fetch the latest offset within updateIngestionMetrics(), but with the given minimum interval (not fetching on every invocation)
  • Directly store _offsetLag instead of _latestOffset in the IngestionInfo
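The proposed solution can be sketched as follows. This is a minimal illustration, not Pinot's actual code: the class and method names are placeholders, and the LongSupplier stands in for the StreamMetadataProvider metadata fetch.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Sketch: rate-limited latest-offset fetch inside the metric-update path,
// storing only the computed lag rather than both offsets separately.
public class OffsetLagTracker {
  private final long _minFetchIntervalMs;
  private final LongSupplier _latestOffsetFetcher; // stands in for a StreamMetadataProvider call
  private final AtomicLong _lastFetchTimeMs = new AtomicLong(Long.MIN_VALUE);
  private volatile long _offsetLag = -1; // -1 means "not yet known"

  public OffsetLagTracker(long minFetchIntervalMs, LongSupplier latestOffsetFetcher) {
    _minFetchIntervalMs = minFetchIntervalMs;
    _latestOffsetFetcher = latestOffsetFetcher;
  }

  // Called with the current offset for every consumed batch, but only refreshes
  // the lag when the minimum interval has elapsed, so the broker is not hit per batch.
  public void updateIngestionMetrics(long currentOffset, long nowMs) {
    long last = _lastFetchTimeMs.get();
    if ((last == Long.MIN_VALUE || nowMs - last >= _minFetchIntervalMs)
        && _lastFetchTimeMs.compareAndSet(last, nowMs)) {
      _offsetLag = Math.max(0, _latestOffsetFetcher.getAsLong() - currentOffset);
    }
  }

  public long getOffsetLag() {
    return _offsetLag;
  }
}
```

Because the lag is computed in one place from a current offset and a latest offset fetched at the same moment, the two values can no longer drift apart between independent updates.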

@Jackie-Jiang (Contributor):

Alternatively, we can directly calculate the lag in RealtimeSegmentDataManager with the given minimum interval (passing null when the latest offset is not fetched). This way we don't need to manage metadata fetches within IngestionDelayTracker.
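A rough sketch of this alternative, with hypothetical names (the consumer-side class models RealtimeSegmentDataManager, and the LongSupplier stands in for the metadata fetch): the consumer owns the rate limiting and hands the tracker either a ready-made lag or null, so the tracker never touches the broker.

```java
import java.util.function.LongSupplier;

// Sketch of the alternative: the consumer computes the lag itself at a minimum
// interval and returns null when no fresh fetch happened this round, so the
// tracker simply keeps its previous lag value.
public class ConsumerSideLag {
  private final long _minFetchIntervalMs;
  private final LongSupplier _latestOffsetFetcher; // stands in for the stream metadata call
  private long _lastFetchTimeMs = Long.MIN_VALUE;

  public ConsumerSideLag(long minFetchIntervalMs, LongSupplier latestOffsetFetcher) {
    _minFetchIntervalMs = minFetchIntervalMs;
    _latestOffsetFetcher = latestOffsetFetcher;
  }

  // Called per consumed batch; returns the lag to report, or null meaning
  // "no fresh value this round".
  public Long maybeComputeLag(long currentOffset, long nowMs) {
    if (_lastFetchTimeMs != Long.MIN_VALUE && nowMs - _lastFetchTimeMs < _minFetchIntervalMs) {
      return null;
    }
    _lastFetchTimeMs = nowMs;
    return Math.max(0, _latestOffsetFetcher.getAsLong() - currentOffset);
  }
}
```

The design trade-off the reviewer points at: here IngestionDelayTracker stays a passive metric store, at the cost of every consumer carrying a little rate-limiting state.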

@KKcorps (Contributor, author) commented Oct 30, 2024

@Jackie-Jiang went with the first alternative

@Jackie-Jiang (Contributor):

I kind of prefer the second way, so that we don't need to manage metadata fetches within IngestionDelayTracker. It should greatly simplify the logic. Can you give it a try?
Please also take a look at the test failure.
