Skip to content

Commit

Permalink
Optional Idempotency id + check on emptyness (#19)
Browse files Browse the repository at this point in the history
* Squashed 'service-protocol/' changes from 0d6b476..9edfcd6

9edfcd6 Switch idempotency_key to optional field
5129eca Add idempotency_key to CallEntryMessage & OneWayCallEntryMessage. (#97)
65560bf Add CancelInvocation and GetCallInvocationId entries. (#96)

git-subtree-dir: service-protocol
git-subtree-split: 9edfcd6dd860483cbec08b301bf11f76e1735a87

* Add check for empty idempotency key
  • Loading branch information
slinkydeveloper authored Oct 2, 2024
1 parent 40b2e10 commit 12e945a
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 12 deletions.
6 changes: 4 additions & 2 deletions service-protocol/dev/restate/service/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ message CallEntryMessage {
// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
string key = 5;

string idempotency_key = 6;
// If present, it must be non empty.
optional string idempotency_key = 6;

oneof result {
bytes value = 14;
Expand Down Expand Up @@ -349,7 +350,8 @@ message OneWayCallEntryMessage {
// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
string key = 6;

string idempotency_key = 7;
// If present, it must be non empty.
optional string idempotency_key = 7;

// Entry name
string name = 12;
Expand Down
10 changes: 6 additions & 4 deletions src/service_protocol/generated/dev.restate.service.protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,9 @@ pub struct CallEntryMessage {
/// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
#[prost(string, tag = "5")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag = "6")]
pub idempotency_key: ::prost::alloc::string::String,
/// If present, it must be non empty.
#[prost(string, optional, tag = "6")]
pub idempotency_key: ::core::option::Option<::prost::alloc::string::String>,
/// Entry name
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
Expand Down Expand Up @@ -401,8 +402,9 @@ pub struct OneWayCallEntryMessage {
/// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
#[prost(string, tag = "6")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag = "7")]
pub idempotency_key: ::prost::alloc::string::String,
/// If present, it must be non empty.
#[prost(string, optional, tag = "7")]
pub idempotency_key: ::core::option::Option<::prost::alloc::string::String>,
/// Entry name
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
Expand Down
5 changes: 5 additions & 0 deletions src/vm/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ pub const BAD_COMBINATOR_ENTRY: Error = Error::new_const(
"The combinator cannot be replayed. This is most likely caused by non deterministic code.",
);

pub const EMPTY_IDEMPOTENCY_KEY: Error = Error::new_const(
codes::INTERNAL,
"Trying to execute an idempotent request with an empty idempotency key, this is not supported",
);

// Other errors

#[derive(Debug, Clone, thiserror::Error)]
Expand Down
28 changes: 22 additions & 6 deletions src/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use crate::service_protocol::messages::{
};
use crate::service_protocol::{Decoder, RawMessage, Version};
use crate::vm::context::{EagerGetState, EagerGetStateKeys};
use crate::vm::errors::{UnexpectedStateError, UnsupportedFeatureForNegotiatedVersion};
use crate::vm::errors::{
UnexpectedStateError, UnsupportedFeatureForNegotiatedVersion, EMPTY_IDEMPOTENCY_KEY,
};
use crate::vm::transitions::*;
use crate::{
AsyncResultCombinator, AsyncResultHandle, CancelInvocationTarget, Error, GetInvocationIdTarget,
Expand Down Expand Up @@ -433,16 +435,23 @@ impl super::VM for CoreVM {
ret
)]
fn sys_call(&mut self, target: Target, input: Bytes) -> VMResult<AsyncResultHandle> {
if target.idempotency_key.is_some() {
self.verify_feature_support("attach idempotency key to one way call", Version::V3)?;
if let Some(idempotency_key) = &target.idempotency_key {
self.verify_feature_support("attach idempotency key to call", Version::V3)?;
if idempotency_key.is_empty() {
self.do_transition(HitError {
error: EMPTY_IDEMPOTENCY_KEY,
next_retry_delay: None,
})?;
unreachable!();
}
}
self.do_transition(SysCompletableEntry(
"SysCall",
CallEntryMessage {
service_name: target.service,
handler_name: target.handler,
key: target.key.unwrap_or_default(),
idempotency_key: target.idempotency_key.unwrap_or_default(),
idempotency_key: target.idempotency_key,
parameter: input,
..Default::default()
},
Expand All @@ -461,16 +470,23 @@ impl super::VM for CoreVM {
input: Bytes,
delay: Option<Duration>,
) -> VMResult<SendHandle> {
if target.idempotency_key.is_some() {
if let Some(idempotency_key) = &target.idempotency_key {
self.verify_feature_support("attach idempotency key to one way call", Version::V3)?;
if idempotency_key.is_empty() {
self.do_transition(HitError {
error: EMPTY_IDEMPOTENCY_KEY,
next_retry_delay: None,
})?;
unreachable!();
}
}
self.do_transition(SysNonCompletableEntry(
"SysOneWayCall",
OneWayCallEntryMessage {
service_name: target.service,
handler_name: target.handler,
key: target.key.unwrap_or_default(),
idempotency_key: target.idempotency_key.unwrap_or_default(),
idempotency_key: target.idempotency_key,
parameter: input,
invoke_time: delay
.map(|d| {
Expand Down

0 comments on commit 12e945a

Please sign in to comment.