
Facilitate asynchronous realtime ingestion on decoding & transformation #13695

Open · wants to merge 2 commits into master

Conversation

@lnbest0707-uber (Contributor) commented Jul 26, 2024

Ingestion enhancement feature request, resolving the issues mentioned in #13319.
Pinot ingestion currently uses strictly serial processing. It fetches a batch of messages from Kafka and then processes the messages in the batch one by one, in offset order, to:

  1. Decode
  2. Transform
  3. Index

This makes it possible to reuse objects created along the way for better memory efficiency, but it cannot utilize all system resources. There are multiple solutions, each with pros and cons:

  1. Async processing for each step, as this patch introduces.
    • It preserves the message order and retains the same offset control logic.
    • It still cannot fully utilize the system resources.
    • It adds memory and GC overhead, since objects cannot be reused as before. (A TODO is to make the size of each batch configurable.)
  2. Batch (multiple-executor) processing on decoding and transformation.
    • It is hard to ensure message order; it might only provide at-least-once consumption instead of the current (almost) exactly-once.
  3. Full batch processing on all 3 steps.
    • The current indexing logic and data structures do not really support parallel processing.
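The asynchronous per-step approach (option 1) can be sketched as a queue handoff between a decode/transform stage and an indexing stage. This is a minimal illustration, not the actual patch; class and method names here are invented, and integer offsets stand in for decoded rows:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncPipelineSketch {
    // Option 1 in miniature: a single decode/transform worker feeds the
    // indexing stage through a queue, so the two stages overlap in time
    // while the original offset order is preserved.
    public static List<Integer> process(List<Integer> offsets) throws Exception {
        BlockingQueue<Integer> transformed = new LinkedBlockingQueue<>();
        ExecutorService decodeAndTransform = Executors.newSingleThreadExecutor();
        for (Integer offset : offsets) {
            // Stand-in for decode + transform; a single-threaded executor
            // runs submitted tasks strictly in submission order.
            decodeAndTransform.submit(() -> transformed.add(offset));
        }
        List<Integer> indexed = new ArrayList<>();
        for (int i = 0; i < offsets.size(); i++) {
            indexed.add(transformed.take()); // indexing stage consumes in order
        }
        decodeAndTransform.shutdown();
        return indexed;
    }
}
```

Because the executor has exactly one worker thread, tasks cannot overtake each other, which is how the patch can keep the existing offset control logic intact.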

Comparing the CPU usage and consumption speed on the same server before and after enabling "ASYNCHRONOUS" (at 10:00):

[Charts omitted: CPU usage and consumption speed before/after the change]

The additional CPU usage contributes a ~10% increase in ingestion speed.

Notes:
The new mode performs better on computation-heavy ingestion but does not really help lightweight use cases. In light-computation use cases, the extra memory and GC overhead would offset the gain from async processing.

@codecov-commenter commented Jul 27, 2024

Codecov Report

Attention: Patch coverage is 61.58192% with 68 lines in your changes missing coverage. Please review.

Project coverage is 61.99%. Comparing base (59551e4) to head (c3ca31a).
Report is 804 commits behind head on master.

Files Patch % Lines
...a/manager/realtime/RealtimeSegmentDataManager.java 59.52% 53 Missing and 15 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13695      +/-   ##
============================================
+ Coverage     61.75%   61.99%   +0.24%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2555     +119     
  Lines        133233   140750    +7517     
  Branches      20636    21891    +1255     
============================================
+ Hits          82274    87257    +4983     
- Misses        44911    46850    +1939     
- Partials       6048     6643     +595     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 61.96% <61.58%> (+0.25%) ⬆️
java-21 61.87% <61.58%> (+0.24%) ⬆️
skip-bytebuffers-false 61.98% <61.58%> (+0.23%) ⬆️
skip-bytebuffers-true 61.84% <61.58%> (+34.12%) ⬆️
temurin 61.99% <61.58%> (+0.24%) ⬆️
unittests 61.99% <61.58%> (+0.24%) ⬆️
unittests1 46.47% <61.58%> (-0.43%) ⬇️
unittests2 27.73% <0.00%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.

@Jackie-Jiang (Contributor) left a comment:

How do you guarantee the message order when message batches are processed asynchronously?

Comment on lines +264 to +267
private AtomicInteger _numRowsConsumed = new AtomicInteger(0);
// Can be different from _numRowsConsumed when metrics update is enabled.
private AtomicInteger _numRowsIndexed = new AtomicInteger(0);
private AtomicInteger _numRowsErrored = new AtomicInteger(0);

These can be final?
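The reviewer's suggestion works because final only fixes the field reference; the AtomicInteger's value stays mutable. A minimal sketch (the class name is illustrative, not the actual patch):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FinalCounterSketch {
    // final means the reference can never be reassigned, which is safe here
    // because the counters are created once and only ever incremented.
    private final AtomicInteger _numRowsConsumed = new AtomicInteger(0);

    public int consumeRows(int n) {
        for (int i = 0; i < n; i++) {
            _numRowsConsumed.incrementAndGet(); // mutation still allowed
        }
        return _numRowsConsumed.get();
    }
}
```

Making the fields final also documents that they are initialized exactly once, which helps when multiple threads read them.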

BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new LinkedBlockingQueue<>();
AtomicInteger submittedMsgCount = new AtomicInteger(0);
// TODO: tune the number of threads
ExecutorService decodeAndTransformExecutor = Executors.newFixedThreadPool(1);

Starting a new executor per message batch can create big overhead. Consider creating an executor that is shared across batches.

}
});

indexingThread.start();

Who is executing this thread?

Reply (Contributor):

Consumer thread for the partition is the one kicking off this "indexingThread". I don't understand why we kick off a separate thread, and then in the next line, we wait for it to finish. What's the difference if we don't spin off a new thread, and use the main thread (consuming thread) to do the indexing?

@mcvsubbu (Contributor):

cc: @sajjad-moradi

@mcvsubbu (Contributor):

Is it useful to create a subclass of RealtimeSegmentDataManager that consumes asynchronously?

@Jackie-Jiang (Contributor):

To ensure the ingestion order, we might be able to use a producer-consumer pattern, where the consumer thread creates MessageBatches and puts them into a queue, and a separate ingestion thread pulls MessageBatches from the queue, transforms the records, and indexes them into the segment. This way we can use 2 threads per partition.
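The suggested producer-consumer split can be sketched with a bounded queue between the fetching thread and the ingestion thread. This is a simplified illustration under that suggestion, not Pinot code; lists of integers stand in for message batches:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerSketch {
    // The consumer (fetching) thread puts batches into a bounded queue; a
    // single ingestion thread takes them in FIFO order, so ordering is
    // preserved with exactly 2 threads per partition. The bounded capacity
    // applies backpressure when ingestion falls behind fetching.
    public static List<Integer> run(List<List<Integer>> batches) throws InterruptedException {
        BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(4);
        List<Integer> indexed = new ArrayList<>();
        Thread ingestion = new Thread(() -> {
            try {
                for (int i = 0; i < batches.size(); i++) {
                    indexed.addAll(queue.take()); // transform + index stand-in
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        ingestion.start();
        for (List<Integer> batch : batches) {
            queue.put(batch); // fetch stand-in; blocks while the queue is full
        }
        ingestion.join(); // join() gives a happens-before edge for 'indexed'
        return indexed;
    }
}
```

The FIFO queue is what carries the ordering guarantee; any design with more than one ingestion thread would lose it, which is the constraint raised elsewhere in this review.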

@mcvsubbu (Contributor):

How would ingestion rate limiting work?

@lnbest0707-uber (Contributor, Author):

> To ensure the ingestion order, we might be able to use a producer-consumer pattern, where consumer thread creates MessageBatchs and put them into a queue; another ingestion thread pull MessageBatchs from the queue, transform the records and index them into segment. This way we can use 2 threads per partition.

Thanks for the review. Right now it is only using 1 thread to guarantee the order. For multiple threads, I am thinking about adapting the approach mentioned in "Multi-topic ingestion support" if it goes through review.


BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new LinkedBlockingQueue<>();
AtomicInteger submittedMsgCount = new AtomicInteger(0);
// TODO: tune the number of threads

We can't have more than one thread here otherwise the order of indexed rows will be different, and that's something we can't tolerate.
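The single-thread constraint the reviewer describes can be checked directly: a one-thread pool executes tasks strictly in submission order, so the indexed row order matches the consumed offset order. A minimal sketch with invented names:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderSketch {
    // With exactly one worker thread, submitted tasks cannot overtake each
    // other, so the recorded order always equals the submission order.
    // With a pool of size > 1, this guarantee disappears.
    public static List<Integer> indexOrder(int numRows) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        List<Integer> order = new CopyOnWriteArrayList<>();
        for (int row = 0; row < numRows; row++) {
            final int r = row;
            executor.submit(() -> order.add(r)); // stand-in for indexing row r
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return order;
    }
}
```

This is why the `// TODO: tune the number of threads` comment in the snippet above is limited: the thread count here can only ever be 1 without breaking the ordering invariant.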

Labels
Configuration Config changes (addition/deletion/change in behavior) documentation enhancement ingestion real-time
6 participants