Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-15757 flow session refactor #1208

Merged
merged 20 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ed1fb37
CORE-15747 Initial schema changes to support initial session refactor
LWogan Aug 10, 2023
7aea712
CORE-15747 Initial schema changes to support initial session refactor
LWogan Aug 11, 2023
27b8691
CORE-15747 Initial schema changes to support initial session refactor
LWogan Aug 11, 2023
f9b77f2
CORE-15747 Reinstate sessioninit as it wil be used for counterparty f…
LWogan Aug 14, 2023
cf4ffc9
CORE-15747 make SessionInit nullable
LWogan Aug 14, 2023
0f203e2
CORE-15747 boolean not Boolean
LWogan Aug 14, 2023
e187310
Merge remote-tracking branch 'origin/release/os/5.1' into feature/COR…
LWogan Aug 16, 2023
a32163a
Merge remote-tracking branch 'origin/release/os/5.1' into feature/COR…
LWogan Aug 17, 2023
c8e060d
CORE-14523 flow worker now consumes from p2p locally hosted identity …
LWogan Aug 17, 2023
1d90b7a
Merge remote-tracking branch 'origin/release/os/5.1' into feature/COR…
LWogan Aug 23, 2023
bba5dd3
Merge remote-tracking branch 'origin/release/os/5.1' into feature/COR…
LWogan Aug 29, 2023
31810fb
CORE-16174 move session context props from SessionInit to SessionEven…
LWogan Aug 30, 2023
750f37a
Merge remote-tracking branch 'origin/release/os/5.1' into feature/COR…
LWogan Sep 1, 2023
b33ddf7
CORE-15757 reenable tests
LWogan Sep 1, 2023
31538d3
CORE-15757 add initiate flow api to preserve backwards compatability
LWogan Sep 1, 2023
b5bf94d
CORE-15757 add initiate flow api to preserve backwards compatability
LWogan Sep 1, 2023
164bd3c
CORE-15757 add initiate flow api to preserve backwards compatability
LWogan Sep 1, 2023
5c6ed56
CORE-15757 fix wildcard import
LWogan Sep 1, 2023
8be8e48
CORE-15757 update after merge
LWogan Sep 1, 2023
1b57846
CORE-15757 update after merge
LWogan Sep 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,28 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name);


/**
* Creates a communication session with a counterparty's {@link ResponderFlow}. Subsequently, you may send/receive using
* this session object. Note that this function does not communicate in itself. The counter-flow will be kicked off
* by the first send/receive.
* <p>
* Initiated flows are initiated with context based on the context of the initiating flow at the point in time this
* method is called. The context of the initiating flow is snapshotted by the returned session. Altering the flow
* context has no effect on the context of the session after this point, and therefore it has no effect on the
* context of the initiated flow either.
*
* @param x500Name The X500 name of the member to communicate with.
* @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
*
* @return The session.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
Expand Down Expand Up @@ -124,6 +146,49 @@ public interface FlowMessaging {
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder);

/**
* Creates a communication session with another member. Subsequently, you may send/receive using this session object.
* Note that this function does not communicate in itself. The counter-flow will be kicked off by the first
* send/receive.
* <p>
* This overload takes a builder of context properties. Any properties set or modified against the context passed to
* this builder will be propagated to initiated flows and all that flow's initiated flows and sub flows down the
* stack. The properties passed to the builder are pre-populated with the current flow context properties, see
* {@link FlowContextProperties}. Altering the current flow context has no effect on the context of the session after the
* builder is applied and the session returned by this method, and therefore it has no effect on the context of the
* initiated flow either.
* <p>
* Example of use in Kotlin.
* ```Kotlin
* val flowSession = flowMessaging.initiateFlow(virtualNodeName) { flowContextProperties ->
* flowContextProperties["key"] = "value"
* }
* ```
* Example of use in Java.
* ```Java
* FlowSession flowSession = flowMessaging.initiateFlow(virtualNodeName, (flowContextProperties) -> {
* flowContextProperties.put("key", "value");
* });
* ```
*
* @param x500Name The X500 name of the member to communicate with.
* @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close()
* and the initiating party will suspend and wait to receive the message when they call FlowSession.close().
* When set to false the session is marked as terminated immediately when close() is called.
* @param flowContextPropertiesBuilder A builder of context properties.
*
* @return The session.
*
* @throws IllegalArgumentException if the builder tries to set a property for which a platform property already
* exists or if the key is prefixed by {@link FlowContextProperties#CORDA_RESERVED_PREFIX}. See also
* {@link FlowContextProperties}. Any other
* exception thrown by the builder will also be thrown through here and should be avoided in the provided
* implementation, see {@link FlowContextPropertiesBuilder}.
*/
@Suspendable
@NotNull
FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder);

