Skip to content

Commit

Permalink
CORE-15757 flow session refactor (#1208)
Browse files Browse the repository at this point in the history
Flow Session Refactor to reduce the total messages exchanged between session counterparties.

SessionInit info can now be piggybacked onto SessionData messages.

SessionAcks are removed.

SessionContextProperties are piggybacked onto any SessionEvents as SessionConfirm is not always sent.

Add new initiateFlow API to allow the initiator to define whether a SessionClose message should be sent by the initiated party when their flow finishes or when they call close. Whichever comes first.
  • Loading branch information
LWogan authored Sep 1, 2023
1 parent 7ea0236 commit 9ea9852
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,28 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name);


/**
* Creates a communication session with a counterparty's {@link ResponderFlow}. Subsequently, you may send/receive using
* this session object. Note that this function does not communicate in itself. The counter-flow will be kicked off
* by the first send/receive.
* <p>
* Initiated flows are initiated with context based on the context of the initiating flow at the point in time this
* method is called. The context of the initiating flow is snapshotted by the returned session. Altering the flow
* context has no effect on the context of the session after this point, and therefore it has no effect on the
* context of the initiated flow either.
*
* @param x500Name The X500 name of the member to communicate with.
* @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
*
* @return The session.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
Expand Down Expand Up @@ -124,6 +146,49 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
* send/receive.
* <p>
* This overload takes a builder of context properties. Any properties set or modified against the context passed to
* this builder will be propagated to initiated flows and all that flow's initiated flows and sub flows down the
* stack. The properties passed to the builder are pre-populated with the current flow context properties, see
* {@link FlowContextProperties}. Altering the current flow context has no effect on the context of the session after the
* builder is applied and the session returned by this method, and therefore it has no effect on the context of the
* initiated flow either.
* <p>
* Example of use in Kotlin.
* ```Kotlin
* val flowSession = flowMessaging.initiateFlow(virtualNodeName) { flowContextProperties ->
* flowContextProperties["key"] = "value"
* }
* ```
* Example of use in Java.
* ```Java
* FlowSession flowSession = flowMessaging.initiateFlow(virtualNodeName, (flowContextProperties) -> {
* flowContextProperties.put("key", "value");
* });
* ```
*
* @param x500Name The X500 name of the member to communicate with.
* @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
* @param flowContextPropertiesBuilder A builder of context properties.
*
* @return The session.
*
* @throws IllegalArgumentException if the builder tries to set a property for which a platform property already
* exists or if the key is prefixed by {@link FlowContextProperties#CORDA_RESERVED_PREFIX}. See also
* {@link FlowContextProperties}. Any other
* exception thrown by the builder will also be thrown through here and should be avoided in the provided
* implementation, see {@link FlowContextPropertiesBuilder}.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder);

/**
* Suspends until a message has been received for each session in the specified {@code sessions}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,26 @@ public void initiateFlowPartyWithBuilder() {
Assertions.assertThat(result).isNotNull();
Assertions.assertThat(result).isEqualTo(flowSession);
}

@Test
public void initiateFlowPartyWithBuilderRequireCloseTrue() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any())).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, true, (contextProperties) -> contextProperties.put("key", "value"));

Assertions.assertThat(result).isNotNull();
Assertions.assertThat(result).isEqualTo(flowSession);
}

@Test
public void initiateFlowPartyWithBuilderRequireCloseFalse() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any())).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, false, (contextProperties) -> contextProperties.put("key", "value"));

Assertions.assertThat(result).isNotNull();
Assertions.assertThat(result).isEqualTo(flowSession);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,20 @@
"type": "net.corda.data.identity.HoldingIdentity",
"doc": "Identity of party in the session who was initiated."
},
{
"name": "receivedSequenceNum",
"type": "int",
"doc": "Sequence number of the last contiguous message received from a counterparty. 0 if no messages received."
},
{
"name": "outOfOrderSequenceNums",
"type": {
"type": "array",
"items": "int"
},
"doc": "The sequence numbers of events received with a value greater than the last contiguous event received. i.e out of order messages received with a value greater than the receivedSequenceNum."
},
{
"name": "payload",
"type": [
"net.corda.data.flow.event.session.SessionInit",
"net.corda.data.flow.event.session.SessionConfirm",
"net.corda.data.flow.event.session.SessionData",
"net.corda.data.flow.event.session.SessionClose",
"net.corda.data.flow.event.session.SessionAck",
"net.corda.data.flow.event.session.SessionError"
]
},
{
"name": "contextSessionProperties",
"type": ["null", "net.corda.data.KeyValuePairList"],
"doc": "A map of context properties received from a counterparty related to this flow session. This is static data and will be set to null when previously sent."
}
]
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,5 @@
"name": "SessionConfirm",
"doc" : "Acknowledge to counterparty that the session has been confirmed and is ready to send and receive messages.",
"namespace": "net.corda.data.flow.event.session",
"fields": [
{
"name": "contextSessionProperties",
"type": "net.corda.data.KeyValuePairList",
"doc": "A map of context properties received from a counterparty related to this flow session. This contains information such as protocol name and version that this flow is running."
}
]
"fields": []
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
"net.corda.data.chunking.Chunk",
"bytes"
]
},
{
"name": "sessionInit",
"type": ["null", "net.corda.data.flow.event.session.SessionInit"],
"doc": "Contains information that can be used to start an initiated flow, piggybacked on initial data messages. Will be null for messages sent to the initiator. Will be null when initiated party is confirmed to be present to ensure out of order messages that arrive first contain this info."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,6 @@
"name": "contextPlatformProperties",
"type": "net.corda.data.KeyValuePairList",
"doc": "A map of context platform properties made available to the flow which will also be propagated to sub flows, initiated flows and services"
},
{
"name": "contextSessionProperties",
"type": "net.corda.data.KeyValuePairList",
"doc": "A map of context properties to send to a counterparty related to this flow session. This contains information such as protocol name and versions supported."
},
{
"name": "payload",
"type": [
"null",
"bytes"
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,15 @@
},
"doc": "Time ([Instant]) in milliseconds when the last session event was received from a counterparty"
},
{
"name": "lastSentMessageTime",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "Time ([Instant]) in milliseconds of the last message sent to a counterparty"
},
{
"name": "counterpartyIdentity",
"type": "net.corda.data.identity.HoldingIdentity",
"doc": "Identity of the counterparty in the session."
},
{
"name": "sendAck",
"name": "requireClose",
"type": "boolean",
"doc": "True if there are messages to ack to a counterparty. False if there are no messages received that have not been acked."
"doc": "True if the user has set requireClose to be true when calling initiate flow. False otherwise."
},
{
"name": "receivedEventsState",
Expand All @@ -50,7 +42,7 @@
{
"name": "sendEventsState",
"type": "net.corda.data.flow.state.session.SessionProcessState",
"doc": "Record the sequence number of the last event sent to the counterparty. Record all events sent but not yet acknowledged by the counterparty as well as any messages to send to the counterparty."
"doc": "Record the sequence number of the last event sent to the counterparty. Record all events to be sent to the counterparty."
},
{
"name": "status",
Expand All @@ -61,7 +53,6 @@
"CREATED",
"CONFIRMED",
"CLOSING",
"WAIT_FOR_FINAL_ACK",
"CLOSED",
"ERROR"
]
Expand All @@ -74,12 +65,12 @@
"doc": "Whether the session state has already scheduled a cleanup event with the flow mapper."
},
{
"name": "counterpartySessionProperties",
"name": "sessionProperties",
"type": [
"null",
"net.corda.data.KeyValuePairList"
],
"doc": "A map of context properties received from a counterparty related to this flow session. This contains information such as protocol name and the version running."
"doc": "A map of context properties related to this flow session. This contains information such as protocol name and the version running."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ private FlowConfig() {

public static final String EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW = "event.messageResendWindow";
public static final String EXTERNAL_EVENT_MAX_RETRIES = "event.maxRetries";
public static final String SESSION_MESSAGE_RESEND_WINDOW = "session.messageResendWindow";
public static final String SESSION_HEARTBEAT_TIMEOUT_WINDOW = "session.heartbeatTimeout";
public static final String SESSION_MISSING_COUNTERPARTY_TIMEOUT_WINDOW = "session.missingCounterpartyTimeout";
public static final String SESSION_TIMEOUT_WINDOW = "session.timeout";
public static final String SESSION_P2P_TTL = "session.p2pTTL";
public static final String SESSION_FLOW_CLEANUP_TIME = "session.cleanupTime";
public static final String PROCESSING_MAX_RETRY_ATTEMPTS = "processing.maxRetryAttempts";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,12 @@
"type": "object",
"default": {},
"properties": {
"messageResendWindow": {
"description": "The length of time in milliseconds that Corda waits before resending unacknowledged flow session messages.",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
"default": 120000
},
"heartbeatTimeout": {
"description": "The length of time in milliseconds that Corda waits when no message has been received from a counterparty before causing the session to error. This should be set at least 2 times larger than session.messageResendWindow.",
"timeout": {
"description": "The length of time in milliseconds that Corda waits when no message has been received from a counterparty before causing the session to error.",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
"default": 1800000
},
"missingCounterpartyTimeout": {
"description": "The length of time in milliseconds to wait when the counterparty can't be found in a member lookup before causing the session to error",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
"default": 300000
},
"p2pTTL": {
"description": "The TTL set in milliseconds. This is added to the current time and set on messages passed to the P2P layer to send to a counterparty. Messages received with a TTL timestamp set in the past will be discarded.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ topics:
consumers:
- link-manager
- membership
- flowMapper
producers:
- rest # Dynamic Network registration
- membership # Static Network registration
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ cordaProductVersion = 5.1.0
# NOTE: update this each time this module contains a breaking change
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
## a per module property in which case module versions can change independently.
cordaApiRevision = 17
cordaApiRevision = 18

# Main
kotlinVersion = 1.8.21
Expand Down

0 comments on commit 9ea9852

Please sign in to comment.