
Add delay before the peon drops the segments after publishing them #15373

Conversation

Contributor

@kaisun2000 kaisun2000 commented Nov 14, 2023

Fixes #12168

Description

Currently, in the realtime ingestion (Kafka/Kinesis) case, after publishing the segments and upon acknowledgement from the coordinator that the segments are already placed in some historicals, the peon unannounces the segments (telling the rest of the cluster that the segments are no longer served by this peon) and drops them from its cache and sink timeline in one shot.

In-transit queries from brokers that still think the segments are on the peon can hit a NullPointerException while the peon is unsetting the hydrants in the sinks.

The fix lets the peon wait for a configurable delay period after unannouncing the segments before dropping them, removing them from the cache, etc.

This delayed approach is similar to how historicals handle segments being moved out.
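
As an illustration only (not code from this PR; the class and method names below are hypothetical), the unannounce-then-delayed-drop pattern could be sketched like this:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedDropSketch {
    private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    // Segments this peon is still willing to serve queries for.
    private final ConcurrentHashMap<String, Boolean> served = new ConcurrentHashMap<>();

    public void announce(String segmentId) {
        served.put(segmentId, Boolean.TRUE);
    }

    // Unannounce right away, but keep serving the segment for dropDelayMillis so
    // in-transit queries routed with a stale broker view can still find it.
    public ScheduledFuture<?> unannounceThenDrop(String segmentId, long dropDelayMillis) {
        // (the real unannounce to the rest of the cluster would happen here)
        return exec.schedule(() -> { served.remove(segmentId); }, dropDelayMillis, TimeUnit.MILLISECONDS);
    }

    public boolean canServe(String segmentId) {
        return served.containsKey(segmentId);
    }

    // Block until the scheduled drop has actually run (handy for tests).
    public void awaitDrop(ScheduledFuture<?> drop) {
        try {
            drop.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() {
        exec.shutdownNow();
    }
}
```

The key point is that the drop is scheduled rather than performed inline, so the query path keeps working during the propagation window.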

See the details of discussion in Apache slack channel here

Fixed the bug ...

Renamed the class ...

Added a forbidden-apis entry ...

Release note


Key changed/added classes in this PR
  • MyFoo
  • OurBar
  • TheirBaz

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

This is to make sure an in-transit query that sees a segment about to be
dropped on the peon does not fail with a NullPointerException.
The delay approach is similar to the strategy historicals use when
segments are moved out.
@kaisun2000 changed the title from "Add delay before the peon drops segments after publishing them" to "Add delay before the peon drops the segments after publishing them" on Nov 14, 2023
@abhishekagarwal87
Contributor

@kaisun2000 - How does this compare to #15260?

@gianm
Contributor

gianm commented Nov 20, 2023

We had some discussion of this in Slack. In summary, #15260 fixes an NPE that occurs when a query comes in for a segment that has just been unloaded. This PR adds a delay between unannounce and unload, which reduces the likelihood that a query will come in for a segment that has just been unloaded. Both changes are good IMO— we want this case to be rare (which this patch helps with), but when it does happen it needs to be handled properly (which #15260 helps with).

With this patch, the realtime tasks behave more similarly to Historicals, which already have a 30-second delay between unannouncing and unloading (druid.segmentCache.dropSegmentDelayMillis).
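
For reference, that historical-side setting lives in the Historical's runtime properties; the value shown is the documented default:

```properties
# Historical runtime.properties: how long to wait between unannouncing a
# segment and actually dropping it from the segment cache.
druid.segmentCache.dropSegmentDelayMillis=30000
```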

@kaisun2000
Contributor Author

kaisun2000 commented Nov 20, 2023

@abhishekagarwal87,

The fix in #15260 from @gianm is in the query path: it checks for the potential null-pointer condition and avoids it by returning a missing-segment error to the broker. The broker, seeing the missing segment, would retry.

My fix is in the ingestion path: it adds a configurable delay in the segment handoff phase. More specifically, after the peon gets the acknowledgement from the coordinator that the segments in the handoff batch are already placed in some historicals, the peon waits for some time before removing the segments from its timeline, resetting the hydrants, and dropping the segments along with the files backing them on the file system. As @gianm pointed out, this race pattern is not unique to the peon side; it also exists when historicals move segments out, and the approach taken on the historical side is likewise a configurable delay. So adding a configurable delay is a "good idea" in @gianm's own words.

Thinking about this race a bit more abstractly, in both the peon and historical cases the issue arises because it takes some time for a segment placement change to propagate (via ZooKeeper) to the brokers. In-transit queries may therefore hit a data server (peon or historical) that has already moved out the segment being queried. Delaying the drop compensates for the time it takes brokers to see the up-to-date placement of the segment, and in the meantime eliminates the case where an in-transit query misses the segment.

So I would say the two approaches complement each other. The ingestion-delay fix has the benefit of not causing unnecessary query retry load.

See the details of discussion in Apache slack channel here

@abhishekagarwal87
Contributor

Thanks @gianm and @kaisun2000, for the context. One last question: does it need to be configurable? I don't think so. We can either hardcode it to 30 seconds or we can reuse the same config option that already exists.

druid.segmentCache.dropSegmentDelayMillis

There is no need for a per-ingestion configuration here.

@kaisun2000
Contributor Author

> Thanks @gianm and @kaisun2000, for the context. One last question: does it need to be configurable? I don't think so. We can either hardcode it to 30 seconds or we can reuse the same config option that already exists.
>
> druid.segmentCache.dropSegmentDelayMillis
>
> There is no need for a per-ingestion configuration here.

@abhishekagarwal87, I am pretty open to any suggestions.

As you said, maybe a per-peon/MiddleManager configuration is just as good. However, one peon takes one ingestion spec, which is why I put it in the ingestion spec.

Can you point me to another example of a per-MiddleManager configuration that is carried over to the peons? I can change the configuration that way.

@kaisun2000
Contributor Author

ping @abhishekagarwal87

@abhishekagarwal87
Contributor

abhishekagarwal87 commented Dec 2, 2023 via email

@kaisun2000
Contributor Author

@abhishekagarwal87 I did a careful examination of the peon config to achieve the goal of keeping the drop-delay config out of the task spec. Here is the proposal.

First, the historical-side delay config lives in SegmentLoaderConfig. This can't be used in the peon/MiddleManager path because the peon does not have a SegmentLoader; the peon generates segments itself and then ships them out to deep storage and other components.

Thus, it looks like we need to introduce a similar config on the peon/MiddleManager side. The proper place seems to be the peon's additional config; in the code, that is the TaskConfig class.

On the peon code side, TaskConfig is bound in the bindTaskConfigAndClients method. It is passed into the TaskToolboxFactory class and eventually ends up inside the TaskToolbox class.

At runtime, when the peon starts, ExecutorLifecycle has its task runner of type SingleTaskBackgroundRunner run the task. SingleTaskBackgroundRunner is injected with a TaskToolboxFactory and delegates to the task to create a runner (SeekableStreamIndexTaskRunner) that executes the ingestion logic. The nice thing is that the toolbox created by TaskToolboxFactory is passed along, and it still has the TaskConfig.

The SeekableStreamIndexTaskRunner creates the appenderator via the task (of type SeekableStreamIndexTask) in its ingestion logic. Here we can take the delay configuration from the TaskConfig in the toolbox and add it as an instance variable on the appenderator.

This seems to achieve the goal of adding a drop-delay config to the peon without making it task-spec specific.

Let me know if this proposal looks good. I will make the change accordingly unless I hear otherwise.

@abhishekagarwal87
Contributor

@kaisun2000 - FWIW a peon can have segment loader config when it loads broadcast segments. I think we can use the same property name since the purpose is really just the same, and we add the relevant documentation. Property name aside, does it really need to be added to TaskConfig? Can it be accessed directly in the same way as druid.indexer.fork.property.druid.processing.numMergeBuffers is accessed?

@kaisun2000
Contributor Author

> @kaisun2000 - FWIW a peon can have segment loader config when it loads broadcast segments. I think we can use the same property name since the purpose is really just the same, and we add the relevant documentation. Property name aside, does it really need to be added to TaskConfig? Can it be accessed directly in the same way as druid.indexer.fork.property.druid.processing.numMergeBuffers is accessed?

@abhishekagarwal87, thanks for this further illustration.
Indeed, I see that StorageNodeModule is a base module that CliPeon uses via Initialization.makeInjectorWithModules(), and StorageNodeModule has the binding for SegmentLoaderConfig. So we do have druid.segmentCache.dropSegmentDelayMillis in the peon, as you said.

In this case, shall we add a SegmentLoaderConfig to the TaskToolboxFactory constructor?

@Inject
public TaskToolboxFactory(
    SegmentLoaderConfig segmentLoaderConfig,  ---> added
    TaskConfig config,
Then, later, when the StreamAppenderator is created, we can pass the SegmentLoaderConfig into its constructor?

Let me know if this sounds good. If so, I will make the change accordingly.

@kaisun2000
Contributor Author

Revised the PR to use SegmentLoaderConfig.dropSegmentDelayMillis to control the segment drop delay in the realtime path, similar to the historical segment drop path. Overview of the class changes:

  • TaskToolboxFactory is injected with an instance of SegmentLoaderConfig.
  • TaskToolbox built by TaskToolboxFactory stores a copy of SegmentLoaderConfig.
  • In the appenderator creation path, changes are made so that StreamAppenderator gets a copy of SegmentLoaderConfig from the TaskToolbox.
  • The delay-to-drop logic is implemented in StreamAppenderator.
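
For illustration, that wiring can be sketched with heavily simplified stand-in classes (the real Druid classes take many more constructor arguments; the signatures below are hypothetical):

```java
// Simplified stand-ins for the real Druid classes of the same names.
class SegmentLoaderConfig {
    private final int dropSegmentDelayMillis;

    SegmentLoaderConfig(int dropSegmentDelayMillis) {
        this.dropSegmentDelayMillis = dropSegmentDelayMillis;
    }

    int getDropSegmentDelayMillis() {
        return dropSegmentDelayMillis;
    }
}

class TaskToolbox {
    private final SegmentLoaderConfig segmentLoaderConfig;

    TaskToolbox(SegmentLoaderConfig segmentLoaderConfig) {
        this.segmentLoaderConfig = segmentLoaderConfig;
    }

    SegmentLoaderConfig getSegmentLoaderConfig() {
        return segmentLoaderConfig;
    }
}

class StreamAppenderator {
    private final int dropSegmentDelayMillis;

    // The appenderator reads the delay off the toolbox's SegmentLoaderConfig,
    // so the setting never appears in the task spec itself.
    StreamAppenderator(TaskToolbox toolbox) {
        this.dropSegmentDelayMillis = toolbox.getSegmentLoaderConfig().getDropSegmentDelayMillis();
    }

    int dropSegmentDelayMillis() {
        return dropSegmentDelayMillis;
    }
}
```

The design choice here is to reuse the existing historical-side property rather than invent a per-task setting, so one knob governs drop delays cluster-wide.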

@abhishekagarwal87, can we have another review to see if this approach reflects our discussion here?

@kaisun2000
Contributor Author

@abhishekagarwal87, ping?

@abhishekagarwal87
Contributor

Overall LGTM. I just had a question.

would avoid blocking the test case infinitely if something unexpected happens.
@kaisun2000
Contributor Author

ping?

@abhishekagarwal87 abhishekagarwal87 merged commit a5e9b14 into apache:master Jan 2, 2024
82 of 83 checks passed
@abhishekagarwal87
Contributor

@kaisun2000 - Merged. Thank you for your contribution.

@kaisun2000
Contributor Author

@abhishekagarwal87, I appreciate your effort here, and happy new year to you.

@LakshSingla LakshSingla added this to the 29.0.0 milestone Jan 29, 2024
Successfully merging this pull request may close these issues.

Query realtime datasource may get NullPointerException just when segment unannouncing.