Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add: parameters to control number and the distribution of messages in a micro-batch #63

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

atezs82
Copy link
Contributor

@atezs82 atezs82 commented Oct 11, 2021

If we have a considerably large backlog for one or more topics that are read by the connector, then using the current implementation we cannot really place an upper limit on the number of messages that are processed at once in a Spark Streaming pipeline.
This PR attempts to address this problem by adding an experimental parameter called maxEntriesPerTrigger to the code (this behaves like the maxOffsetsPerTrigger parameter of the Kafka connector: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries). If the parameter is set, only a specified number of entries are read from the source. We are a little bit diverging from the Kafka connector here, since that one limits actual messages (we can limit Pulsar entries only with this approach).
The feature is based upon the Pulsar Admin API call https://pulsar.apache.org/docs/en/2.7.3/admin-api-topics/#get-internal-stats, which can return all ledgers and entries for a topic. The use of this approach is required so that current functionalities of the Pulsar server side can be used to achieve the task at hand.
In addition to this, since our input data is very much different in backlog sizes, I have added some additional parameters, called forwardStrategy and ensureEntriesPerTopic, so that topics with a very low backlog are also forwarded continuously if needed and we have options for consuming backlogs in different fashion if that is needed.

Please let me know what you think about this draft change. I can add further integration/unit tests and make final documentation changes if you agree with this approach outlined here.

We have also seen some (somewhat troubling) news about creating a brand new connector for Spark Streaming, can you please share additional information on that subject? Thanks in advance!

@atezs82 atezs82 requested review from jianyun8023 and a team as code owners October 11, 2021 07:42
@atezs82 atezs82 force-pushed the work/limitEntriesPerTrigger branch 3 times, most recently from b77373d to dd8645a Compare October 14, 2021 07:23
@atezs82 atezs82 force-pushed the work/limitEntriesPerTrigger branch 3 times, most recently from 028d348 to b9da6dc Compare November 5, 2021 12:30
@atezs82
Copy link
Contributor Author

atezs82 commented Nov 11, 2021

One thing to add here is that - as you might have seen in the PR - is that, unlike the Kafka connector we are forwarding by entries which can contain one or more messages. The reason behind this is that we could not find a suitable method for forwarding by an exact number of messages using a Reader in the getOffset call. Please let me know if you think that such method exists in Pulsar 2.7.x, I can modify the code accordingly (since the current solution is kind-of suboptimal and unpredictable due to this).

We have a close candidate though: I think we can use https://pulsar.apache.org/docs/en/next/admin-api-topics/#examine-messages, but the problem there is that it can look for messages relative to the first or the last message on a topic. If we can make this method work for any message ID, I think it can be sufficiently used in this feature as well. I was also thinking about we can modify the Reader interface so that it will be able to forward itself by n number of messages (on the other hand, that would require 2 API calls per topic to get the forwarded message ID, now we issue only 1, which already causes problems when we read ~100 topics using a connector due to the large number of API requests).

Please let me know what do you think about this.

Copy link
Collaborator

@nlu90 nlu90 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General Feedback:

  1. Can we separate the changes of introducing maxMesagesPerBatch config and the forward strategy feature into two PRs?
  2. If each micro-batch we are calling to get all topics stats, I'm afraid of there will be performance issues
  3. the "forward" used here is not very intuitive.
  4. Is is possible to construct an ending messageID and them check if it exists against pulsar?

caseInsensitiveParams.getOrElse(EnsureEntriesPerTopic, "0").toLong

private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String =
caseInsensitiveParams.getOrElse(ForwardStrategy, "simple")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"simple" should also be a defined strategy as "proportional" or "largefirst"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

@atezs82
Copy link
Contributor Author

atezs82 commented Dec 17, 2021

Can we separate the changes of introducing maxMesagesPerBatch config and the forward strategy feature into two PRs?

I can do that if needed, will add a new one about strategies then.

If each micro-batch we are calling to get all topics stats, I'm afraid of there will be performance issues

We are invoking a single admin.topics().getInternalStats(topic) currently to get the internal stats for the topic we want to stream from (that can be multiple topics though, since we can specify topicsPattern as a parameter, meaning one admin API request per parameter), very much like when the connector is just fetching the latest message ID. Is there something that I'm overlooking here? How can we be more efficient?

the "forward" used here is not very intuitive.

Will change the naming.

Is is possible to construct an ending messageID and them check if it exists against pulsar?

I believe we can do that, since we already construct that, but would that cause additional load on the Pulsar cluster? I think by creating a reader/consumer for this purpose this can be achieved. What do you think about this? Do you see any simpler ways for doing that? (I just do not want to place more load on the Pulsar cluster if that is not needed.)

@atezs82 atezs82 force-pushed the work/limitEntriesPerTrigger branch 4 times, most recently from a081323 to 631eb8b Compare December 20, 2021 11:07
@atezs82
Copy link
Contributor Author

atezs82 commented Dec 20, 2021

I have modified the PR with the following:

  • removed different "forward strategies" (code and documentation) - we can have a separate PR for that in the future if this one is accepted
  • renamed feature to fetchNextOffsetWithMaxEntries - we might want to use something like this when implementing the different strategies if that is needed.

I can make further changes based on the answers for the questions above, see my previous comment.

Copy link
Contributor

@syhily syhily left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atezs82 This PR contains a lot of conflicts. Are you willing to continue working on this?

@atezs82
Copy link
Contributor Author

atezs82 commented Dec 11, 2022

Yes. I will resolve the conflicts shortly. I was waiting for your opinion about the questions raised in #63 (comment) before moving forward with this.

@atezs82 atezs82 force-pushed the work/limitEntriesPerTrigger branch 2 times, most recently from 6650f4c to 48ce512 Compare December 11, 2022 22:27
This feature is planned to be put into a different PR.
@atezs82 atezs82 force-pushed the work/limitEntriesPerTrigger branch from 48ce512 to 4839989 Compare December 13, 2022 08:57
@atezs82
Copy link
Contributor Author

atezs82 commented Dec 13, 2022

Rebased on top of master, now looking for a way to fix the Codacy warnings and do not increase the code complexity too much in the effort. Please in the meantime let me know if there are any answers for my questions from #63 (comment). Thanks in advance!

@atezs82
Copy link
Contributor Author

atezs82 commented Dec 20, 2022

I have slightly reworked some imperative logic inside to TopicInternalStatsUtils to a step more Scala-ish, please let me know what do you think. Thanks in advance!

@syhily
Copy link
Contributor

syhily commented Jan 10, 2023

Yes. I will resolve the conflicts shortly. I was waiting for your opinion about the questions raised in #63 (comment) before moving forward with this.

Thanks for your contribution. I'll review this PR later this week. I was still busy working on flink-connector-pulsar.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants