-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding maxBytesPerTrigger tag for Pulsar Admission Control #151
Conversation
@ericm-db Thanks for picking this idea up! Since we do not use Pulsar anymore my work on the other PR was seriously down-prioritized. I'm glad though that this might be present in the connector in some form, since I personally think that this is very useful for eg. some CDC usecases. |
072e45b
to
e7e87b6
Compare
src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
Outdated
Show resolved
Hide resolved
fdfa436
to
dbbbb68
Compare
src/test/scala/org/apache/spark/sql/pulsar/PulsarSourceSuiteBase.scala
Outdated
Show resolved
Hide resolved
Please also update documentation for admin url and maxBytesPerTrigger in README.md |
} | ||
val newTopics = topicPartitions.toSet.diff(existingStartOffsets.keySet) | ||
val startPartitionOffsets = existingStartOffsets ++ newTopics.map(topicPartition | ||
=> topicPartition -> MessageId.earliest) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still have some concern here.
For newly discovered topic, admission control start from Message.earliest(-1, -1, -1), this assume that all ledgers exposed by the stats are readable. Is this assumption valid? @nlu90
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be okay
src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlHelper.scala
Outdated
Show resolved
Hide resolved
c72e42e
to
305fb2c
Compare
@@ -122,7 +135,9 @@ private[pulsar] case class PulsarHelper( | |||
offset.foreach { case (tp, mid) => | |||
try { | |||
val (subscription, _) = extractSubscription(predefinedSubscription, tp) | |||
CachedConsumer.getOrCreate(tp, subscription, client).seek(mid) | |||
val consumer = CachedConsumer.getOrCreate(tp, subscription, client) | |||
if (!consumer.isConnected) consumer.getLastMessageId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this is a bug that pulsar consumer do not attempt to reconnect when doing seek(), can you leave a comment here explaining why this change is needed and TODO that we will get rid of this once we upgraded to a version that has the fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Motivation
Some users that request Pulsar Spark connector also request that the Pulsar source has ratelimit functionality. They would like to control the rate of data processing and resource consumption of streaming queries that use the Pulsar source. This can be achieved by implementing admission control in pulsar source.
Modifications
Added a config called
maxBytesPerTrigger
which allows users to configure how many bytes are consumed for each microbatch and shared between topic-partitionsVerifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
This change is already covered by existing tests, such as:
This change added tests and can be verified as follows: Run the PulsarAdmissionControlSuite
Documentation
Check the box below.
Need to update docs?
doc-required
no-need-doc
doc