From f1827d8b46703f1c5ff05d21b34692d3122c9a04 Mon Sep 17 00:00:00 2001 From: Dakshit Babbar <100972343+DakshitBabbar@users.noreply.github.com> Date: Fri, 25 Oct 2024 07:13:49 +0530 Subject: [PATCH] Enabling Unclean Session Publish Re-Transmits (#308) Description ----------- This PR enables the coreMQTT library to resend unacked publishes on an unclean session connection. Following is a brief summary of changes: 1. Add a new API `MQTT_InitRetransmits` that will initialise the context to handle publish retransmits on an unclean session connection 2. Add signatures of callback function pointers that users will define in order to: a. copy and store outgoing publishes b. retrieve copied publish on an unclean session connection to resend c. clear a copied publish when a `PUBACK`/`PUBREC` is received d. clear all copied publishes on a clean session connection 3. Update the API's to check if callback's are defined and implement resend publishes as required. Following are the specifics of the changes: 1. Add 3 new MQTTStatus_t values: MQTTPublishStoreFailed, MQTTPublishRetrieveFailed and MQTTPublishClearAllFailed 2. Update `MQTTContext_t` to hold the callback function pointers a. `MQTTRetransmitStorePacket storeFunction` b. `MQTTRetransmitRetrievePacket retrieveFunction` c. `MQTTRetransmitClearPacket clearFunction` d. `MQTTRetransmitClearAllPackets clearAllFunction` 3. Update the `MQTT_Status_strerror` function to handle the new `MQTTStatus_t` values 4. Add a new API function `MQTT_InitRetransmits` that will initialise the new callback functions in the `MQTTContext_t` 5. Add this API to the core_mqtt.h file to make it available to users 6. Modify `MQTT_Publish` a. copy the outgoing publish packet in form of an array of `TransportOutVector_t` if the callback if defined b. if copy fails then bubble up corresponding error status code 7. Modify `MQTT_ReceiveLoop` a. on receiving a `PUBACK`/`PUBREC` clear the copy of that particular publish after the state of the publish record has been successfully updated, if the callback if defined 8. Modify `MQTT_Connect` a. on a clean session clear all the copies of publishes stored if the callback is defined b. if clear all fails then bubble up corresponding error status code c. on an unclean session get the packetID of the unacked publishes and retrieve the copies of those if the callback is defined d. if retrieve fails then bubble up corresponding error status code Approaches Taken --------------- - To let user know about the changes we have made we will add them to a changelog and have a minor version bump - To be in line with the zero copy principle in our library we chose to provide and retrieve the publish packets for storing and resending in form of an array of `TransportOutVector_t` - Code is written in a way that on receiving a `PUBACK`/`PUBREC` the copy will be cleared after the state of the publish record is changed so that if state update fails the copy won't be cleared. Otherwise if the state does not change and the copy is cleared then when a connection is made with an unclean session there will be a retrieve fail as the system is in an inconsistent state. - We are storing the copies of the publishes with the Duplicate flag set this is because on retrieving the packet we will get it in the form of a `TransportOutVector_t` that holds the data in a `const` pointer which cannot be changed after retrieving. Pending Tasks --------------- - [ ] Changelog - [ ] Minor version bump - [x] Doxygen example for the new API - [x] Better API Names - [x] Unit Test Updates - [x] CBMC Proof --------- Co-authored-by: Dakshit Babbar Co-authored-by: GitHub Action Co-authored-by: AniruddhaKanhere <60444055+AniruddhaKanhere@users.noreply.github.com> --- .github/.cSpellWords.txt | 2 + docs/doxygen/include/size_table.md | 12 +- source/core_mqtt.c | 207 ++++- source/core_mqtt_serializer.c | 32 + source/include/core_mqtt.h | 212 +++++ source/include/core_mqtt_serializer.h | 51 +- test/unit-test/core_mqtt_serializer_utest.c | 40 + test/unit-test/core_mqtt_utest.c | 912 +++++++++++++++++++- 8 files changed, 1391 insertions(+), 77 deletions(-) diff --git a/.github/.cSpellWords.txt b/.github/.cSpellWords.txt index d2ba4f174..71edb8704 100644 --- a/.github/.cSpellWords.txt +++ b/.github/.cSpellWords.txt @@ -20,6 +20,7 @@ DLIBRARY DNDEBUG DUNITTEST DUNITY +getbytesinmqttvec getpacketid isystem lcov @@ -34,6 +35,7 @@ NONDET pylint pytest pyyaml +serializemqttvec sinclude UNACKED unpadded diff --git a/docs/doxygen/include/size_table.md b/docs/doxygen/include/size_table.md index 1fa92ff64..5f654632f 100644 --- a/docs/doxygen/include/size_table.md +++ b/docs/doxygen/include/size_table.md @@ -9,8 +9,8 @@ core_mqtt.c -
4.4K
-
3.8K
+
4.9K
+
4.2K
core_mqtt_state.c @@ -19,12 +19,12 @@ core_mqtt_serializer.c -
2.8K
-
2.2K
+
2.9K
+
2.3K
Total estimates -
8.9K
-
7.3K
+
9.5K
+
7.8K
diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 23324974d..0e1180f5b 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -90,6 +90,12 @@ */ #define CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ( 2U ) +struct MQTTVec +{ + TransportOutVector_t * pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */ + size_t vectorLen; /**< Length of the transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */ +}; + /*-----------------------------------------------------------*/ /** @@ -444,8 +450,10 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext ); * @brief Clears existing state records for a clean session. * * @param[in] pContext Initialized MQTT context. + * + * @return #MQTTSuccess always otherwise. */ -static void handleCleanSession( MQTTContext_t * pContext ); +static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext ); /** * @brief Send the publish packet without copying the topic string and payload in @@ -463,7 +471,7 @@ static void handleCleanSession( MQTTContext_t * pContext ); */ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, const MQTTPublishInfo_t * pPublishInfo, - const uint8_t * pMqttHeader, + uint8_t * pMqttHeader, size_t headerSize, uint16_t packetId ); @@ -1597,6 +1605,15 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext, } } + if( ( ackType == MQTTPuback ) || ( ackType == MQTTPubrec ) ) + { + if( ( status == MQTTSuccess ) && + ( pContext->clearFunction != NULL ) ) + { + pContext->clearFunction( pContext, packetIdentifier ); + } + } + if( status == MQTTSuccess ) { /* Set fields of deserialized struct. */ @@ -2133,13 +2150,14 @@ static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext, static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, const MQTTPublishInfo_t * pPublishInfo, - const uint8_t * pMqttHeader, + uint8_t * pMqttHeader, size_t headerSize, uint16_t packetId ) { MQTTStatus_t status = MQTTSuccess; size_t ioVectorLength; size_t totalMessageLength; + bool dupFlagChanged = false; /* Bytes required to encode the packet ID in an MQTT header according to * the MQTT specification. */ @@ -2190,7 +2208,42 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, totalMessageLength += pPublishInfo->payloadLength; } - if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) + /* If not already set, set the dup flag before storing a copy of the publish + * this is because on retrieving back this copy we will get it in the form of an + * array of TransportOutVector_t that holds the data in a const pointer which cannot be + * changed after retrieving. */ + if( pPublishInfo->dup != true ) + { + MQTT_UpdateDuplicatePublishFlag( pMqttHeader, true ); + + dupFlagChanged = true; + } + + /* store a copy of the publish for retransmission purposes */ + if( ( pPublishInfo->qos > MQTTQoS0 ) && + ( pContext->storeFunction != NULL ) ) + { + MQTTVec_t mqttVec; + + mqttVec.pVector = pIoVector; + mqttVec.vectorLen = ioVectorLength; + + if( pContext->storeFunction( pContext, packetId, &mqttVec ) != true ) + { + status = MQTTPublishStoreFailed; + } + } + + /* change the value of the dup flag to its original, if it was changed */ + if( dupFlagChanged ) + { + MQTT_UpdateDuplicatePublishFlag( pMqttHeader, false ); + + dupFlagChanged = false; + } + + if( ( status == MQTTSuccess ) && + ( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) ) { status = MQTTSendFailed; } @@ -2477,6 +2530,8 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext ) MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER; uint16_t packetId = MQTT_PACKET_ID_INVALID; MQTTPublishState_t state = MQTTStateNull; + size_t totalMessageLength; + uint8_t * pMqttPacket; assert( pContext != NULL ); @@ -2492,17 +2547,71 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext ) packetId = MQTT_PubrelToResend( pContext, &cursor, &state ); } + if( ( status == MQTTSuccess ) && + ( pContext->retrieveFunction != NULL ) ) + { + cursor = MQTT_STATE_CURSOR_INITIALIZER; + + /* Resend all the PUBLISH for which PUBACK/PUBREC is not received + * after session is reestablished. */ + do + { + packetId = MQTT_PublishToResend( pContext, &cursor ); + + if( packetId != MQTT_PACKET_ID_INVALID ) + { + if( pContext->retrieveFunction( pContext, packetId, &pMqttPacket, &totalMessageLength ) != true ) + { + status = MQTTPublishRetrieveFailed; + break; + } + + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + if( sendBuffer( pContext, pMqttPacket, totalMessageLength ) != ( int32_t ) totalMessageLength ) + { + status = MQTTSendFailed; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); + } + } while( ( packetId != MQTT_PACKET_ID_INVALID ) && + ( status == MQTTSuccess ) ); + } + return status; } -static void handleCleanSession( MQTTContext_t * pContext ) +static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext ) { + MQTTStatus_t status = MQTTSuccess; + MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER; + uint16_t packetId = MQTT_PACKET_ID_INVALID; + assert( pContext != NULL ); /* Reset the index and clear the buffer when a new session is established. */ pContext->index = 0; ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size ); + if( pContext->clearFunction != NULL ) + { + cursor = MQTT_STATE_CURSOR_INITIALIZER; + + /* Resend all the PUBLISH for which PUBACK/PUBREC is not received + * after session is reestablished. */ + do + { + packetId = MQTT_PublishToResend( pContext, &cursor ); + + if( packetId != MQTT_PACKET_ID_INVALID ) + { + pContext->clearFunction( pContext, packetId ); + } + } while( ( packetId != MQTT_PACKET_ID_INVALID ) && + ( status == MQTTSuccess ) ); + } + if( pContext->outgoingPublishRecordMaxCount > 0U ) { /* Clear any existing records if a new session is established. */ @@ -2517,6 +2626,8 @@ static void handleCleanSession( MQTTContext_t * pContext ) 0x00, pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) ); } + + return status; } static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext, @@ -2681,6 +2792,46 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext, /*-----------------------------------------------------------*/ +MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext, + MQTTStorePacketForRetransmit storeFunction, + MQTTRetrievePacketForRetransmit retrieveFunction, + MQTTClearPacketForRetransmit clearFunction ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( pContext == NULL ) + { + LogError( ( "Argument cannot be NULL: pContext=%p\n", + ( void * ) pContext ) ); + status = MQTTBadParameter; + } + else if( storeFunction == NULL ) + { + LogError( ( "Invalid parameter: storeFunction is NULL" ) ); + status = MQTTBadParameter; + } + else if( retrieveFunction == NULL ) + { + LogError( ( "Invalid parameter: retrieveFunction is NULL" ) ); + status = MQTTBadParameter; + } + else if( clearFunction == NULL ) + { + LogError( ( "Invalid parameter: clearFunction is NULL" ) ); + status = MQTTBadParameter; + } + else + { + pContext->storeFunction = storeFunction; + pContext->retrieveFunction = retrieveFunction; + pContext->clearFunction = clearFunction; + } + + return status; +} + +/*-----------------------------------------------------------*/ + MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext, uint16_t packetId ) { @@ -2820,7 +2971,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, if( ( status == MQTTSuccess ) && ( *pSessionPresent != true ) ) { - handleCleanSession( pContext ); + status = handleCleanSession( pContext ); } if( status == MQTTSuccess ) @@ -2837,7 +2988,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) ) { - /* Resend PUBRELs when reestablishing a session */ + /* Resend PUBRELs and PUBLISHES when reestablishing a session */ status = handleUncleanSessionResumption( pContext ); } @@ -3560,6 +3711,14 @@ const char * MQTT_Status_strerror( MQTTStatus_t status ) str = "MQTTStatusDisconnectPending"; break; + case MQTTPublishStoreFailed: + str = "MQTTPublishStoreFailed"; + break; + + case MQTTPublishRetrieveFailed: + str = "MQTTPublishRetrieveFailed"; + break; + default: str = "Invalid MQTT Status code"; break; @@ -3569,3 +3728,37 @@ const char * MQTT_Status_strerror( MQTTStatus_t status ) } /*-----------------------------------------------------------*/ + +size_t MQTT_GetBytesInMQTTVec( MQTTVec_t * pVec ) +{ + size_t memoryRequired = 0; + size_t i; + TransportOutVector_t * pTransportVec = pVec->pVector; + size_t vecLen = pVec->vectorLen; + + for( i = 0; i < vecLen; i++ ) + { + memoryRequired += pTransportVec[ i ].iov_len; + } + + return memoryRequired; +} + +/*-----------------------------------------------------------*/ + +void MQTT_SerializeMQTTVec( uint8_t * pAllocatedMem, + MQTTVec_t * pVec ) +{ + TransportOutVector_t * pTransportVec = pVec->pVector; + const size_t vecLen = pVec->vectorLen; + size_t index = 0; + size_t i = 0; + + for( i = 0; i < vecLen; i++ ) + { + memcpy( &pAllocatedMem[ index ], pTransportVec[ i ].iov_base, pTransportVec[ i ].iov_len ); + index += pTransportVec[ i ].iov_len; + } +} + +/*-----------------------------------------------------------*/ diff --git a/source/core_mqtt_serializer.c b/source/core_mqtt_serializer.c index 97022034c..458e9c770 100644 --- a/source/core_mqtt_serializer.c +++ b/source/core_mqtt_serializer.c @@ -101,6 +101,11 @@ */ #define UINT8_SET_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) | ( 0x01U << ( position ) ) ) ) +/** + * @brief Clear a bit in an 8-bit unsigned integer. + */ +#define UINT8_CLEAR_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) & ( ~( 0x01U << ( position ) ) ) ) ) + /** * @brief Macro for checking if a bit is set in a 1-byte unsigned int. * @@ -2623,6 +2628,33 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc, /*-----------------------------------------------------------*/ +MQTTStatus_t MQTT_UpdateDuplicatePublishFlag( uint8_t * pHeader, + bool set ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( pHeader == NULL ) + { + status = MQTTBadParameter; + } + else if( ( ( *pHeader ) & 0xF0 ) != MQTT_PACKET_TYPE_PUBLISH ) + { + status = MQTTBadParameter; + } + else if( set == true ) + { + UINT8_SET_BIT( *pHeader, MQTT_PUBLISH_FLAG_DUP ); + } + else + { + UINT8_CLEAR_BIT( *pHeader, MQTT_PUBLISH_FLAG_DUP ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + MQTTStatus_t MQTT_ProcessIncomingPacketTypeAndLength( const uint8_t * pBuffer, const size_t * pIndex, MQTTPacketInfo_t * pIncomingPacket ) diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h index ba5219348..e7e6f77fd 100644 --- a/source/include/core_mqtt.h +++ b/source/include/core_mqtt.h @@ -64,6 +64,12 @@ struct MQTTPubAckInfo; struct MQTTContext; struct MQTTDeserializedInfo; +/** + * @ingroup mqtt_struct_types + * @brief An opaque structure provided by the library to the #MQTTStorePacketForRetransmit function when using #MQTTStorePacketForRetransmit. + */ +typedef struct MQTTVec MQTTVec_t; + /** * @ingroup mqtt_callback_types * @brief Application provided function to query the time elapsed since a given @@ -101,6 +107,59 @@ typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext, struct MQTTPacketInfo * pPacketInfo, struct MQTTDeserializedInfo * pDeserializedInfo ); +/** + * @brief User defined callback used to store outgoing publishes. Used to track any publish + * retransmit on an unclean session connection. + * + * @param[in] pContext Initialised MQTT Context. + * @param[in] packetId Outgoing publish packet identifier. + * @param[in] pMqttVec Pointer to the opaque mqtt vector structure. Users should use MQTT_SerializeMQTTVec + * and MQTT_GetBytesInMQTTVec functions to get the memory required and to serialize the + * MQTTVec_t in the provided memory respectively. + * + * @return True if the copy is successful else false. + */ +/* @[define_mqtt_retransmitstorepacket] */ +typedef bool ( * MQTTStorePacketForRetransmit)( struct MQTTContext * pContext, + uint16_t packetId, + MQTTVec_t * pMqttVec ); +/* @[define_mqtt_retransmitstorepacket] */ + +/** + * @brief User defined callback used to retreive a copied publish for resend operation. Used to + * track any publish retransmit on an unclean session connection. + * + * @param[in] pContext Initialised MQTT Context. + * @param[in] packetId Copied publish packet identifier. + * @param[out] pSerializedMqttVec Output parameter to store the pointer to the serialized MQTTVec_t + * using MQTT_SerializeMQTTVec. + * @param[out] pSerializedMqttVecLen Output parameter to return the number of bytes used to store the + * MQTTVec_t. This value should be the same as the one received from MQTT_GetBytesInMQTTVec + * when storing the packet. + * + * @return True if the retreive is successful else false. + */ +/* @[define_mqtt_retransmitretrievepacket] */ +typedef bool ( * MQTTRetrievePacketForRetransmit)( struct MQTTContext * pContext, + uint16_t packetId, + uint8_t ** pSerializedMqttVec, + size_t * pSerializedMqttVecLen ); +/* @[define_mqtt_retransmitretrievepacket] */ + +/** + * @brief User defined callback used to clear a particular copied publish packet. Used to + * track any publish retransmit on an unclean session connection. + * + * @param[in] pContext Initialised MQTT Context. + * @param[in] packetId Copied publish packet identifier. + * + * @return True if the clear is successful else false. + */ +/* @[define_mqtt_retransmitclearpacket] */ +typedef void (* MQTTClearPacketForRetransmit)( struct MQTTContext * pContext, + uint16_t packetId ); +/* @[define_mqtt_retransmitclearpacket] */ + /** * @ingroup mqtt_enum_types * @brief Values indicating if an MQTT connection exists. @@ -247,6 +306,21 @@ typedef struct MQTTContext uint16_t keepAliveIntervalSec; /**< @brief Keep Alive interval. */ uint32_t pingReqSendTimeMs; /**< @brief Timestamp of the last sent PINGREQ. */ bool waitingForPingResp; /**< @brief If the library is currently awaiting a PINGRESP. */ + + /** + * @brief User defined API used to store outgoing publishes. + */ + MQTTStorePacketForRetransmit storeFunction; + + /** + * @brief User defined API used to retreive a copied publish for resend operation. + */ + MQTTRetrievePacketForRetransmit retrieveFunction; + + /** + * @brief User defined API used to clear a particular copied publish packet. + */ + MQTTClearPacketForRetransmit clearFunction; } MQTTContext_t; /** @@ -415,6 +489,98 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext, size_t incomingPublishCount ); /* @[declare_mqtt_initstatefulqos] */ +/** + * @brief Initialize an MQTT context for publish retransmits for QoS > 0. + * + * This function must be called on an #MQTTContext_t after MQTT_InitstatefulQoS and before any other function. + * + * @param[in] pContext The context to initialize. + * @param[in] storeFunction User defined API used to store outgoing publishes. + * @param[in] retrieveFunction User defined API used to retreive a copied publish for resend operation. + * @param[in] clearFunction User defined API used to clear a particular copied publish packet. + * + * @return #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + * + * Example + * @code{c} + * + * // Function for obtaining a timestamp. + * uint32_t getTimeStampMs(); + * // Callback function for receiving packets. + * void eventCallback( + * MQTTContext_t * pContext, + * MQTTPacketInfo_t * pPacketInfo, + * MQTTDeserializedInfo_t * pDeserializedInfo + * ); + * // Network send. + * int32_t networkSend( NetworkContext_t * pContext, const void * pBuffer, size_t bytes ); + * // Network receive. + * int32_t networkRecv( NetworkContext_t * pContext, void * pBuffer, size_t bytes ); + * // User defined callback used to store outgoing publishes + * bool publishStoreCallback(struct MQTTContext* pContext, + * uint16_t packetId, + * MQTTVec_t* pIoVec); + * // User defined callback used to retreive a copied publish for resend operation + * bool publishRetrieveCallback(struct MQTTContext* pContext, + * uint16_t packetId, + * TransportOutVector_t** pIoVec, + * size_t* ioVecCount); + * // User defined callback used to clear a particular copied publish packet + * bool publishClearCallback(struct MQTTContext* pContext, + * uint16_t packetId); + * // User defined callback used to clear all copied publish packets + * bool publishClearAllCallback(struct MQTTContext* pContext); + * + * MQTTContext_t mqttContext; + * TransportInterface_t transport; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ 1024 ]; + * const size_t outgoingPublishCount = 30; + * MQTTPubAckInfo_t outgoingPublishes[ outgoingPublishCount ]; + * + * // Clear context. + * memset( ( void * ) &mqttContext, 0x00, sizeof( MQTTContext_t ) ); + * + * // Set transport interface members. + * transport.pNetworkContext = &someTransportContext; + * transport.send = networkSend; + * transport.recv = networkRecv; + * + * // Set buffer members. + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = 1024; + * + * status = MQTT_Init( &mqttContext, &transport, getTimeStampMs, eventCallback, &fixedBuffer ); + * + * if( status == MQTTSuccess ) + * { + * // We do not expect any incoming publishes in this example, therefore the incoming + * // publish pointer is NULL and the count is zero. + * status = MQTT_InitStatefulQoS( &mqttContext, outgoingPublishes, outgoingPublishCount, NULL, 0 ); + * + * // Now QoS1 and/or QoS2 publishes can be sent with this context. + * } + * + * if( status == MQTTSuccess ) + * { + * status = MQTT_InitRetransmits( &mqttContext, publishStoreCallback, + * publishRetrieveCallback, + * publishClearCallback, + * publishClearAllCallback ); + * + * // Now unacked Publishes can be resent on an unclean session resumption. + * } + * @endcode + */ + +/* @[declare_mqtt_initretransmits] */ +MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext, + MQTTStorePacketForRetransmit storeFunction, + MQTTRetrievePacketForRetransmit retrieveFunction, + MQTTClearPacketForRetransmit clearFunction ); +/* @[declare_mqtt_initretransmits] */ + /** * @brief Checks the MQTT connection status with the broker. * @@ -471,6 +637,11 @@ MQTTStatus_t MQTT_CheckConnectStatus( MQTTContext_t * pContext ); * #MQTTRecvFailed if transport receive failed for CONNACK; * #MQTTNoDataAvailable if no data available to receive in transport until * the @p timeoutMs for CONNACK; + * #MQTTStatusConnected if the connection is already established + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API + * MQTTPublishRetrieveFailed if on an unclean session connection, the copied + * publishes are not retrieved successfully for retransmission * #MQTTSuccess otherwise. * * @note This API may spend more time than provided in the timeoutMS parameters in @@ -559,6 +730,9 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, * hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; + * #MQTTStatusNotConnected if the connection is not established yet + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API * #MQTTSuccess otherwise. * * Example @@ -612,6 +786,11 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; + * #MQTTStatusNotConnected if the connection is not established yet + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API + * #MQTTPublishStoreFailed if the user provided callback to copy and store the + * outgoing publish packet fails * #MQTTSuccess otherwise. * * Example @@ -679,6 +858,9 @@ MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext, * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; + * #MQTTStatusNotConnected if the connection is not established yet + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API * #MQTTSuccess otherwise. */ /* @[declare_mqtt_ping] */ @@ -698,6 +880,9 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ); * hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; + * #MQTTStatusNotConnected if the connection is not established yet + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API * #MQTTSuccess otherwise. * * Example @@ -749,6 +934,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, * hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport send failed; + * #MQTTStatusNotConnected if the connection is already disconnected * #MQTTSuccess otherwise. */ /* @[declare_mqtt_disconnect] */ @@ -783,6 +969,10 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ); * invalid transition for the internal state machine; * #MQTTNeedMoreBytes if MQTT_ProcessLoop has received * incomplete data; it should be called again (probably after a delay); + * #MQTTStatusNotConnected if the connection is not established yet and a PING + * or an ACK is being sent. + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API * #MQTTSuccess on success. * * Example @@ -1037,6 +1227,28 @@ MQTTStatus_t MQTT_GetSubAckStatusCodes( const MQTTPacketInfo_t * pSubackPacket, const char * MQTT_Status_strerror( MQTTStatus_t status ); /* @[declare_mqtt_status_strerror] */ +/** + * @brief Get the bytes in a #MQTTVec pointer which can store the whole array as a an MQTT packet when calling MQTT_SerializeMQTTVec( void * pAllocatedMem, MQTTVec_t *pVec ) function. + * + * @param[in] pVec The #MQTTVec pointer. + * + * @return The bytes in the provided #MQTTVec array which can then be used to set aside memory to be used with MQTT_SerializeMQTTVec( void * pAllocatedMem, MQTTVec_t *pVec ) function. + */ +/* @[declare_mqtt_getbytesinmqttvec] */ +size_t MQTT_GetBytesInMQTTVec( MQTTVec_t * pVec ); +/* @[declare_mqtt_getbytesinmqttvec] */ + +/** + * @brief Serialize the bytes in an array of #MQTTVec in the provided \p pAllocatedMem + * + * @param[in] pAllocatedMem Memory in which to serialize the data in the #MQTTVec array. It must be of size provided by MQTT_GetBytesInMQTTVec( MQTTVec_t *pVec ). + * @param[in] pVec The #MQTTVec pointer. + */ +/* @[declare_mqtt_serializemqttvec] */ +void MQTT_SerializeMQTTVec( uint8_t * pAllocatedMem, + MQTTVec_t * pVec ); +/* @[declare_mqtt_serializemqttvec] */ + /* *INDENT-OFF* */ #ifdef __cplusplus } diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index 86b3c6396..92c3ceeae 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -85,23 +85,27 @@ struct MQTTPacketInfo; */ typedef enum MQTTStatus { - MQTTSuccess = 0, /**< Function completed successfully. */ - MQTTBadParameter, /**< At least one parameter was invalid. */ - MQTTNoMemory, /**< A provided buffer was too small. */ - MQTTSendFailed, /**< The transport send function failed. */ - MQTTRecvFailed, /**< The transport receive function failed. */ - MQTTBadResponse, /**< An invalid packet was received from the server. */ - MQTTServerRefused, /**< The server refused a CONNECT or SUBSCRIBE. */ - MQTTNoDataAvailable, /**< No data available from the transport interface. */ - MQTTIllegalState, /**< An illegal state in the state record. */ - MQTTStateCollision, /**< A collision with an existing state record entry. */ - MQTTKeepAliveTimeout, /**< Timeout while waiting for PINGRESP. */ - MQTTNeedMoreBytes, /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received - incomplete data; it should be called again (probably after - a delay). */ - MQTTStatusConnected, /**< MQTT connection is established with the broker */ - MQTTStatusNotConnected, /**< MQTT connection is not established with the broker */ - MQTTStatusDisconnectPending /**< Transport Interface has failed and MQTT connection needs to be closed */ + MQTTSuccess = 0, /**< Function completed successfully. */ + MQTTBadParameter, /**< At least one parameter was invalid. */ + MQTTNoMemory, /**< A provided buffer was too small. */ + MQTTSendFailed, /**< The transport send function failed. */ + MQTTRecvFailed, /**< The transport receive function failed. */ + MQTTBadResponse, /**< An invalid packet was received from the server. */ + MQTTServerRefused, /**< The server refused a CONNECT or SUBSCRIBE. */ + MQTTNoDataAvailable, /**< No data available from the transport interface. */ + MQTTIllegalState, /**< An illegal state in the state record. */ + MQTTStateCollision, /**< A collision with an existing state record entry. */ + MQTTKeepAliveTimeout, /**< Timeout while waiting for PINGRESP. */ + MQTTNeedMoreBytes, /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received + incomplete data; it should be called again (probably after + a delay). */ + MQTTStatusConnected, /**< MQTT connection is established with the broker. */ + MQTTStatusNotConnected, /**< MQTT connection is not established with the broker. */ + MQTTStatusDisconnectPending, /**< Transport Interface has failed and MQTT connection needs to be closed. */ + MQTTPublishStoreFailed, /**< User provided API to store a copy of outgoing publish for retransmission purposes, + has failed. */ + MQTTPublishRetrieveFailed /**< User provided API to retrieve the copy of a publish while reconnecting + with an unclean session has failed. */ } MQTTStatus_t; /** @@ -1220,6 +1224,19 @@ MQTTStatus_t MQTT_ProcessIncomingPacketTypeAndLength( const uint8_t * pBuffer, MQTTPacketInfo_t * pIncomingPacket ); /* @[declare_mqtt_processincomingpackettypeandlength] */ +/** + * @brief Update the duplicate publish flag within the given header of the publish packet. + * + * @param[in] pHeader The buffer holding the header content + * @param[in] set If true then the flag will be set else cleared + * + * @return #MQTTSuccess on successful setting of the duplicate flag, + * #MQTTBadParameter for invalid parameters + */ + /* @[declare_mqtt_updateduplicatepublishflag] */ +MQTTStatus_t MQTT_UpdateDuplicatePublishFlag( uint8_t * pHeader , bool set); +/* @[declare_mqtt_updateduplicatepublishflag] */ + /** * @fn uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, size_t remainingLength ); * @brief Serialize the fixed part of the connect packet header. diff --git a/test/unit-test/core_mqtt_serializer_utest.c b/test/unit-test/core_mqtt_serializer_utest.c index e91aae72a..c359e296c 100644 --- a/test/unit-test/core_mqtt_serializer_utest.c +++ b/test/unit-test/core_mqtt_serializer_utest.c @@ -2878,4 +2878,44 @@ void test_MQTT_SerializeDisconnect_Happy_Path() TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); } +/* ================== Testing MQTT_UpdateDuplicatePublishFlag ===================== */ + +/** + * @brief Call MQTT_UpdateDuplicatePublishFlag using a NULL pHeader and a header that does + * not come from a publish packet, in order to receive MQTTBadParameter errors. + */ +void test_MQTT_UpdateDuplicatePublishFlag_Invalid_Params() +{ + MQTTStatus_t mqttStatus = MQTTSuccess; + uint8_t pHeader = MQTT_PACKET_TYPE_SUBSCRIBE; + + /* Test NULL pHeader. */ + mqttStatus = MQTT_UpdateDuplicatePublishFlag( NULL, true ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); + + /* Test a non-publish header. */ + mqttStatus = MQTT_UpdateDuplicatePublishFlag( &pHeader, true ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); +} + +/** + * @brief This method calls MQTT_UpdateDuplicatePublishFlag successfully in order to + * get full coverage on the method. + */ +void test_MQTT_UpdateDuplicatePublishFlag_Happy_Path() +{ + MQTTStatus_t mqttStatus = MQTTSuccess; + uint8_t pHeader = MQTT_PACKET_TYPE_PUBLISH; + + /* Test to set the flag. */ + mqttStatus = MQTT_UpdateDuplicatePublishFlag( &pHeader, true ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + TEST_ASSERT_NOT_EQUAL_INT( ( pHeader ) & ( 0x01U << ( 3 ) ), 0 ); + + /* Test to clear the flag. */ + mqttStatus = MQTT_UpdateDuplicatePublishFlag( &pHeader, false ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + TEST_ASSERT_EQUAL_INT( ( pHeader ) & ( 0x01U << ( 3 ) ), 0 ); +} + /* ========================================================================== */ diff --git a/test/unit-test/core_mqtt_utest.c b/test/unit-test/core_mqtt_utest.c index 131f255df..e4346b436 100644 --- a/test/unit-test/core_mqtt_utest.c +++ b/test/unit-test/core_mqtt_utest.c @@ -184,6 +184,15 @@ typedef struct ProcessLoopReturns uint32_t timeoutMs; /**< @brief The timeout value to call MQTT_ProcessLoop API with. */ } ProcessLoopReturns_t; +/** + * @brief An opaque structure provided by the library to the #MQTTStorePacketForRetransmit function when using #MQTTStorePacketForRetransmit. + */ +struct MQTTVec +{ + TransportOutVector_t * pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */ + size_t vectorLen; /**< Length of the transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */ +}; + /** * @brief The packet type to be received by the process loop. * IMPORTANT: Make sure this is set before calling expectProcessLoopCalls(...). @@ -208,6 +217,16 @@ static uint32_t globalEntryTime = 0; */ static uint8_t mqttBuffer[ MQTT_TEST_BUFFER_LENGTH ] = { 0 }; +/** + * @brief A static buffer used by the MQTT library for storing publishes for retransmiting purpose. + */ +static uint8_t * publishCopyBuffer = NULL; + +/** + * @brief Size of the publishCopyBuffer array + */ +static size_t publishCopyBufferSize = 0; + /** * @brief A flag to indicate whether event callback is called from * MQTT_ProcessLoop. @@ -396,6 +415,146 @@ static int32_t transportWritevSuccess( NetworkContext_t * pNetworkContext, return bytesToWrite; } +/** + * @brief Mocked successful publish store function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if store is successful else false + */ +bool publishStoreCallbackSuccess( struct MQTTContext * pContext, + uint16_t packetId, + MQTTVec_t * pMqttVec ) +{ + ( void ) pContext; + ( void ) packetId; + ( void ) pMqttVec; + + return true; +} + +/** + * @brief Mocked failed publish store function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if store is successful else false + */ +bool publishStoreCallbackFailed( struct MQTTContext * pContext, + uint16_t packetId, + MQTTVec_t * pMqttVec ) +{ + ( void ) pContext; + ( void ) packetId; + ( void ) pMqttVec; + + return false; +} + +/** + * @brief Mocked successful publish retrieve function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if retrieve is successful else false + */ +bool publishRetrieveCallbackSuccess( struct MQTTContext * pContext, + uint16_t packetId, + uint8_t ** pPacket, + size_t * pPacketSize ) +{ + ( void ) pContext; + ( void ) packetId; + + *pPacket = publishCopyBuffer; + *pPacketSize = publishCopyBufferSize; + + return true; +} + +/** + * @brief Mocked publish retrieve function. Succeeds once then fails + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if retrieve is successful else false + */ +bool publishRetrieveCallbackSuccessThenFail( struct MQTTContext * pContext, + uint16_t packetId, + uint8_t ** pPacket, + size_t * pPacketSize ) +{ + ( void ) pContext; + ( void ) packetId; + + bool ret = true; + static int count = 0; + + *pPacket = publishCopyBuffer; + *pPacketSize = publishCopyBufferSize; + + if( count != 0 ) + { + count = 0; + ret = false; + } + + count++; + + return ret; +} + +/** + * @brief Mocked failed publish retrieve function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if retrieve is successful else false + */ +bool publishRetrieveCallbackFailed( struct MQTTContext * pContext, + uint16_t packetId, + uint8_t ** pPacket, + size_t * pPacketSize ) +{ + ( void ) pContext; + ( void ) packetId; + ( void ) pPacket; + ( void ) pPacketSize; + + return false; +} + +/** + * @brief Mocked publish clear function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * + * @return true if clear is successful else false + */ +void publishClearCallback( struct MQTTContext * pContext, + uint16_t packetId ) +{ + ( void ) pContext; + ( void ) packetId; +} + + static void verifyEncodedTopicString( TransportOutVector_t * pIoVectorIterator, char * pTopicFilter, size_t topicFilterLength, @@ -737,6 +896,30 @@ static int32_t transportSendSucceedThenFail( NetworkContext_t * pNetworkContext, return retVal; } +/** + * @brief Mocked transport send that succeeds when sending connect then fails after that. + */ +static int32_t transportSendSucceedThenFailAfterConnect( NetworkContext_t * pNetworkContext, + const void * pMessage, + size_t bytesToSend ) +{ + int32_t retVal = bytesToSend; + static int counter = 0; + + ( void ) pNetworkContext; + ( void ) pMessage; + + counter++; + + if( counter == 3 ) + { + retVal = -1; + counter = 0; + } + + return retVal; +} + /** * @brief Mocked transport send that only sends half of the bytes. */ @@ -1201,6 +1384,43 @@ void test_MQTT_CheckConnectStatus_return_correct_status( void ) } +/* ========================================================================== */ + +/** + * @brief Test that any NULL parameter causes MQTT_InitRetransmits to return MQTTBadParameter. + */ +void test_MQTT_InitRetransmits_Invalid_Params( void ) +{ + MQTTStatus_t mqttStatus = { 0 }; + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + /* Check that MQTTBadParameter is returned if any NULL parameters are passed. */ + mqttStatus = MQTT_InitRetransmits( NULL, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); + + mqttStatus = MQTT_InitRetransmits( &context, NULL, + publishRetrieveCallbackSuccess, + publishClearCallback ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); + + mqttStatus = MQTT_InitRetransmits( &context, publishStoreCallbackSuccess, + NULL, + publishClearCallback ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); + + mqttStatus = MQTT_InitRetransmits( &context, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + NULL ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); +} + /* ========================================================================== */ static uint8_t * MQTT_SerializeConnectFixedHeader_cb( uint8_t * pIndex, @@ -2033,68 +2253,423 @@ void test_MQTT_Connect_resendPendingAcks( void ) MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack successful. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack successful. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Query for any remaining packets pending to ack. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_ID_INVALID ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); + TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); + + /* Test 5. Three packets found in ack pending state. Sent PUBREL successfully + * for first and failed for second and no attempt for third. */ + mqttContext.keepAliveIntervalSec = 0; + mqttContext.connectStatus = MQTTNotConnected; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + /* First packet. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack successful. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Second packet. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack failed. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTBadParameter ); + /* Query for any remaining packets pending to ack. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 2 ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); + TEST_ASSERT_EQUAL_INT( MQTTBadParameter, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); + + /* Test 6. Two packets found in ack pending state. Sent PUBREL successfully + * for first and failed for second. */ + mqttContext.connectStatus = MQTTNotConnected; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + /* First packet. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack successful. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Second packet. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack successful. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Query for any remaining packets pending to ack. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_ID_INVALID ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); + TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); +} + +/** + * @brief Test resend of publshes in MQTT_Connect. + */ +void test_MQTT_Connect_resendUnAckedPublishes( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent, sessionPresentResult; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + + /* Test 1. Connecting with a clean session. Clear all callback successfully executes */ + /* successful receive CONNACK packet. */ + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + /* Return with a session present flag. */ + sessionPresent = false; + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + + MQTT_PublishToResend_ExpectAnyArgsAndReturn( 1 ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); + TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); + TEST_ASSERT_FALSE( sessionPresentResult ); +} + +void test_MQTT_Connect_resendUnAckedPublishes2( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent, sessionPresentResult; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + mqttContext.clearFunction = publishClearCallback; + + /* Test 3. No publishes to resend reestablishing a session. */ + /* successful receive CONNACK packet. */ + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + /* Return with a session present flag. */ + sessionPresent = true; + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + /* No acks to resend. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + /* No publishes to resend. */ + MQTT_PublishToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); + TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); + TEST_ASSERT_TRUE( sessionPresentResult ); +} + +void test_MQTT_Connect_resendUnAckedPublishes3( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent, sessionPresentResult; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + uint16_t packetIdentifier = 1; + /* MQTTPublishState_t pubRelState = MQTTPubRelSend; */ + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + /* MQTTPublishState_t expectedState = { 0 }; */ + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + /* Test 4. One publish packet found to resend, but retrieve failed. */ + sessionPresentResult = false; + mqttContext.connectStatus = MQTTNotConnected; + mqttContext.keepAliveIntervalSec = 0; + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + sessionPresent = true; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + mqttContext.retrieveFunction = publishRetrieveCallbackFailed; + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); + TEST_ASSERT_EQUAL_INT( MQTTPublishRetrieveFailed, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); + TEST_ASSERT_TRUE( sessionPresentResult ); +} + +void test_MQTT_Connect_resendUnAckedPublishes4( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent, sessionPresentResult; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + uint16_t packetIdentifier = 1; + /* MQTTPublishState_t pubRelState = MQTTPubRelSend; */ + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + /* MQTTPublishState_t expectedState = { 0 }; */ + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + mqttContext.retrieveFunction = publishRetrieveCallbackSuccess; + + /* Test 5. One publish packet found to resend, but Transport Send failed. */ + sessionPresentResult = false; + mqttContext.connectStatus = MQTTNotConnected; + mqttContext.keepAliveIntervalSec = 0; + mqttContext.transportInterface.writev = NULL; + mqttContext.transportInterface.send = transportSendSucceedThenFailAfterConnect; + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + sessionPresent = true; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); + TEST_ASSERT_EQUAL_INT( MQTTSendFailed, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); + TEST_ASSERT_TRUE( sessionPresentResult ); +} + +void test_MQTT_Connect_resendUnAckedPublishes5( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + uint16_t packetIdentifier = 1; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + mqttContext.transportInterface.send = transportSendSuccess; + + /* Test 6. One publish packet found to resend, Sent successfully. */ + mqttContext.connectStatus = MQTTNotConnected; + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + sessionPresent = true; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); /* Query for any remaining packets pending to ack. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_ID_INVALID ); status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); +} - /* Test 5. Three packets found in ack pending state. Sent PUBREL successfully - * for first and failed for second and no attempt for third. */ - mqttContext.keepAliveIntervalSec = 0; - mqttContext.connectStatus = MQTTNotConnected; - MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); - MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); - /* First packet. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack successful. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - /* Second packet. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack failed. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTBadParameter ); - /* Query for any remaining packets pending to ack. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 2 ); - status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); - TEST_ASSERT_EQUAL_INT( MQTTBadParameter, status ); - TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); +void test_MQTT_Connect_resendUnAckedPublishes6( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + uint16_t packetIdentifier = 1; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; - /* Test 6. Two packets found in ack pending state. Sent PUBREL successfully + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccessThenFail, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + + /* Test 7. Two publish packets found to resend. Sent successfully * for first and failed for second. */ + mqttContext.keepAliveIntervalSec = 0; mqttContext.connectStatus = MQTTNotConnected; + sessionPresent = true; + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); /* First packet. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack successful. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier ); /* Second packet. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack successful. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - /* Query for any remaining packets pending to ack. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_ID_INVALID ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); - TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); - TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); - TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); + TEST_ASSERT_EQUAL_INT( MQTTPublishRetrieveFailed, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); } /** @@ -2534,6 +3109,104 @@ void test_MQTT_Publish6( void ) TEST_ASSERT_EQUAL_INT( MQTTNoMemory, status ); } +/** + * @brief Test that MQTT_Publish works as intended. + */ +void test_MQTT_Publish_Storing_Publish_Success( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTPublishInfo_t publishInfo = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTStatus_t status; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + MQTTPublishState_t expectedState = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + transport.send = transportSendFailure; + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &publishInfo, 0x0, sizeof( publishInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + mqttContext.connectStatus = MQTTConnected; + + publishInfo.qos = MQTTQoS1; + + expectedState = MQTTPublishSend; + + MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_ReserveState_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateStatePublish_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStatePublish_ReturnThruPtr_pNewState( &expectedState ); + + mqttContext.transportInterface.send = transportSendSuccess; + status = MQTT_Publish( &mqttContext, &publishInfo, 1 ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); +} + +/** + * @brief Test that MQTT_Publish works as intended. + */ +void test_MQTT_Publish_Storing_Publish_Failed( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTPublishInfo_t publishInfo = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTStatus_t status; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + transport.send = transportSendFailure; + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &publishInfo, 0x0, sizeof( publishInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackFailed, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + mqttContext.connectStatus = MQTTConnected; + + publishInfo.qos = MQTTQoS1; + + MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_ReserveState_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + + mqttContext.transportInterface.send = transportSendSuccess; + status = MQTT_Publish( &mqttContext, &publishInfo, 1 ); + TEST_ASSERT_EQUAL_INT( MQTTPublishStoreFailed, status ); +} + /** * @brief Test that MQTT_Publish works as intended. */ @@ -2678,6 +3351,8 @@ void test_MQTT_Publish_WriteVSendsPartialBytes( void ) MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ReturnThruPtr_headerSize( &headerLen ); MQTT_ReserveState_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_UpdateStatePublish_ExpectAnyArgsAndReturn( MQTTSuccess ); mqttContext.outgoingPublishRecordMaxCount = 10; @@ -2717,6 +3392,8 @@ void test_MQTT_Publish7( void ) MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); mqttContext.connectStatus = MQTTConnected; /* We need sendPacket to be called with at least 1 byte to send, so that @@ -2754,6 +3431,10 @@ void test_MQTT_Publish8( void ) mqttContext.transportInterface.send = transportSendSucceedThenFail; MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + publishInfo.pPayload = "Test"; publishInfo.payloadLength = 4; status = MQTT_Publish( &mqttContext, &publishInfo, 0 ); @@ -2783,6 +3464,10 @@ void test_MQTT_Publish9( void ) MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + mqttContext.transportInterface.send = transportSendSuccess; status = MQTT_Publish( &mqttContext, &publishInfo, 0 ); TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); @@ -2814,6 +3499,10 @@ void test_MQTT_Publish10( void ) publishInfo.payloadLength = 0; MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + status = MQTT_Publish( &mqttContext, &publishInfo, 0 ); TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); } @@ -2896,6 +3585,8 @@ void test_MQTT_Publish12( void ) MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_ReserveState_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_UpdateStatePublish_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_UpdateStatePublish_ReturnThruPtr_pNewState( &expectedState ); @@ -3043,6 +3734,8 @@ void test_MQTT_Publish_Send_Timeout( void ) publishInfo.payloadLength = 4; MQTT_GetPublishPacketSize_IgnoreAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); /* Call the API function under test and expect that it detects a timeout in sending * MQTT packet over the network. */ @@ -4090,6 +4783,72 @@ void test_MQTT_ProcessLoop_handleIncomingAck_Happy_Paths( void ) expectProcessLoopCalls( &context, &expectParams ); } +/** + * @brief This test case covers all calls to the private method, + * handleIncomingAck(...), + * that result in the process loop returning successfully. + */ +void test_MQTT_ProcessLoop_handleIncomingAck_Clear_Publish_Copies( void ) +{ + MQTTStatus_t mqttStatus; + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + ProcessLoopReturns_t expectParams = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + mqttStatus = MQTT_InitRetransmits( &context, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + context.connectStatus = MQTTConnected; + + modifyIncomingPacketStatus = MQTTSuccess; + + /* Mock the receiving of a PUBACK packet type and expect the appropriate + * calls made from the process loop. */ + currentPacketType = MQTT_PACKET_TYPE_PUBACK; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPublishDone; + expectParams.stateAfterSerialize = MQTTPublishDone; + expectProcessLoopCalls( &context, &expectParams ); + + /* Mock the receiving of a PUBREC packet type and expect the appropriate + * calls made from the process loop. */ + currentPacketType = MQTT_PACKET_TYPE_PUBREC; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPubRelSend; + expectParams.stateAfterSerialize = MQTTPubCompPending; + expectProcessLoopCalls( &context, &expectParams ); + + context.clearFunction = publishClearCallback; + + /* Mock the receiving of a PUBACK packet type and expect the appropriate + * calls made from the process loop. */ + currentPacketType = MQTT_PACKET_TYPE_PUBACK; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPublishDone; + expectParams.stateAfterSerialize = MQTTPublishDone; + expectProcessLoopCalls( &context, &expectParams ); + + /* Mock the receiving of a PUBREC packet type and expect the appropriate + * calls made from the process loop. */ + currentPacketType = MQTT_PACKET_TYPE_PUBREC; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPubRelSend; + expectParams.stateAfterSerialize = MQTTPubCompPending; + expectProcessLoopCalls( &context, &expectParams ); +} + /** * @brief This test case covers all calls to the private method, * handleIncomingAck(...), @@ -6418,7 +7177,15 @@ void test_MQTT_Status_strerror( void ) str = MQTT_Status_strerror( status ); TEST_ASSERT_EQUAL_STRING( "MQTTStatusDisconnectPending", str ); - status = MQTTStatusDisconnectPending + 1; + status = MQTTPublishStoreFailed; + str = MQTT_Status_strerror( status ); + TEST_ASSERT_EQUAL_STRING( "MQTTPublishStoreFailed", str ); + + status = MQTTPublishRetrieveFailed; + str = MQTT_Status_strerror( status ); + TEST_ASSERT_EQUAL_STRING( "MQTTPublishRetrieveFailed", str ); + + status = MQTTPublishRetrieveFailed + 1; str = MQTT_Status_strerror( status ); TEST_ASSERT_EQUAL_STRING( "Invalid MQTT Status code", str ); } @@ -6570,3 +7337,54 @@ void test_MQTT_InitStatefulQoS_callback_is_null( void ) TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); } /* ========================================================================== */ + +void test_MQTT_GetBytesInMQTTVec( void ) +{ + TransportOutVector_t pTransportArray[ 10 ] = + { + { .iov_base = NULL, .iov_len = 1 }, + { .iov_base = NULL, .iov_len = 2 }, + { .iov_base = NULL, .iov_len = 3 }, + { .iov_base = NULL, .iov_len = 4 }, + { .iov_base = NULL, .iov_len = 5 }, + { .iov_base = NULL, .iov_len = 6 }, + { .iov_base = NULL, .iov_len = 7 }, + { .iov_base = NULL, .iov_len = 8 }, + { .iov_base = NULL, .iov_len = 9 }, + { .iov_base = NULL, .iov_len = 10 }, + }; + + MQTTVec_t mqttVec; + + mqttVec.pVector = pTransportArray; + mqttVec.vectorLen = 10; + + size_t ret = MQTT_GetBytesInMQTTVec( &mqttVec ); + + TEST_ASSERT_EQUAL( 55, ret ); +} +/* ========================================================================== */ + +void test_MQTT_SerializeMQTTVec( void ) +{ + TransportOutVector_t pTransportArray[ 6 ] = + { + { .iov_base = "This ", .iov_len = strlen( "This " ) }, + { .iov_base = "is ", .iov_len = strlen( "is " ) }, + { .iov_base = "a ", .iov_len = strlen( "a " ) }, + { .iov_base = "coreMQTT ", .iov_len = strlen( "coreMQTT " ) }, + { .iov_base = "unit-test ", .iov_len = strlen( "unit-test " ) }, + { .iov_base = "string.", .iov_len = strlen( "string." ) } + }; + + uint8_t array[ 50 ] = { 0 }; + MQTTVec_t mqttVec; + + mqttVec.pVector = pTransportArray; + mqttVec.vectorLen = 6; + + MQTT_SerializeMQTTVec( array, &mqttVec ); + + TEST_ASSERT_EQUAL_MEMORY( "This is a coreMQTT unit-test string.", array, strlen( "This is a coreMQTT unit-test string." ) ); + TEST_ASSERT_EQUAL_MEMORY( "\0\0\0\0\0\0\0\0\0\0\0\0\0", &array[ 37 ], 13 ); +}