/**
* Suspends until a message has been received for each session in the specified {@code sessions}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,26 @@ public void initiateFlowPartyWithBuilder() {
Assertions.assertThat(result).isNotNull();
Assertions.assertThat(result).isEqualTo(flowSession);
}

@Test
public void initiateFlowPartyWithBuilderRequireCloseTrue() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any())).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, true, (contextProperties) -> contextProperties.put("key", "value"));

Assertions.assertThat(result).isNotNull();
Assertions.assertThat(result).isEqualTo(flowSession);
}

@Test
public void initiateFlowPartyWithBuilderRequireCloseFalse() {
final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB");
when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any())).thenReturn(flowSession);

FlowSession result = flowMessaging.initiateFlow(counterparty, false, (contextProperties) -> contextProperties.put("key", "value"));

Assertions.assertThat(result).isNotNull();
Assertions.assertThat(result).isEqualTo(flowSession);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,20 @@
"type": "net.corda.data.identity.HoldingIdentity",
"doc": "Identity of party in the session who was initiated."
},
{
"name": "receivedSequenceNum",
"type": "int",
"doc": "Sequence number of the last contiguous message received from a counterparty. 0 if no messages received."
},
{
"name": "outOfOrderSequenceNums",
"type": {
"type": "array",
"items": "int"
},
"doc": "The sequence numbers of events received with a value greater than the last contiguous event received. i.e out of order messages received with a value greater than the receivedSequenceNum."
},
{
"name": "payload",
"type": [
"net.corda.data.flow.event.session.SessionInit",
"net.corda.data.flow.event.session.SessionConfirm",
"net.corda.data.flow.event.session.SessionData",
"net.corda.data.flow.event.session.SessionClose",
"net.corda.data.flow.event.session.SessionAck",
"net.corda.data.flow.event.session.SessionError"
]
},
{
"name": "contextSessionProperties",
"type": ["null", "net.corda.data.KeyValuePairList"],
"doc": "A map of context properties received from a counterparty related to this flow session. This is static data and will be set to null when previously sent."
}
]
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,5 @@
"name": "SessionConfirm",
"doc" : "Acknowledge to counterparty that the session has been confirmed and is ready to send and receive messages.",
"namespace": "net.corda.data.flow.event.session",
"fields": [
{
"name": "contextSessionProperties",
"type": "net.corda.data.KeyValuePairList",
"doc": "A map of context properties received from a counterparty related to this flow session. This contains information such as protocol name and version that this flow is running."
}
]
"fields": []
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
"net.corda.data.chunking.Chunk",
"bytes"
]
},
{
"name": "sessionInit",
"type": ["null", "net.corda.data.flow.event.session.SessionInit"],
"doc": "Contains information that can be used to start an initiated flow, piggybacked on initial data messages. Will be null for messages sent to the initiator. Will be null when initiated party is confirmed to be present to ensure out of order messages that arrive first contain this info."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,6 @@
"name": "contextPlatformProperties",
"type": "net.corda.data.KeyValuePairList",
"doc": "A map of context platform properties made available to the flow which will also be propagated to sub flows, initiated flows and services"
},
{
"name": "contextSessionProperties",
"type": "net.corda.data.KeyValuePairList",
"doc": "A map of context properties to send to a counterparty related to this flow session. This contains information such as protocol name and versions supported."
},
{
"name": "payload",
"type": [
"null",
"bytes"
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,15 @@
},
"doc": "Time ([Instant]) in milliseconds when the last session event was received from a counterparty"
},
{
"name": "lastSentMessageTime",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "Time ([Instant]) in milliseconds of the last message sent to a counterparty"
},
{
"name": "counterpartyIdentity",
"type": "net.corda.data.identity.HoldingIdentity",
"doc": "Identity of the counterparty in the session."
},
{
"name": "sendAck",
"name": "requireClose",
"type": "boolean",
"doc": "True if there are messages to ack to a counterparty. False if there are no messages received that have not been acked."
"doc": "True if the user has set requireClose to be true when calling initiate flow. False otherwise."
},
{
"name": "receivedEventsState",
Expand All @@ -50,7 +42,7 @@
{
"name": "sendEventsState",
"type": "net.corda.data.flow.state.session.SessionProcessState",
"doc": "Record the sequence number of the last event sent to the counterparty. Record all events sent but not yet acknowledged by the counterparty as well as any messages to send to the counterparty."
"doc": "Record the sequence number of the last event sent to the counterparty. Record all events to be sent to the counterparty."
},
{
"name": "status",
Expand All @@ -61,7 +53,6 @@
"CREATED",
"CONFIRMED",
"CLOSING",
"WAIT_FOR_FINAL_ACK",
"CLOSED",
"ERROR"
]
Expand All @@ -74,12 +65,12 @@
"doc": "Whether the session state has already scheduled a cleanup event with the flow mapper."
},
{
"name": "counterpartySessionProperties",
"name": "sessionProperties",
"type": [
"null",
"net.corda.data.KeyValuePairList"
],
"doc": "A map of context properties received from a counterparty related to this flow session. This contains information such as protocol name and the version running."
"doc": "A map of context properties related to this flow session. This contains information such as protocol name and the version running."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ private FlowConfig() {

public static final String EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW = "event.messageResendWindow";
public static final String EXTERNAL_EVENT_MAX_RETRIES = "event.maxRetries";
public static final String SESSION_MESSAGE_RESEND_WINDOW = "session.messageResendWindow";
public static final String SESSION_HEARTBEAT_TIMEOUT_WINDOW = "session.heartbeatTimeout";
public static final String SESSION_MISSING_COUNTERPARTY_TIMEOUT_WINDOW = "session.missingCounterpartyTimeout";
public static final String SESSION_TIMEOUT_WINDOW = "session.timeout";
public static final String SESSION_P2P_TTL = "session.p2pTTL";
public static final String SESSION_FLOW_CLEANUP_TIME = "session.cleanupTime";
public static final String PROCESSING_MAX_RETRY_ATTEMPTS = "processing.maxRetryAttempts";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,12 @@
"type": "object",
"default": {},
"properties": {
"messageResendWindow": {
"description": "The length of time in milliseconds that Corda waits before resending unacknowledged flow session messages.",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
"default": 120000
},
"heartbeatTimeout": {
"description": "The length of time in milliseconds that Corda waits when no message has been received from a counterparty before causing the session to error. This should be set at least 2 times larger than session.messageResendWindow.",
"timeout": {
"description": "The length of time in milliseconds that Corda waits when no message has been received from a counterparty before causing the session to error.",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
"default": 1800000
},
"missingCounterpartyTimeout": {
"description": "The length of time in milliseconds to wait when the counterparty can't be found in a member lookup before causing the session to error",
"type": "integer",
"minimum": 1000,
"maximum": 2147483647,
"default": 300000
},
"p2pTTL": {
"description": "The TTL set in milliseconds. This is added to the current time and set on messages passed to the P2P layer to send to a counterparty. Messages received with a TTL timestamp set in the past will be discarded.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ topics:
consumers:
- link-manager
- membership
- flow
producers:
- rest # Dynamic Network registration
- membership # Static Network registration
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ cordaProductVersion = 5.1.0
# NOTE: update this each time this module contains a breaking change
## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to
## a per module property in which case module versions can change independently.
cordaApiRevision = 17
cordaApiRevision = 18

# Main
kotlinVersion = 1.8.21
Expand Down