Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADAG] Fix missing states in channel pickling #46417

Merged
merged 1 commit into from
Jul 4, 2024

Conversation

ruisearch42
Copy link
Contributor

@ruisearch42 ruisearch42 commented Jul 3, 2024

Why are these changes needed?

When an ADAG channel is pickled, it currently does not include _writer_registered flag. However, when the channel is deserialized and passed to another node, the channel may be double registered, causing runtime failures.

Using the repro script of #46411 as an example:

  • The first registration (ensure_registered_as_writer()) happens when the driver calls do_allocate_channel() on the actor, _writer_registered is set to True
  • However, when the driver ray.get() on the channel, its _writer_registered is False as the field is not pickled
  • The second registration happens when driver calls do_exec_tasks() (-> _prep_task() -> output_writer.start() -> _output_channel.ensure_registered_as_writer()) on the actor, the task's output channel is passed in from driver (with _writer_registered==False`).
  • Since ensure_registered_as_writer() (if the reader is a remote node) eventually calls ExperimentalRegisterMutableObjectReaderRemote() (->HandleRegisterMutableObject()) on the remote node, where it inserts an entry to a hash map keyed with writer_object_id. If there is already an entry with the same key, the check fails as shown in the following snippet:
  bool success =
      remote_writer_object_to_local_reader_.insert({writer_object_id, info}).second;
  RAY_CHECK(success);

This PR fixes the issue by including these states in pickling. A new test test_pp is added to verify the fix.

This PR also introduces test_multi_node_dag and moves several tests from test_accelerated_dag since it got large.

Related issue number

Closes #46411

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@ruisearch42 ruisearch42 marked this pull request as ready for review July 3, 2024 18:02
Copy link
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ensure_registered_as_writer and ensure_registered_as_reader should be idempotent, so double registration should be fine. We should make the registration idempotent in this PR.

def ensure_registered_as_writer(self):
"""
Check whether the process is a valid writer. This method must be idempotent.
"""
raise NotImplementedError
def ensure_registered_as_reader(self):
"""
Check whether the process is a valid reader. This method must be idempotent.
"""
raise NotImplementedError

@jackhumphries
Copy link
Contributor

Could you break down the sequence of steps in the ADAG that causes this double registration?

@ruisearch42
Copy link
Contributor Author

I think ensure_registered_as_writer and ensure_registered_as_reader should be idempotent, so double registration should be fine. We should make the registration idempotent in this PR.

Yeah the issue is not these methods are not idempotent, but a state (_writer_registered) of the channel is not preserved causing the method to be called twice when the channel is passed around. See the updated description for a detailed illustration of the steps.

@ruisearch42
Copy link
Contributor Author

ruisearch42 commented Jul 3, 2024

Could you break down the sequence of steps in the ADAG that causes this double registration?

Please see the updated description.

Copy link
Contributor

@rkooo567 rkooo567 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: what's the reason why it didn't happen in a single node

@ruisearch42 ruisearch42 added the go add ONLY when ready to merge, run all tests label Jul 3, 2024
@ruisearch42
Copy link
Contributor Author

QQ: what's the reason why it didn't happen in a single node

Yeah ExperimentalRegisterMutableObjectReaderRemote() is only called if reader is on a remote node:

.ExperimentalRegisterMutableObjectReaderRemote(c_writer_ref,

@kevin85421
Copy link
Member

Yeah the issue is not these methods are not idempotent, but a state (_writer_registered) of the channel is not preserved causing the method to be called twice when the channel is passed around. See the updated description for a detailed illustration of the steps.

By definition, an idempotent operation should have no difference no matter whether we call the function 1 time or many times. If the second call of ensure_registered_as_writer fails, I think the operation is not idempotent.

@ruisearch42
Copy link
Contributor Author

ruisearch42 commented Jul 3, 2024

Yeah the issue is not these methods are not idempotent, but a state (_writer_registered) of the channel is not preserved causing the method to be called twice when the channel is passed around. See the updated description for a detailed illustration of the steps.

By definition, an idempotent operation should have no difference no matter whether we call the function 1 time or many times. If the second call of ensure_registered_as_writer fails, I think the operation is not idempotent.

The issue is that after serde the internal state of a channel changes. And it is arguable whether it is still the same object. I don't think any idempotent method can still guarantee to be idempotent when the object's internal state unexpectedly changes. This is to fix the exact state change bug between serde.

If you have a different idea, can you elaborate the fix you have on mind?

@kevin85421
Copy link
Member

Sync with @ruisearch42 offline. The PR currently makes sense to me. The idempotency of ensure_registered_as_writer also relies on the value of self._writer_registered. This PR makes sure the state of the channel is the same after serialization and deserialization. Hence, the function will become idempotent after this PR.

@jackhumphries
Copy link
Contributor

Let me take a look before this is merged.

@rkooo567 rkooo567 merged commit 86f395f into ray-project:master Jul 4, 2024
7 checks passed
@jackhumphries
Copy link
Contributor

Hmm, if a channel registers itself as the writer on some node A, and then the channel is passed to some unrelated node B, node B will not register itself as the writer. I think the right direction here would be to be the RAY_CHECK(success) to something that allows a duplicate registration for the same channel, but no other registration.

@ruisearch42
Copy link
Contributor Author

Thanks Jack.

node B will not register itself as the writer.

That sounds like the right behavior.

I think the right direction here would be to be the RAY_CHECK(success) to something that allows a duplicate registration for the same channel, but no other registration.

hmm, not sure if this is the same or a different problem you are targeting, and not sure the solution is what we want. I will reach out to discuss offline.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[ADAG] Double channel registration at remote node fails assertion
5 participants