Skip to content

Commit

Permalink
fix: add handshake for readFn and ackFn (#88)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Sep 20, 2024
1 parent c3af770 commit 7be70f4
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 42 deletions.
5 changes: 2 additions & 3 deletions examples/simple-source/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ update:

.PHONY: image
image: update
cd ../../ && docker build \
cd ../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
-t ${IMAGE_REGISTRY} . --platform linux/amd64,linux/arm64 --push

.PHONY: clean
clean:
Expand Down
15 changes: 8 additions & 7 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let source_handle = simple_source::SimpleSource::new("Hello World!".to_string());
let source_handle = simple_source::SimpleSource::new();
numaflow::source::Server::new(source_handle).start().await
}

pub(crate) mod simple_source {
use std::{collections::HashSet, sync::RwLock};

use chrono::Utc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{collections::HashSet, sync::RwLock};
use tokio::sync::mpsc::Sender;

use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
Expand All @@ -18,15 +18,15 @@ pub(crate) mod simple_source {
/// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK
/// does not provide a mutable reference as explained in [`Sourcer`]
pub(crate) struct SimpleSource {
payload: String,
yet_to_ack: RwLock<HashSet<String>>,
counter: AtomicUsize,
}

impl SimpleSource {
pub(crate) fn new(payload: String) -> Self {
pub(crate) fn new() -> Self {
Self {
payload,
yet_to_ack: RwLock::new(HashSet::new()),
counter: AtomicUsize::new(0),
}
}
}
Expand All @@ -42,9 +42,10 @@ pub(crate) mod simple_source {
let mut message_offsets = Vec::with_capacity(request.count);
for i in 0..request.count {
let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i);
let payload = self.counter.fetch_add(1, Ordering::Relaxed).to_string();
transmitter
.send(Message {
value: format!("{}-{}", self.payload, event_time).into_bytes(),
value: payload.into_bytes(),
event_time,
offset: Offset {
offset: offset.clone().into_bytes(),
Expand Down
27 changes: 21 additions & 6 deletions proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ package source.v1;

service Source {
// Read returns a stream of datum responses.
// The size of the returned ReadResponse is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned ReadResponse will contain all the datum that have been read (which could be an empty list).
// The size of the returned responses is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned responses will contain all the datum that have been read (which could be an empty list).
// The server will continue to read and respond to subsequent ReadRequests until the client closes the stream.
// Once it has sent all the datum, the server will send a ReadResponse with the end of transmission flag set to true.
rpc ReadFn(stream ReadRequest) returns (stream ReadResponse);

// AckFn acknowledges a stream of datum offsets.
// When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex.
// The caller (numa) expects the AckFn to be successful, and it does not expect any errors.
// If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request,
// then it is best to crash because there are no other retry mechanisms possible.
rpc AckFn(stream AckRequest) returns (AckResponse);
// The client sends n requests and expects n responses.
rpc AckFn(stream AckRequest) returns (stream AckResponse);

// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);
Expand All @@ -29,6 +31,14 @@ service Source {
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
* Handshake is the message exchanged between client and server to indicate the start of transmission.
* It is carried as the optional `handshake` field of ReadRequest, ReadResponse, AckRequest and AckResponse.
*/
message Handshake {
// Required field indicating the start of transmission ("sot").
bool sot = 1;
}

/*
* ReadRequest is the request for reading datum stream from user defined source.
*/
Expand All @@ -43,6 +53,7 @@ message ReadRequest {
}
// Required field indicating the request.
Request request = 1;
optional Handshake handshake = 2;
}

/*
Expand Down Expand Up @@ -82,14 +93,15 @@ message ReadResponse {
// End of transmission flag.
bool eot = 1;
Code code = 2;
Error error = 3;
optional Error error = 3;
optional string msg = 4;
}
// Required field holding the result.
Result result = 1;
// Status of the response. Holds the end of transmission flag and the status code.
//
Status status = 2;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 3;
}

/*
Expand All @@ -103,6 +115,7 @@ message AckRequest {
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
optional Handshake handshake = 2;
}

/*
Expand All @@ -122,6 +135,8 @@ message AckResponse {
}
// Required field holding the result.
Result result = 1;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 2;
}

/*
Expand Down Expand Up @@ -170,4 +185,4 @@ message Offset {
// It is useful for sources that have multiple partitions. e.g. Kafka.
// If the partition_id is not specified, it is assumed that the source has a single partition.
int32 partition_id = 2;
}
}
Loading

0 comments on commit 7be70f4

Please sign in to comment.