Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aeron] introduce integration #1048

Merged
merged 8 commits into from
Jan 27, 2025
Merged

[aeron] introduce integration #1048

merged 8 commits into from
Jan 27, 2025

Conversation

fwbrasil
Copy link
Collaborator

@fwbrasil fwbrasil commented Jan 23, 2025

Problem

High-performance solutions often require using inter-process communication (IPC) for extremely low latencies. It essentially uses in-memory communication between different processes, without even touching the network stack. Another common technique for optimization is using UDP instead of TCP for remote communication. TCP has a series of mitigations for unreliable transport channels that aren't necessary in more stable topologies like data centers. Aeron provides more lightweight mechanisms for reliability on top of UDP.

Solution

This PR integrates with Aeron to provide both IPC and UDP communication with a simple API based on Stream. Please see the scaladocs for more information.

Notes

  • This is a first step for Remotes: Clustered Services #31
  • The implementation can't handle large messages. I had trouble to produce fragments in a way that FragmentAssembler can handle it. I'll follow up on this.
  • I imagine it's also possible to provide this integration in Scala Native given that Aeron has implementations in native languages.

Checklist

  • Unit test all changes
  • Update scaladocs if needed
  • Update the README if needed

result match
case Present((tag2, messages)) =>
// verify message type matches expected type
if tag2 != tag.toString then
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should ideally find a way to disallow toString on opaque types... can we use raw instead here?

*
* Messages must have a ReadWriter instance to be published or consumed.
*/
type AsMessage[A] = ReadWriter[A]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for a type alias instead of opaque type?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial attempt was an opaque type but upickle's macros don't like it. I also had tried fury but had issues with encoders never finishing to compile (it uses runtime code generation)

kyo-core/shared/src/main/scala/kyo/Retry.scala Outdated Show resolved Hide resolved
kyo-core/shared/src/main/scala/kyo/Retry.scala Outdated Show resolved Hide resolved
val backpressured = Abort.fail(Backpressured())

// temporary storage for reassembled message
var result: Maybe[(String, Chunk[A])] = Absent
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this safe to be used with Choice? Seems highly uncommon combination...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the code here suspends all side effects with IO. The issue with Choice is related to mutability across multiple computation steps without IO suspensions.

@fwbrasil
Copy link
Collaborator Author

@hearnadam can we merge this?

@fwbrasil fwbrasil merged commit 630235e into main Jan 27, 2025
2 of 3 checks passed
@fwbrasil fwbrasil deleted the aeron2 branch January 27, 2025 20:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants