diff --git a/application/src/main/java/net/corda/v5/application/messaging/FlowMessaging.java b/application/src/main/java/net/corda/v5/application/messaging/FlowMessaging.java index 14c9db118e..b83aa6a93d 100644 --- a/application/src/main/java/net/corda/v5/application/messaging/FlowMessaging.java +++ b/application/src/main/java/net/corda/v5/application/messaging/FlowMessaging.java @@ -84,6 +84,28 @@ public interface FlowMessaging { @NotNull FlowSession initiateFlow(@NotNull MemberX500Name x500Name); + + /** + * Creates a communication session with a counterparty's {@link ResponderFlow}. Subsequently, you may send/receive using + * this session object. Note that this function does not communicate in itself. The counter-flow will be kicked off + * by the first send/receive. + *

+ * Initiated flows are initiated with context based on the context of the initiating flow at the point in time this + * method is called. The context of the initiating flow is snapshotted by the returned session. Altering the flow + * context has no effect on the context of the session after this point, and therefore it has no effect on the + * context of the initiated flow either. + * + * @param x500Name The X500 name of the member to communicate with. + * @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close() + * and the initiating party will suspend and wait to receive the message when they call FlowSession.close(). + * When set to false the session is marked as terminated immediately when close() is called. + * + * @return The session. + */ + @Suspendable + @NotNull + FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose); + /** * Creates a communication session with another member. Subsequently, you may send/receive using this session object. * Note that this function does not communicate in itself. The counter-flow will be kicked off by the first @@ -124,6 +146,49 @@ public interface FlowMessaging { @NotNull FlowSession initiateFlow(@NotNull MemberX500Name x500Name, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder); + /** + * Creates a communication session with another member. Subsequently, you may send/receive using this session object. + * Note that this function does not communicate in itself. The counter-flow will be kicked off by the first + * send/receive. + *

+ * This overload takes a builder of context properties. Any properties set or modified against the context passed to + * this builder will be propagated to initiated flows and all that flow's initiated flows and sub flows down the + * stack. The properties passed to the builder are pre-populated with the current flow context properties, see + * {@link FlowContextProperties}. Altering the current flow context has no effect on the context of the session after the + * builder is applied and the session returned by this method, and therefore it has no effect on the context of the + * initiated flow either. + *

+ * Example of use in Kotlin. + * ```Kotlin + * val flowSession = flowMessaging.initiateFlow(virtualNodeName) { flowContextProperties -> + * flowContextProperties["key"] = "value" + * } + * ``` + * Example of use in Java. + * ```Java + * FlowSession flowSession = flowMessaging.initiateFlow(virtualNodeName, (flowContextProperties) -> { + * flowContextProperties.put("key", "value"); + * }); + * ``` + * + * @param x500Name The X500 name of the member to communicate with. + * @param requireClose When set to true, the initiated party will send a close message after calling FlowSession.close() + * and the initiating party will suspend and wait to receive the message when they call FlowSession.close(). + * When set to false the session is marked as terminated immediately when close() is called. + * @param flowContextPropertiesBuilder A builder of context properties. + * + * @return The session. + * + * @throws IllegalArgumentException if the builder tries to set a property for which a platform property already + * exists or if the key is prefixed by {@link FlowContextProperties#CORDA_RESERVED_PREFIX}. See also + * {@link FlowContextProperties}. Any other + * exception thrown by the builder will also be thrown through here and should be avoided in the provided + * implementation, see {@link FlowContextPropertiesBuilder}. + */ + @Suspendable + @NotNull + FlowSession initiateFlow(@NotNull MemberX500Name x500Name, boolean requireClose, @NotNull FlowContextPropertiesBuilder flowContextPropertiesBuilder); + /** * Suspends until a message has been received for each session in the specified {@code sessions}. *

diff --git a/application/src/test/java/net/corda/v5/application/messaging/FlowMessagingJavaApiTest.java b/application/src/test/java/net/corda/v5/application/messaging/FlowMessagingJavaApiTest.java index 52220ba31d..20caf32c3b 100644 --- a/application/src/test/java/net/corda/v5/application/messaging/FlowMessagingJavaApiTest.java +++ b/application/src/test/java/net/corda/v5/application/messaging/FlowMessagingJavaApiTest.java @@ -35,4 +35,26 @@ public void initiateFlowPartyWithBuilder() { Assertions.assertThat(result).isNotNull(); Assertions.assertThat(result).isEqualTo(flowSession); } + + @Test + public void initiateFlowPartyWithBuilderRequireCloseTrue() { + final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB"); + when(flowMessaging.initiateFlow(eq(counterparty), eq(true), any())).thenReturn(flowSession); + + FlowSession result = flowMessaging.initiateFlow(counterparty, true, (contextProperties) -> contextProperties.put("key", "value")); + + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result).isEqualTo(flowSession); + } + + @Test + public void initiateFlowPartyWithBuilderRequireCloseFalse() { + final MemberX500Name counterparty = new MemberX500Name("Alice Corp", "LDN", "GB"); + when(flowMessaging.initiateFlow(eq(counterparty), eq(false), any())).thenReturn(flowSession); + + FlowSession result = flowMessaging.initiateFlow(counterparty, false, (contextProperties) -> contextProperties.put("key", "value")); + + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result).isEqualTo(flowSession); + } } diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/SessionEvent.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/SessionEvent.avsc index cdfb497324..d40abd5746 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/SessionEvent.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/SessionEvent.avsc @@ -47,19 +47,6 @@ "type": "net.corda.data.identity.HoldingIdentity", "doc": "Identity of party in the session who was initiated." }, - { - "name": "receivedSequenceNum", - "type": "int", - "doc": "Sequence number of the last contiguous message received from a counterparty. 0 if no messages received." - }, - { - "name": "outOfOrderSequenceNums", - "type": { - "type": "array", - "items": "int" - }, - "doc": "The sequence numbers of events received with a value greater than the last contiguous event received. i.e out of order messages received with a value greater than the receivedSequenceNum." - }, { "name": "payload", "type": [ @@ -67,9 +54,13 @@ "net.corda.data.flow.event.session.SessionConfirm", "net.corda.data.flow.event.session.SessionData", "net.corda.data.flow.event.session.SessionClose", - "net.corda.data.flow.event.session.SessionAck", "net.corda.data.flow.event.session.SessionError" ] + }, + { + "name": "contextSessionProperties", + "type": ["null", "net.corda.data.KeyValuePairList"], + "doc": "A map of context properties received from a counterparty related to this flow session. This is static data and will be set to null when previously sent." } ] -} +} \ No newline at end of file diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionAck.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionAck.avsc deleted file mode 100644 index 6ee5191d26..0000000000 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionAck.avsc +++ /dev/null @@ -1,7 +0,0 @@ -{ - "type": "record", - "name": "SessionAck", - "doc" : "Acknowledge to another party that their session message was received.", - "namespace": "net.corda.data.flow.event.session", - "fields": [] -} diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionConfirm.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionConfirm.avsc index 70449f9dd8..d51b9f99a5 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionConfirm.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionConfirm.avsc @@ -3,11 +3,5 @@ "name": "SessionConfirm", "doc" : "Acknowledge to counterparty that the session has been confirmed and is ready to send and receive messages.", "namespace": "net.corda.data.flow.event.session", - "fields": [ - { - "name": "contextSessionProperties", - "type": "net.corda.data.KeyValuePairList", - "doc": "A map of context properties received from a counterparty related to this flow session. This contains information such as protocol name and version that this flow is running." - } - ] + "fields": [] } diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionData.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionData.avsc index 7185c830d9..2217593b80 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionData.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionData.avsc @@ -10,6 +10,11 @@ "net.corda.data.chunking.Chunk", "bytes" ] + }, + { + "name": "sessionInit", + "type": ["null", "net.corda.data.flow.event.session.SessionInit"], + "doc": "Contains information that can be used to start an initiated flow, piggybacked on initial data messages. Will be null for messages sent to the initiator. Will be null when initiated party is confirmed to be present to ensure out of order messages that arrive first contain this info." } ] } diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionInit.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionInit.avsc index 6ab146262e..522c1a8e02 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionInit.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/session/SessionInit.avsc @@ -25,18 +25,6 @@ "name": "contextPlatformProperties", "type": "net.corda.data.KeyValuePairList", "doc": "A map of context platform properties made available to the flow which will also be propagated to sub flows, initiated flows and services" - }, - { - "name": "contextSessionProperties", - "type": "net.corda.data.KeyValuePairList", - "doc": "A map of context properties to send to a counterparty related to this flow session. This contains information such as protocol name and versions supported." - }, - { - "name": "payload", - "type": [ - "null", - "bytes" - ] } ] } diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/session/SessionState.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/session/SessionState.avsc index 4b6599c03c..dea2cbcfaa 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/session/SessionState.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/state/session/SessionState.avsc @@ -24,23 +24,15 @@ }, "doc": "Time ([Instant]) in milliseconds when the last session event was received from a counterparty" }, - { - "name": "lastSentMessageTime", - "type": { - "type": "long", - "logicalType": "timestamp-millis" - }, - "doc": "Time ([Instant]) in milliseconds of the last message sent to a counterparty" - }, { "name": "counterpartyIdentity", "type": "net.corda.data.identity.HoldingIdentity", "doc": "Identity of the counterparty in the session." }, { - "name": "sendAck", + "name": "requireClose", "type": "boolean", - "doc": "True if there are messages to ack to a counterparty. False if there are no messages received that have not been acked." + "doc": "True if the user has set requireClose to be true when calling initiate flow. False otherwise." }, { "name": "receivedEventsState", @@ -50,7 +42,7 @@ { "name": "sendEventsState", "type": "net.corda.data.flow.state.session.SessionProcessState", - "doc": "Record the sequence number of the last event sent to the counterparty. Record all events sent but not yet acknowledged by the counterparty as well as any messages to send to the counterparty." + "doc": "Record the sequence number of the last event sent to the counterparty. Record all events to be sent to the counterparty." }, { "name": "status", @@ -61,7 +53,6 @@ "CREATED", "CONFIRMED", "CLOSING", - "WAIT_FOR_FINAL_ACK", "CLOSED", "ERROR" ] @@ -74,12 +65,12 @@ "doc": "Whether the session state has already scheduled a cleanup event with the flow mapper." }, { - "name": "counterpartySessionProperties", + "name": "sessionProperties", "type": [ "null", "net.corda.data.KeyValuePairList" ], - "doc": "A map of context properties received from a counterparty related to this flow session. This contains information such as protocol name and the version running." + "doc": "A map of context properties related to this flow session. This contains information such as protocol name and the version running." } ] } diff --git a/data/config-schema/src/main/java/net/corda/schema/configuration/FlowConfig.java b/data/config-schema/src/main/java/net/corda/schema/configuration/FlowConfig.java index 5389e1b9a8..57359f35ab 100644 --- a/data/config-schema/src/main/java/net/corda/schema/configuration/FlowConfig.java +++ b/data/config-schema/src/main/java/net/corda/schema/configuration/FlowConfig.java @@ -6,9 +6,7 @@ private FlowConfig() { public static final String EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW = "event.messageResendWindow"; public static final String EXTERNAL_EVENT_MAX_RETRIES = "event.maxRetries"; - public static final String SESSION_MESSAGE_RESEND_WINDOW = "session.messageResendWindow"; - public static final String SESSION_HEARTBEAT_TIMEOUT_WINDOW = "session.heartbeatTimeout"; - public static final String SESSION_MISSING_COUNTERPARTY_TIMEOUT_WINDOW = "session.missingCounterpartyTimeout"; + public static final String SESSION_TIMEOUT_WINDOW = "session.timeout"; public static final String SESSION_P2P_TTL = "session.p2pTTL"; public static final String SESSION_FLOW_CLEANUP_TIME = "session.cleanupTime"; public static final String PROCESSING_MAX_RETRY_ATTEMPTS = "processing.maxRetryAttempts"; diff --git a/data/config-schema/src/main/resources/net/corda/schema/configuration/flow/1.0/corda.flow.json b/data/config-schema/src/main/resources/net/corda/schema/configuration/flow/1.0/corda.flow.json index 37beabbf67..6e88aed16e 100644 --- a/data/config-schema/src/main/resources/net/corda/schema/configuration/flow/1.0/corda.flow.json +++ b/data/config-schema/src/main/resources/net/corda/schema/configuration/flow/1.0/corda.flow.json @@ -47,26 +47,12 @@ "type": "object", "default": {}, "properties": { - "messageResendWindow": { - "description": "The length of time in milliseconds that Corda waits before resending unacknowledged flow session messages.", - "type": "integer", - "minimum": 1000, - "maximum": 2147483647, - "default": 120000 - }, - "heartbeatTimeout": { - "description": "The length of time in milliseconds that Corda waits when no message has been received from a counterparty before causing the session to error. This should be set at least 2 times larger than session.messageResendWindow.", + "timeout": { + "description": "The length of time in milliseconds that Corda waits when no message has been received from a counterparty before causing the session to error.", "type": "integer", "minimum": 1000, "maximum": 2147483647, "default": 1800000 - }, - "missingCounterpartyTimeout": { - "description": "The length of time in milliseconds to wait when the counterparty can't be found in a member lookup before causing the session to error", - "type": "integer", - "minimum": 1000, - "maximum": 2147483647, - "default": 300000 }, "p2pTTL": { "description": "The TTL set in milliseconds. This is added to the current time and set on messages passed to the P2P layer to send to a counterparty. Messages received with a TTL timestamp set in the past will be discarded.", diff --git a/data/topic-schema/src/main/resources/net/corda/schema/P2P.yaml b/data/topic-schema/src/main/resources/net/corda/schema/P2P.yaml index c334410005..b829f43389 100644 --- a/data/topic-schema/src/main/resources/net/corda/schema/P2P.yaml +++ b/data/topic-schema/src/main/resources/net/corda/schema/P2P.yaml @@ -54,6 +54,7 @@ topics: consumers: - link-manager - membership + - flowMapper producers: - rest # Dynamic Network registration - membership # Static Network registration diff --git a/gradle.properties b/gradle.properties index 01c5e3da10..f53ffc0dc2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -9,7 +9,7 @@ cordaProductVersion = 5.1.0 # NOTE: update this each time this module contains a breaking change ## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to ## a per module property in which case module versions can change independently. -cordaApiRevision = 17 +cordaApiRevision = 18 # Main kotlinVersion = 1.8.21