
Kafka integration #392

Open
calvinlfer opened this issue May 20, 2024 · 9 comments

Comments

@calvinlfer
Contributor

Many microservices rely on Apache Kafka to send and receive data asynchronously in a fault-tolerant manner. It is also notoriously difficult to build high-level Kafka consumers and producers that account for concerns like backpressure, rebalancing, and streaming.

It would be a very nice proposition to tick this box and give users one more reason to select Kyo for their next project.

@fwbrasil
Collaborator

/bounty $500

@varshith257

varshith257 commented May 27, 2024

@fwbrasil I believe my experience with Kafka will help get this solved; it will take some understanding of the codebase to integrate it into Kyo. Putting my hat in the ring to solve this issue.

/attempt #392

@fwbrasil
Collaborator

thank you @varshith257!

@lukestephenson
Contributor

I had a look at the producer side of Kafka (out of curiosity, not committing to picking up this work). There are some underlying building blocks in kyo that I have questions about first.

Stream chunking

Producing to / consuming from Kafka and streaming often go together. Currently, the streaming implementation hides chunking. For example, the transform operation is implemented as:

    def transform[V2: Flat, S2](f: V => V2 < S2)(
        using tag2: Tag[Streams[V2]]
    ): Stream[T, V2, S & S2] =
        val handler =
            new Handler[Const[Chunk[V]], Streams[V], Streams[V2] & S & S2]:
                def resume[T, U: Flat, S3](
                    command: Chunk[V],
                    k: T => U < (Streams[V] & S3)
                )(using Tag[Streams[V]]): (U | Resume[U, S3]) < (Streams[V2] & S & S2 & S3) =
                    command.map(f).map { c =>
                        Streams.emitChunkAndThen(c) {
                            Resume((), k(().asInstanceOf[T]))
                        }
                    }
                end resume
        Streams[V].handle(handler)((), s)
    end transform

Note that the chunk is used to avoid the overhead of pushing individual elements through the Stream, but it is hidden as an implementation detail.

Producing to Kafka can be expensive for an effect system if it is modelled per record. We can achieve better performance if the produce operation is expressed on a Chunk[ProducerRecord] rather than individual records (fewer IOs / completion promises to create).

How do folks feel about changing the Streams implementation so that chunking is exposed externally? Or about making it not a concern of the Streams implementation at all, so you just create a Stream where each element emitted is a Chunk?
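
To make the chunk-level idea concrete, a produce operation over a whole chunk could look something like the sketch below. This is a minimal sketch, not an existing API: produceChunk is a hypothetical name, and it fires the sends without tracking per-record acknowledgement (the next section covers that).

    import kyo.*
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    // Hypothetical chunk-level produce: a single IOs suspension covers the
    // whole chunk instead of one suspension per record. KafkaProducer.send
    // is asynchronous here; records land in the producer's internal buffer
    // and are batched by the Kafka client.
    def produceChunk[K, V](
        producer: KafkaProducer[K, V],
        records: Chunk[ProducerRecord[K, V]]
    ): Unit < IOs =
        IOs {
            records.toSeq.foreach(r => producer.send(r))
        }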

Modelling completion of publishing to Kafka

Typically publication to Kafka is modelled as an IO[IO[RecordMetadata]].

  1. The outer IO represents the effect of the Kafka producer accepting the record into its internal buffer (if it doesn't get accepted, the call blocks).
  2. The inner IO represents when the Kafka brokers have acknowledged the record.

Now in kyo, we can't represent the two side effects with RecordMetadata < (IOs & IOs), so I'm assuming we want the return type of the operations which publish to Kafka to be Promise[RecordMetadata < Aborts[Exception]] < IOs. Here the IOs represents the record being accepted by the local producer buffer, and the Promise represents the broker processing. Thoughts?

Making blocking calls

Are there any concerns in kyo with a call to IOs.apply causing the thread of execution to block? If the producer buffer is full, this will occur, i.e. in IOs.apply(kafkaproducer.send(producerRecord)), the kafkaproducer.send call may block.

@lukestephenson
Contributor

I think I've answered my own question about the blocking calls in kyo. The blocking benchmark seems to indicate that it is acceptable in kyo to just use IOs.apply with no special indicator that the call is blocking.

@lukestephenson
Contributor

With regards to modelling producing to Kafka as Promise[RecordMetadata < Aborts[Exception]] < IOs, this becomes pretty difficult to work with because, as per the docs:

Kyo performs checks at compilation time to ensure that nested effects are not used

Or at least, that is what I think is happening when I gave this a go. When I tried to use that type, I got an error: Please provide an implicit kyo.Tag[kyo.core.<[org.apache.kafka.clients.producer.RecordMetadata, kyo.aborts$package.Aborts[scala.Throwable]]] parameter.

I can work around this by using Promise[Try[RecordMetadata]] < IOs, but avoiding the Aborts effect doesn't feel very kyo-like.
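
For reference, here is a minimal sketch of that workaround, bridging Kafka's Callback into a kyo Promise. Fibers.initPromise, promise.complete, and the IOs.run bridge from the callback thread reflect my reading of kyo's current API; treat the exact names as assumptions.

    import kyo.*
    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
    import scala.util.{Failure, Success, Try}

    // Sketch: the outer IOs completes once the record is accepted into the
    // producer's local buffer (send may block if the buffer is full); the
    // Promise completes when the brokers acknowledge or reject the record.
    def send[K, V](
        producer: KafkaProducer[K, V],
        record: ProducerRecord[K, V]
    ): Promise[Try[RecordMetadata]] < IOs =
        Fibers.initPromise[Try[RecordMetadata]].map { promise =>
            IOs {
                producer.send(
                    record,
                    new Callback {
                        def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
                            val result =
                                if (exception ne null) Failure(exception)
                                else Success(metadata)
                            // We are on a Kafka I/O thread, outside any kyo
                            // computation, so run the completion eagerly.
                            val _ = IOs.run(promise.complete(result))
                        }
                    }
                )
                promise
            }
        }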

@fwbrasil
Collaborator

fwbrasil commented Oct 1, 2024

@varshith257 I'm redistributing Kyo's bounties. Have you been able to work on this? If not, can you cancel the attempt? Thank you!

@calvinlfer
Contributor Author

An additional feature that would be great to have is a consume-chunk API where users are not exposed to a stream of consumer records. Instead, the API should allow users to submit an effectful function that works on a chunk of consumer records belonging to the same Kafka partition, with the function evaluated concurrently per partition. This is inspired by FS2 Kafka's consumeChunk API.
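
A possible shape for that API, as a sketch: the consumeChunk name mirrors FS2 Kafka, and the signature, per-partition grouping, and Fibers effect in the result are assumptions about how it could look in kyo, not an existing API.

    import kyo.*
    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Hypothetical consume-chunk API: the caller never sees a stream of
    // records, only an effectful function over a chunk of records that all
    // belong to one partition. The implementation would poll, group records
    // by partition, run `process` concurrently per partition, and commit
    // offsets once each chunk has been processed successfully.
    trait KafkaChunkConsumer[K, V]:
        def consumeChunk(
            topics: Set[String]
        )(process: Chunk[ConsumerRecord[K, V]] => Unit < (IOs & Aborts[Throwable])): Unit < Fibers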

@fwbrasil
Collaborator

Removing bounty to redistribute the budget

@algora-pbc algora-pbc bot removed the 💎 Bounty label Jan 23, 2025