diff --git a/.github/.cSpellWords.txt b/.github/.cSpellWords.txt index d2ba4f17..71edb870 100644 --- a/.github/.cSpellWords.txt +++ b/.github/.cSpellWords.txt @@ -20,6 +20,7 @@ DLIBRARY DNDEBUG DUNITTEST DUNITY +getbytesinmqttvec getpacketid isystem lcov @@ -34,6 +35,7 @@ NONDET pylint pytest pyyaml +serializemqttvec sinclude UNACKED unpadded diff --git a/docs/doxygen/include/size_table.md b/docs/doxygen/include/size_table.md index 1fa92ff6..5f654632 100644 --- a/docs/doxygen/include/size_table.md +++ b/docs/doxygen/include/size_table.md @@ -9,8 +9,8 @@ core_mqtt.c -
4.4K
-
3.8K
+
4.9K
+
4.2K
core_mqtt_state.c @@ -19,12 +19,12 @@ core_mqtt_serializer.c -
2.8K
-
2.2K
+
2.9K
+
2.3K
Total estimates -
8.9K
-
7.3K
+
9.5K
+
7.8K
diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 23324974..0e1180f5 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -90,6 +90,12 @@ */ #define CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ( 2U ) +struct MQTTVec +{ + TransportOutVector_t * pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */ + size_t vectorLen; /**< Length of the transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */ +}; + /*-----------------------------------------------------------*/ /** @@ -444,8 +450,10 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext ); * @brief Clears existing state records for a clean session. * * @param[in] pContext Initialized MQTT context. + * + * @return #MQTTSuccess always otherwise. */ -static void handleCleanSession( MQTTContext_t * pContext ); +static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext ); /** * @brief Send the publish packet without copying the topic string and payload in @@ -463,7 +471,7 @@ static void handleCleanSession( MQTTContext_t * pContext ); */ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, const MQTTPublishInfo_t * pPublishInfo, - const uint8_t * pMqttHeader, + uint8_t * pMqttHeader, size_t headerSize, uint16_t packetId ); @@ -1597,6 +1605,15 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext, } } + if( ( ackType == MQTTPuback ) || ( ackType == MQTTPubrec ) ) + { + if( ( status == MQTTSuccess ) && + ( pContext->clearFunction != NULL ) ) + { + pContext->clearFunction( pContext, packetIdentifier ); + } + } + if( status == MQTTSuccess ) { /* Set fields of deserialized struct. */ @@ -2133,13 +2150,14 @@ static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext, static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, const MQTTPublishInfo_t * pPublishInfo, - const uint8_t * pMqttHeader, + uint8_t * pMqttHeader, size_t headerSize, uint16_t packetId ) { MQTTStatus_t status = MQTTSuccess; size_t ioVectorLength; size_t totalMessageLength; + bool dupFlagChanged = false; /* Bytes required to encode the packet ID in an MQTT header according to * the MQTT specification. */ @@ -2190,7 +2208,42 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext, totalMessageLength += pPublishInfo->payloadLength; } - if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) + /* If not already set, set the dup flag before storing a copy of the publish + * this is because on retrieving back this copy we will get it in the form of an + * array of TransportOutVector_t that holds the data in a const pointer which cannot be + * changed after retrieving. */ + if( pPublishInfo->dup != true ) + { + MQTT_UpdateDuplicatePublishFlag( pMqttHeader, true ); + + dupFlagChanged = true; + } + + /* store a copy of the publish for retransmission purposes */ + if( ( pPublishInfo->qos > MQTTQoS0 ) && + ( pContext->storeFunction != NULL ) ) + { + MQTTVec_t mqttVec; + + mqttVec.pVector = pIoVector; + mqttVec.vectorLen = ioVectorLength; + + if( pContext->storeFunction( pContext, packetId, &mqttVec ) != true ) + { + status = MQTTPublishStoreFailed; + } + } + + /* change the value of the dup flag to its original, if it was changed */ + if( dupFlagChanged ) + { + MQTT_UpdateDuplicatePublishFlag( pMqttHeader, false ); + + dupFlagChanged = false; + } + + if( ( status == MQTTSuccess ) && + ( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) ) { status = MQTTSendFailed; } @@ -2477,6 +2530,8 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext ) MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER; uint16_t packetId = MQTT_PACKET_ID_INVALID; MQTTPublishState_t state = MQTTStateNull; + size_t totalMessageLength; + uint8_t * pMqttPacket; assert( pContext != NULL ); @@ -2492,17 +2547,71 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext ) packetId = MQTT_PubrelToResend( pContext, &cursor, &state ); } + if( ( status == MQTTSuccess ) && + ( pContext->retrieveFunction != NULL ) ) + { + cursor = MQTT_STATE_CURSOR_INITIALIZER; + + /* Resend all the PUBLISH for which PUBACK/PUBREC is not received + * after session is reestablished. */ + do + { + packetId = MQTT_PublishToResend( pContext, &cursor ); + + if( packetId != MQTT_PACKET_ID_INVALID ) + { + if( pContext->retrieveFunction( pContext, packetId, &pMqttPacket, &totalMessageLength ) != true ) + { + status = MQTTPublishRetrieveFailed; + break; + } + + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + if( sendBuffer( pContext, pMqttPacket, totalMessageLength ) != ( int32_t ) totalMessageLength ) + { + status = MQTTSendFailed; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); + } + } while( ( packetId != MQTT_PACKET_ID_INVALID ) && + ( status == MQTTSuccess ) ); + } + return status; } -static void handleCleanSession( MQTTContext_t * pContext ) +static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext ) { + MQTTStatus_t status = MQTTSuccess; + MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER; + uint16_t packetId = MQTT_PACKET_ID_INVALID; + assert( pContext != NULL ); /* Reset the index and clear the buffer when a new session is established. */ pContext->index = 0; ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size ); + if( pContext->clearFunction != NULL ) + { + cursor = MQTT_STATE_CURSOR_INITIALIZER; + + /* Resend all the PUBLISH for which PUBACK/PUBREC is not received + * after session is reestablished. */ + do + { + packetId = MQTT_PublishToResend( pContext, &cursor ); + + if( packetId != MQTT_PACKET_ID_INVALID ) + { + pContext->clearFunction( pContext, packetId ); + } + } while( ( packetId != MQTT_PACKET_ID_INVALID ) && + ( status == MQTTSuccess ) ); + } + if( pContext->outgoingPublishRecordMaxCount > 0U ) { /* Clear any existing records if a new session is established. */ @@ -2517,6 +2626,8 @@ static void handleCleanSession( MQTTContext_t * pContext ) 0x00, pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) ); } + + return status; } static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext, @@ -2681,6 +2792,46 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext, /*-----------------------------------------------------------*/ +MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext, + MQTTStorePacketForRetransmit storeFunction, + MQTTRetrievePacketForRetransmit retrieveFunction, + MQTTClearPacketForRetransmit clearFunction ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( pContext == NULL ) + { + LogError( ( "Argument cannot be NULL: pContext=%p\n", + ( void * ) pContext ) ); + status = MQTTBadParameter; + } + else if( storeFunction == NULL ) + { + LogError( ( "Invalid parameter: storeFunction is NULL" ) ); + status = MQTTBadParameter; + } + else if( retrieveFunction == NULL ) + { + LogError( ( "Invalid parameter: retrieveFunction is NULL" ) ); + status = MQTTBadParameter; + } + else if( clearFunction == NULL ) + { + LogError( ( "Invalid parameter: clearFunction is NULL" ) ); + status = MQTTBadParameter; + } + else + { + pContext->storeFunction = storeFunction; + pContext->retrieveFunction = retrieveFunction; + pContext->clearFunction = clearFunction; + } + + return status; +} + +/*-----------------------------------------------------------*/ + MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext, uint16_t packetId ) { @@ -2820,7 +2971,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, if( ( status == MQTTSuccess ) && ( *pSessionPresent != true ) ) { - handleCleanSession( pContext ); + status = handleCleanSession( pContext ); } if( status == MQTTSuccess ) @@ -2837,7 +2988,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) ) { - /* Resend PUBRELs when reestablishing a session */ + /* Resend PUBRELs and PUBLISHES when reestablishing a session */ status = handleUncleanSessionResumption( pContext ); } @@ -3560,6 +3711,14 @@ const char * MQTT_Status_strerror( MQTTStatus_t status ) str = "MQTTStatusDisconnectPending"; break; + case MQTTPublishStoreFailed: + str = "MQTTPublishStoreFailed"; + break; + + case MQTTPublishRetrieveFailed: + str = "MQTTPublishRetrieveFailed"; + break; + default: str = "Invalid MQTT Status code"; break; @@ -3569,3 +3728,37 @@ const char * MQTT_Status_strerror( MQTTStatus_t status ) } /*-----------------------------------------------------------*/ + +size_t MQTT_GetBytesInMQTTVec( MQTTVec_t * pVec ) +{ + size_t memoryRequired = 0; + size_t i; + TransportOutVector_t * pTransportVec = pVec->pVector; + size_t vecLen = pVec->vectorLen; + + for( i = 0; i < vecLen; i++ ) + { + memoryRequired += pTransportVec[ i ].iov_len; + } + + return memoryRequired; +} + +/*-----------------------------------------------------------*/ + +void MQTT_SerializeMQTTVec( uint8_t * pAllocatedMem, + MQTTVec_t * pVec ) +{ + TransportOutVector_t * pTransportVec = pVec->pVector; + const size_t vecLen = pVec->vectorLen; + size_t index = 0; + size_t i = 0; + + for( i = 0; i < vecLen; i++ ) + { + memcpy( &pAllocatedMem[ index ], pTransportVec[ i ].iov_base, pTransportVec[ i ].iov_len ); + index += pTransportVec[ i ].iov_len; + } +} + +/*-----------------------------------------------------------*/ diff --git a/source/core_mqtt_serializer.c b/source/core_mqtt_serializer.c index 97022034..458e9c77 100644 --- a/source/core_mqtt_serializer.c +++ b/source/core_mqtt_serializer.c @@ -101,6 +101,11 @@ */ #define UINT8_SET_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) | ( 0x01U << ( position ) ) ) ) +/** + * @brief Clear a bit in an 8-bit unsigned integer. + */ +#define UINT8_CLEAR_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) & ( ~( 0x01U << ( position ) ) ) ) ) + /** * @brief Macro for checking if a bit is set in a 1-byte unsigned int. * @@ -2623,6 +2628,33 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc, /*-----------------------------------------------------------*/ +MQTTStatus_t MQTT_UpdateDuplicatePublishFlag( uint8_t * pHeader, + bool set ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( pHeader == NULL ) + { + status = MQTTBadParameter; + } + else if( ( ( *pHeader ) & 0xF0 ) != MQTT_PACKET_TYPE_PUBLISH ) + { + status = MQTTBadParameter; + } + else if( set == true ) + { + UINT8_SET_BIT( *pHeader, MQTT_PUBLISH_FLAG_DUP ); + } + else + { + UINT8_CLEAR_BIT( *pHeader, MQTT_PUBLISH_FLAG_DUP ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + MQTTStatus_t MQTT_ProcessIncomingPacketTypeAndLength( const uint8_t * pBuffer, const size_t * pIndex, MQTTPacketInfo_t * pIncomingPacket ) diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h index ba521934..e7e6f77f 100644 --- a/source/include/core_mqtt.h +++ b/source/include/core_mqtt.h @@ -64,6 +64,12 @@ struct MQTTPubAckInfo; struct MQTTContext; struct MQTTDeserializedInfo; +/** + * @ingroup mqtt_struct_types + * @brief An opaque structure provided by the library to the #MQTTStorePacketForRetransmit function when using #MQTTStorePacketForRetransmit. + */ +typedef struct MQTTVec MQTTVec_t; + /** * @ingroup mqtt_callback_types * @brief Application provided function to query the time elapsed since a given @@ -101,6 +107,59 @@ typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext, struct MQTTPacketInfo * pPacketInfo, struct MQTTDeserializedInfo * pDeserializedInfo ); +/** + * @brief User defined callback used to store outgoing publishes. Used to track any publish + * retransmit on an unclean session connection. + * + * @param[in] pContext Initialised MQTT Context. + * @param[in] packetId Outgoing publish packet identifier. + * @param[in] pMqttVec Pointer to the opaque mqtt vector structure. Users should use MQTT_SerializeMQTTVec + * and MQTT_GetBytesInMQTTVec functions to get the memory required and to serialize the + * MQTTVec_t in the provided memory respectively. + * + * @return True if the copy is successful else false. + */ +/* @[define_mqtt_retransmitstorepacket] */ +typedef bool ( * MQTTStorePacketForRetransmit)( struct MQTTContext * pContext, + uint16_t packetId, + MQTTVec_t * pMqttVec ); +/* @[define_mqtt_retransmitstorepacket] */ + +/** + * @brief User defined callback used to retreive a copied publish for resend operation. Used to + * track any publish retransmit on an unclean session connection. + * + * @param[in] pContext Initialised MQTT Context. + * @param[in] packetId Copied publish packet identifier. + * @param[out] pSerializedMqttVec Output parameter to store the pointer to the serialized MQTTVec_t + * using MQTT_SerializeMQTTVec. + * @param[out] pSerializedMqttVecLen Output parameter to return the number of bytes used to store the + * MQTTVec_t. This value should be the same as the one received from MQTT_GetBytesInMQTTVec + * when storing the packet. + * + * @return True if the retreive is successful else false. + */ +/* @[define_mqtt_retransmitretrievepacket] */ +typedef bool ( * MQTTRetrievePacketForRetransmit)( struct MQTTContext * pContext, + uint16_t packetId, + uint8_t ** pSerializedMqttVec, + size_t * pSerializedMqttVecLen ); +/* @[define_mqtt_retransmitretrievepacket] */ + +/** + * @brief User defined callback used to clear a particular copied publish packet. Used to + * track any publish retransmit on an unclean session connection. + * + * @param[in] pContext Initialised MQTT Context. + * @param[in] packetId Copied publish packet identifier. + * + * @return True if the clear is successful else false. + */ +/* @[define_mqtt_retransmitclearpacket] */ +typedef void (* MQTTClearPacketForRetransmit)( struct MQTTContext * pContext, + uint16_t packetId ); +/* @[define_mqtt_retransmitclearpacket] */ + /** * @ingroup mqtt_enum_types * @brief Values indicating if an MQTT connection exists. @@ -247,6 +306,21 @@ typedef struct MQTTContext uint16_t keepAliveIntervalSec; /**< @brief Keep Alive interval. */ uint32_t pingReqSendTimeMs; /**< @brief Timestamp of the last sent PINGREQ. */ bool waitingForPingResp; /**< @brief If the library is currently awaiting a PINGRESP. */ + + /** + * @brief User defined API used to store outgoing publishes. + */ + MQTTStorePacketForRetransmit storeFunction; + + /** + * @brief User defined API used to retreive a copied publish for resend operation. + */ + MQTTRetrievePacketForRetransmit retrieveFunction; + + /** + * @brief User defined API used to clear a particular copied publish packet. + */ + MQTTClearPacketForRetransmit clearFunction; } MQTTContext_t; /** @@ -415,6 +489,98 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext, size_t incomingPublishCount ); /* @[declare_mqtt_initstatefulqos] */ +/** + * @brief Initialize an MQTT context for publish retransmits for QoS > 0. + * + * This function must be called on an #MQTTContext_t after MQTT_InitstatefulQoS and before any other function. + * + * @param[in] pContext The context to initialize. + * @param[in] storeFunction User defined API used to store outgoing publishes. + * @param[in] retrieveFunction User defined API used to retreive a copied publish for resend operation. + * @param[in] clearFunction User defined API used to clear a particular copied publish packet. + * + * @return #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + * + * Example + * @code{c} + * + * // Function for obtaining a timestamp. + * uint32_t getTimeStampMs(); + * // Callback function for receiving packets. + * void eventCallback( + * MQTTContext_t * pContext, + * MQTTPacketInfo_t * pPacketInfo, + * MQTTDeserializedInfo_t * pDeserializedInfo + * ); + * // Network send. + * int32_t networkSend( NetworkContext_t * pContext, const void * pBuffer, size_t bytes ); + * // Network receive. + * int32_t networkRecv( NetworkContext_t * pContext, void * pBuffer, size_t bytes ); + * // User defined callback used to store outgoing publishes + * bool publishStoreCallback(struct MQTTContext* pContext, + * uint16_t packetId, + * MQTTVec_t* pIoVec); + * // User defined callback used to retreive a copied publish for resend operation + * bool publishRetrieveCallback(struct MQTTContext* pContext, + * uint16_t packetId, + * TransportOutVector_t** pIoVec, + * size_t* ioVecCount); + * // User defined callback used to clear a particular copied publish packet + * bool publishClearCallback(struct MQTTContext* pContext, + * uint16_t packetId); + * // User defined callback used to clear all copied publish packets + * bool publishClearAllCallback(struct MQTTContext* pContext); + * + * MQTTContext_t mqttContext; + * TransportInterface_t transport; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ 1024 ]; + * const size_t outgoingPublishCount = 30; + * MQTTPubAckInfo_t outgoingPublishes[ outgoingPublishCount ]; + * + * // Clear context. + * memset( ( void * ) &mqttContext, 0x00, sizeof( MQTTContext_t ) ); + * + * // Set transport interface members. + * transport.pNetworkContext = &someTransportContext; + * transport.send = networkSend; + * transport.recv = networkRecv; + * + * // Set buffer members. + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = 1024; + * + * status = MQTT_Init( &mqttContext, &transport, getTimeStampMs, eventCallback, &fixedBuffer ); + * + * if( status == MQTTSuccess ) + * { + * // We do not expect any incoming publishes in this example, therefore the incoming + * // publish pointer is NULL and the count is zero. + * status = MQTT_InitStatefulQoS( &mqttContext, outgoingPublishes, outgoingPublishCount, NULL, 0 ); + * + * // Now QoS1 and/or QoS2 publishes can be sent with this context. + * } + * + * if( status == MQTTSuccess ) + * { + * status = MQTT_InitRetransmits( &mqttContext, publishStoreCallback, + * publishRetrieveCallback, + * publishClearCallback, + * publishClearAllCallback ); + * + * // Now unacked Publishes can be resent on an unclean session resumption. + * } + * @endcode + */ + +/* @[declare_mqtt_initretransmits] */ +MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext, + MQTTStorePacketForRetransmit storeFunction, + MQTTRetrievePacketForRetransmit retrieveFunction, + MQTTClearPacketForRetransmit clearFunction ); +/* @[declare_mqtt_initretransmits] */ + /** * @brief Checks the MQTT connection status with the broker. * @@ -471,6 +637,11 @@ MQTTStatus_t MQTT_CheckConnectStatus( MQTTContext_t * pContext ); * #MQTTRecvFailed if transport receive failed for CONNACK; * #MQTTNoDataAvailable if no data available to receive in transport until * the @p timeoutMs for CONNACK; + * #MQTTStatusConnected if the connection is already established + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API + * MQTTPublishRetrieveFailed if on an unclean session connection, the copied + * publishes are not retrieved successfully for retransmission * #MQTTSuccess otherwise. * * @note This API may spend more time than provided in the timeoutMS parameters in @@ -559,6 +730,9 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, * hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; + * #MQTTStatusNotConnected if the connection is not established yet + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API * #MQTTSuccess otherwise. * * Example @@ -612,6 +786,11 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; + * #MQTTStatusNotConnected if the connection is not established yet + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API + * #MQTTPublishStoreFailed if the user provided callback to copy and store the + * outgoing publish packet fails * #MQTTSuccess otherwise. * * Example @@ -679,6 +858,9 @@ MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext, * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; + * #MQTTStatusNotConnected if the connection is not established yet + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API * #MQTTSuccess otherwise. */ /* @[declare_mqtt_ping] */ @@ -698,6 +880,9 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ); * hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; + * #MQTTStatusNotConnected if the connection is not established yet + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API * #MQTTSuccess otherwise. * * Example @@ -749,6 +934,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, * hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport send failed; + * #MQTTStatusNotConnected if the connection is already disconnected * #MQTTSuccess otherwise. */ /* @[declare_mqtt_disconnect] */ @@ -783,6 +969,10 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ); * invalid transition for the internal state machine; * #MQTTNeedMoreBytes if MQTT_ProcessLoop has received * incomplete data; it should be called again (probably after a delay); + * #MQTTStatusNotConnected if the connection is not established yet and a PING + * or an ACK is being sent. + * #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect + * before calling any other API * #MQTTSuccess on success. * * Example @@ -1037,6 +1227,28 @@ MQTTStatus_t MQTT_GetSubAckStatusCodes( const MQTTPacketInfo_t * pSubackPacket, const char * MQTT_Status_strerror( MQTTStatus_t status ); /* @[declare_mqtt_status_strerror] */ +/** + * @brief Get the bytes in a #MQTTVec pointer which can store the whole array as a an MQTT packet when calling MQTT_SerializeMQTTVec( void * pAllocatedMem, MQTTVec_t *pVec ) function. + * + * @param[in] pVec The #MQTTVec pointer. + * + * @return The bytes in the provided #MQTTVec array which can then be used to set aside memory to be used with MQTT_SerializeMQTTVec( void * pAllocatedMem, MQTTVec_t *pVec ) function. + */ +/* @[declare_mqtt_getbytesinmqttvec] */ +size_t MQTT_GetBytesInMQTTVec( MQTTVec_t * pVec ); +/* @[declare_mqtt_getbytesinmqttvec] */ + +/** + * @brief Serialize the bytes in an array of #MQTTVec in the provided \p pAllocatedMem + * + * @param[in] pAllocatedMem Memory in which to serialize the data in the #MQTTVec array. It must be of size provided by MQTT_GetBytesInMQTTVec( MQTTVec_t *pVec ). + * @param[in] pVec The #MQTTVec pointer. + */ +/* @[declare_mqtt_serializemqttvec] */ +void MQTT_SerializeMQTTVec( uint8_t * pAllocatedMem, + MQTTVec_t * pVec ); +/* @[declare_mqtt_serializemqttvec] */ + /* *INDENT-OFF* */ #ifdef __cplusplus } diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index 86b3c639..92c3ceea 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -85,23 +85,27 @@ struct MQTTPacketInfo; */ typedef enum MQTTStatus { - MQTTSuccess = 0, /**< Function completed successfully. */ - MQTTBadParameter, /**< At least one parameter was invalid. */ - MQTTNoMemory, /**< A provided buffer was too small. */ - MQTTSendFailed, /**< The transport send function failed. */ - MQTTRecvFailed, /**< The transport receive function failed. */ - MQTTBadResponse, /**< An invalid packet was received from the server. */ - MQTTServerRefused, /**< The server refused a CONNECT or SUBSCRIBE. */ - MQTTNoDataAvailable, /**< No data available from the transport interface. */ - MQTTIllegalState, /**< An illegal state in the state record. */ - MQTTStateCollision, /**< A collision with an existing state record entry. */ - MQTTKeepAliveTimeout, /**< Timeout while waiting for PINGRESP. */ - MQTTNeedMoreBytes, /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received - incomplete data; it should be called again (probably after - a delay). */ - MQTTStatusConnected, /**< MQTT connection is established with the broker */ - MQTTStatusNotConnected, /**< MQTT connection is not established with the broker */ - MQTTStatusDisconnectPending /**< Transport Interface has failed and MQTT connection needs to be closed */ + MQTTSuccess = 0, /**< Function completed successfully. */ + MQTTBadParameter, /**< At least one parameter was invalid. */ + MQTTNoMemory, /**< A provided buffer was too small. */ + MQTTSendFailed, /**< The transport send function failed. */ + MQTTRecvFailed, /**< The transport receive function failed. */ + MQTTBadResponse, /**< An invalid packet was received from the server. */ + MQTTServerRefused, /**< The server refused a CONNECT or SUBSCRIBE. */ + MQTTNoDataAvailable, /**< No data available from the transport interface. */ + MQTTIllegalState, /**< An illegal state in the state record. */ + MQTTStateCollision, /**< A collision with an existing state record entry. */ + MQTTKeepAliveTimeout, /**< Timeout while waiting for PINGRESP. */ + MQTTNeedMoreBytes, /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received + incomplete data; it should be called again (probably after + a delay). */ + MQTTStatusConnected, /**< MQTT connection is established with the broker. */ + MQTTStatusNotConnected, /**< MQTT connection is not established with the broker. */ + MQTTStatusDisconnectPending, /**< Transport Interface has failed and MQTT connection needs to be closed. */ + MQTTPublishStoreFailed, /**< User provided API to store a copy of outgoing publish for retransmission purposes, + has failed. */ + MQTTPublishRetrieveFailed /**< User provided API to retrieve the copy of a publish while reconnecting + with an unclean session has failed. */ } MQTTStatus_t; /** @@ -1220,6 +1224,19 @@ MQTTStatus_t MQTT_ProcessIncomingPacketTypeAndLength( const uint8_t * pBuffer, MQTTPacketInfo_t * pIncomingPacket ); /* @[declare_mqtt_processincomingpackettypeandlength] */ +/** + * @brief Update the duplicate publish flag within the given header of the publish packet. + * + * @param[in] pHeader The buffer holding the header content + * @param[in] set If true then the flag will be set else cleared + * + * @return #MQTTSuccess on successful setting of the duplicate flag, + * #MQTTBadParameter for invalid parameters + */ + /* @[declare_mqtt_updateduplicatepublishflag] */ +MQTTStatus_t MQTT_UpdateDuplicatePublishFlag( uint8_t * pHeader , bool set); +/* @[declare_mqtt_updateduplicatepublishflag] */ + /** * @fn uint8_t * MQTT_SerializeConnectFixedHeader( uint8_t * pIndex, const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, size_t remainingLength ); * @brief Serialize the fixed part of the connect packet header. diff --git a/test/unit-test/core_mqtt_serializer_utest.c b/test/unit-test/core_mqtt_serializer_utest.c index e91aae72..c359e296 100644 --- a/test/unit-test/core_mqtt_serializer_utest.c +++ b/test/unit-test/core_mqtt_serializer_utest.c @@ -2878,4 +2878,44 @@ void test_MQTT_SerializeDisconnect_Happy_Path() TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); } +/* ================== Testing MQTT_UpdateDuplicatePublishFlag ===================== */ + +/** + * @brief Call MQTT_UpdateDuplicatePublishFlag using a NULL pHeader and a header that does + * not come from a publish packet, in order to receive MQTTBadParameter errors. + */ +void test_MQTT_UpdateDuplicatePublishFlag_Invalid_Params() +{ + MQTTStatus_t mqttStatus = MQTTSuccess; + uint8_t pHeader = MQTT_PACKET_TYPE_SUBSCRIBE; + + /* Test NULL pHeader. */ + mqttStatus = MQTT_UpdateDuplicatePublishFlag( NULL, true ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); + + /* Test a non-publish header. */ + mqttStatus = MQTT_UpdateDuplicatePublishFlag( &pHeader, true ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); +} + +/** + * @brief This method calls MQTT_UpdateDuplicatePublishFlag successfully in order to + * get full coverage on the method. + */ +void test_MQTT_UpdateDuplicatePublishFlag_Happy_Path() +{ + MQTTStatus_t mqttStatus = MQTTSuccess; + uint8_t pHeader = MQTT_PACKET_TYPE_PUBLISH; + + /* Test to set the flag. */ + mqttStatus = MQTT_UpdateDuplicatePublishFlag( &pHeader, true ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + TEST_ASSERT_NOT_EQUAL_INT( ( pHeader ) & ( 0x01U << ( 3 ) ), 0 ); + + /* Test to clear the flag. */ + mqttStatus = MQTT_UpdateDuplicatePublishFlag( &pHeader, false ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + TEST_ASSERT_EQUAL_INT( ( pHeader ) & ( 0x01U << ( 3 ) ), 0 ); +} + /* ========================================================================== */ diff --git a/test/unit-test/core_mqtt_utest.c b/test/unit-test/core_mqtt_utest.c index 131f255d..e4346b43 100644 --- a/test/unit-test/core_mqtt_utest.c +++ b/test/unit-test/core_mqtt_utest.c @@ -184,6 +184,15 @@ typedef struct ProcessLoopReturns uint32_t timeoutMs; /**< @brief The timeout value to call MQTT_ProcessLoop API with. */ } ProcessLoopReturns_t; +/** + * @brief An opaque structure provided by the library to the #MQTTStorePacketForRetransmit function when using #MQTTStorePacketForRetransmit. + */ +struct MQTTVec +{ + TransportOutVector_t * pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */ + size_t vectorLen; /**< Length of the transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */ +}; + /** * @brief The packet type to be received by the process loop. * IMPORTANT: Make sure this is set before calling expectProcessLoopCalls(...). @@ -208,6 +217,16 @@ static uint32_t globalEntryTime = 0; */ static uint8_t mqttBuffer[ MQTT_TEST_BUFFER_LENGTH ] = { 0 }; +/** + * @brief A static buffer used by the MQTT library for storing publishes for retransmiting purpose. + */ +static uint8_t * publishCopyBuffer = NULL; + +/** + * @brief Size of the publishCopyBuffer array + */ +static size_t publishCopyBufferSize = 0; + /** * @brief A flag to indicate whether event callback is called from * MQTT_ProcessLoop. @@ -396,6 +415,146 @@ static int32_t transportWritevSuccess( NetworkContext_t * pNetworkContext, return bytesToWrite; } +/** + * @brief Mocked successful publish store function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if store is successful else false + */ +bool publishStoreCallbackSuccess( struct MQTTContext * pContext, + uint16_t packetId, + MQTTVec_t * pMqttVec ) +{ + ( void ) pContext; + ( void ) packetId; + ( void ) pMqttVec; + + return true; +} + +/** + * @brief Mocked failed publish store function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if store is successful else false + */ +bool publishStoreCallbackFailed( struct MQTTContext * pContext, + uint16_t packetId, + MQTTVec_t * pMqttVec ) +{ + ( void ) pContext; + ( void ) packetId; + ( void ) pMqttVec; + + return false; +} + +/** + * @brief Mocked successful publish retrieve function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if retrieve is successful else false + */ +bool publishRetrieveCallbackSuccess( struct MQTTContext * pContext, + uint16_t packetId, + uint8_t ** pPacket, + size_t * pPacketSize ) +{ + ( void ) pContext; + ( void ) packetId; + + *pPacket = publishCopyBuffer; + *pPacketSize = publishCopyBufferSize; + + return true; +} + +/** + * @brief Mocked publish retrieve function. Succeeds once then fails + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if retrieve is successful else false + */ +bool publishRetrieveCallbackSuccessThenFail( struct MQTTContext * pContext, + uint16_t packetId, + uint8_t ** pPacket, + size_t * pPacketSize ) +{ + ( void ) pContext; + ( void ) packetId; + + bool ret = true; + static int count = 0; + + *pPacket = publishCopyBuffer; + *pPacketSize = publishCopyBufferSize; + + if( count != 0 ) + { + count = 0; + ret = false; + } + + count++; + + return ret; +} + +/** + * @brief Mocked failed publish retrieve function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * @param[in] pIoVec array of transportout vectors + * @param[in] ioVecCount number of vectors in the array + * + * @return true if retrieve is successful else false + */ +bool publishRetrieveCallbackFailed( struct MQTTContext * pContext, + uint16_t packetId, + uint8_t ** pPacket, + size_t * pPacketSize ) +{ + ( void ) pContext; + ( void ) packetId; + ( void ) pPacket; + ( void ) pPacketSize; + + return false; +} + +/** + * @brief Mocked publish clear function. + * + * @param[in] pContext initialised mqtt context. + * @param[in] packetId packet id + * + * @return true if clear is successful else false + */ +void publishClearCallback( struct MQTTContext * pContext, + uint16_t packetId ) +{ + ( void ) pContext; + ( void ) packetId; +} + + static void verifyEncodedTopicString( TransportOutVector_t * pIoVectorIterator, char * pTopicFilter, size_t topicFilterLength, @@ -737,6 +896,30 @@ static int32_t transportSendSucceedThenFail( NetworkContext_t * pNetworkContext, return retVal; } +/** + * @brief Mocked transport send that succeeds when sending connect then fails after that. + */ +static int32_t transportSendSucceedThenFailAfterConnect( NetworkContext_t * pNetworkContext, + const void * pMessage, + size_t bytesToSend ) +{ + int32_t retVal = bytesToSend; + static int counter = 0; + + ( void ) pNetworkContext; + ( void ) pMessage; + + counter++; + + if( counter == 3 ) + { + retVal = -1; + counter = 0; + } + + return retVal; +} + /** * @brief Mocked transport send that only sends half of the bytes. */ @@ -1201,6 +1384,43 @@ void test_MQTT_CheckConnectStatus_return_correct_status( void ) } +/* ========================================================================== */ + +/** + * @brief Test that any NULL parameter causes MQTT_InitRetransmits to return MQTTBadParameter. + */ +void test_MQTT_InitRetransmits_Invalid_Params( void ) +{ + MQTTStatus_t mqttStatus = { 0 }; + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + /* Check that MQTTBadParameter is returned if any NULL parameters are passed. */ + mqttStatus = MQTT_InitRetransmits( NULL, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); + + mqttStatus = MQTT_InitRetransmits( &context, NULL, + publishRetrieveCallbackSuccess, + publishClearCallback ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); + + mqttStatus = MQTT_InitRetransmits( &context, publishStoreCallbackSuccess, + NULL, + publishClearCallback ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); + + mqttStatus = MQTT_InitRetransmits( &context, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + NULL ); + TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); +} + /* ========================================================================== */ static uint8_t * MQTT_SerializeConnectFixedHeader_cb( uint8_t * pIndex, @@ -2033,68 +2253,423 @@ void test_MQTT_Connect_resendPendingAcks( void ) MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack successful. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack successful. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Query for any remaining packets pending to ack. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_ID_INVALID ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); + TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); + + /* Test 5. Three packets found in ack pending state. Sent PUBREL successfully + * for first and failed for second and no attempt for third. */ + mqttContext.keepAliveIntervalSec = 0; + mqttContext.connectStatus = MQTTNotConnected; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + /* First packet. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack successful. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Second packet. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack failed. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTBadParameter ); + /* Query for any remaining packets pending to ack. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 2 ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); + TEST_ASSERT_EQUAL_INT( MQTTBadParameter, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); + + /* Test 6. Two packets found in ack pending state. Sent PUBREL successfully + * for first and failed for second. */ + mqttContext.connectStatus = MQTTNotConnected; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + /* First packet. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack successful. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Second packet. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); + MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); + /* Serialize Ack successful. */ + MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + /* Query for any remaining packets pending to ack. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_ID_INVALID ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); + TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); +} + +/** + * @brief Test resend of publshes in MQTT_Connect. + */ +void test_MQTT_Connect_resendUnAckedPublishes( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent, sessionPresentResult; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + + /* Test 1. Connecting with a clean session. Clear all callback successfully executes */ + /* successful receive CONNACK packet. */ + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + /* Return with a session present flag. */ + sessionPresent = false; + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + + MQTT_PublishToResend_ExpectAnyArgsAndReturn( 1 ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); + TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); + TEST_ASSERT_FALSE( sessionPresentResult ); +} + +void test_MQTT_Connect_resendUnAckedPublishes2( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent, sessionPresentResult; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + mqttContext.clearFunction = publishClearCallback; + + /* Test 3. No publishes to resend reestablishing a session. */ + /* successful receive CONNACK packet. */ + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + /* Return with a session present flag. */ + sessionPresent = true; + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + /* No acks to resend. */ + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + /* No publishes to resend. */ + MQTT_PublishToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); + TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); + TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); + TEST_ASSERT_TRUE( sessionPresentResult ); +} + +void test_MQTT_Connect_resendUnAckedPublishes3( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent, sessionPresentResult; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + uint16_t packetIdentifier = 1; + /* MQTTPublishState_t pubRelState = MQTTPubRelSend; */ + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + /* MQTTPublishState_t expectedState = { 0 }; */ + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + /* Test 4. One publish packet found to resend, but retrieve failed. */ + sessionPresentResult = false; + mqttContext.connectStatus = MQTTNotConnected; + mqttContext.keepAliveIntervalSec = 0; + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + sessionPresent = true; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + mqttContext.retrieveFunction = publishRetrieveCallbackFailed; + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); + TEST_ASSERT_EQUAL_INT( MQTTPublishRetrieveFailed, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); + TEST_ASSERT_TRUE( sessionPresentResult ); +} + +void test_MQTT_Connect_resendUnAckedPublishes4( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent, sessionPresentResult; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + uint16_t packetIdentifier = 1; + /* MQTTPublishState_t pubRelState = MQTTPubRelSend; */ + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + /* MQTTPublishState_t expectedState = { 0 }; */ + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + mqttContext.retrieveFunction = publishRetrieveCallbackSuccess; + + /* Test 5. One publish packet found to resend, but Transport Send failed. */ + sessionPresentResult = false; + mqttContext.connectStatus = MQTTNotConnected; + mqttContext.keepAliveIntervalSec = 0; + mqttContext.transportInterface.writev = NULL; + mqttContext.transportInterface.send = transportSendSucceedThenFailAfterConnect; + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + sessionPresent = true; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); + status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult ); + TEST_ASSERT_EQUAL_INT( MQTTSendFailed, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); + TEST_ASSERT_TRUE( sessionPresentResult ); +} + +void test_MQTT_Connect_resendUnAckedPublishes5( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + uint16_t packetIdentifier = 1; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; + + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &connectInfo, 0x00, sizeof( connectInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + mqttContext.transportInterface.send = transportSendSuccess; + + /* Test 6. One publish packet found to resend, Sent successfully. */ + mqttContext.connectStatus = MQTTNotConnected; + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; + sessionPresent = true; + MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); + MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); /* Query for any remaining packets pending to ack. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_ID_INVALID ); status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); +} - /* Test 5. Three packets found in ack pending state. Sent PUBREL successfully - * for first and failed for second and no attempt for third. */ - mqttContext.keepAliveIntervalSec = 0; - mqttContext.connectStatus = MQTTNotConnected; - MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); - MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); - /* First packet. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack successful. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - /* Second packet. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack failed. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTBadParameter ); - /* Query for any remaining packets pending to ack. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 2 ); - status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); - TEST_ASSERT_EQUAL_INT( MQTTBadParameter, status ); - TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); +void test_MQTT_Connect_resendUnAckedPublishes6( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTConnectInfo_t connectInfo = { 0 }; + uint32_t timeout = 2; + bool sessionPresent; + MQTTStatus_t status; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTPacketInfo_t incomingPacket = { 0 }; + uint16_t packetIdentifier = 1; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + uint8_t * localPublishCopyBuffer = ( uint8_t * ) "Hello world!"; - /* Test 6. Two packets found in ack pending state. Sent PUBREL successfully + publishCopyBuffer = localPublishCopyBuffer; + publishCopyBufferSize = sizeof( "Hello world!" ); + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccessThenFail, + publishClearCallback ); + + MQTT_SerializeConnect_IgnoreAndReturn( MQTTSuccess ); + MQTT_GetConnectPacketSize_IgnoreAndReturn( MQTTSuccess ); + connectInfo.keepAliveSeconds = MQTT_SAMPLE_KEEPALIVE_INTERVAL_S; + + /* Test 7. Two publish packets found to resend. Sent successfully * for first and failed for second. */ + mqttContext.keepAliveIntervalSec = 0; mqttContext.connectStatus = MQTTNotConnected; + sessionPresent = true; + incomingPacket.type = MQTT_PACKET_TYPE_CONNACK; + incomingPacket.remainingLength = 2; MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); + MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID ); /* First packet. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack successful. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier ); /* Second packet. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); - MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); - /* Serialize Ack successful. */ - MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); - /* Query for any remaining packets pending to ack. */ - MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_ID_INVALID ); + MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); + MQTT_SerializeConnectFixedHeader_Stub( MQTT_SerializeConnectFixedHeader_cb ); status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); - TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); - TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); - TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); + TEST_ASSERT_EQUAL_INT( MQTTPublishRetrieveFailed, status ); + TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus ); } /** @@ -2534,6 +3109,104 @@ void test_MQTT_Publish6( void ) TEST_ASSERT_EQUAL_INT( MQTTNoMemory, status ); } +/** + * @brief Test that MQTT_Publish works as intended. + */ +void test_MQTT_Publish_Storing_Publish_Success( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTPublishInfo_t publishInfo = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTStatus_t status; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + MQTTPublishState_t expectedState = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + transport.send = transportSendFailure; + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &publishInfo, 0x0, sizeof( publishInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + mqttContext.connectStatus = MQTTConnected; + + publishInfo.qos = MQTTQoS1; + + expectedState = MQTTPublishSend; + + MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_ReserveState_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateStatePublish_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateStatePublish_ReturnThruPtr_pNewState( &expectedState ); + + mqttContext.transportInterface.send = transportSendSuccess; + status = MQTT_Publish( &mqttContext, &publishInfo, 1 ); + TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); +} + +/** + * @brief Test that MQTT_Publish works as intended. + */ +void test_MQTT_Publish_Storing_Publish_Failed( void ) +{ + MQTTContext_t mqttContext = { 0 }; + MQTTPublishInfo_t publishInfo = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + MQTTStatus_t status; + MQTTPubAckInfo_t incomingRecords = { 0 }; + MQTTPubAckInfo_t outgoingRecords = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + transport.send = transportSendFailure; + + memset( &mqttContext, 0x0, sizeof( mqttContext ) ); + memset( &publishInfo, 0x0, sizeof( publishInfo ) ); + MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer ); + + MQTT_InitStatefulQoS( &mqttContext, + &outgoingRecords, 4, + &incomingRecords, 4 ); + + MQTT_InitRetransmits( &mqttContext, publishStoreCallbackFailed, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + mqttContext.connectStatus = MQTTConnected; + + publishInfo.qos = MQTTQoS1; + + MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_ReserveState_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + + mqttContext.transportInterface.send = transportSendSuccess; + status = MQTT_Publish( &mqttContext, &publishInfo, 1 ); + TEST_ASSERT_EQUAL_INT( MQTTPublishStoreFailed, status ); +} + /** * @brief Test that MQTT_Publish works as intended. */ @@ -2678,6 +3351,8 @@ void test_MQTT_Publish_WriteVSendsPartialBytes( void ) MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ReturnThruPtr_headerSize( &headerLen ); MQTT_ReserveState_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_UpdateStatePublish_ExpectAnyArgsAndReturn( MQTTSuccess ); mqttContext.outgoingPublishRecordMaxCount = 10; @@ -2717,6 +3392,8 @@ void test_MQTT_Publish7( void ) MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); mqttContext.connectStatus = MQTTConnected; /* We need sendPacket to be called with at least 1 byte to send, so that @@ -2754,6 +3431,10 @@ void test_MQTT_Publish8( void ) mqttContext.transportInterface.send = transportSendSucceedThenFail; MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + publishInfo.pPayload = "Test"; publishInfo.payloadLength = 4; status = MQTT_Publish( &mqttContext, &publishInfo, 0 ); @@ -2783,6 +3464,10 @@ void test_MQTT_Publish9( void ) MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + mqttContext.transportInterface.send = transportSendSuccess; status = MQTT_Publish( &mqttContext, &publishInfo, 0 ); TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); @@ -2814,6 +3499,10 @@ void test_MQTT_Publish10( void ) publishInfo.payloadLength = 0; MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + status = MQTT_Publish( &mqttContext, &publishInfo, 0 ); TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); } @@ -2896,6 +3585,8 @@ void test_MQTT_Publish12( void ) MQTT_GetPublishPacketSize_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_ReserveState_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_UpdateStatePublish_ExpectAnyArgsAndReturn( MQTTSuccess ); MQTT_UpdateStatePublish_ReturnThruPtr_pNewState( &expectedState ); @@ -3043,6 +3734,8 @@ void test_MQTT_Publish_Send_Timeout( void ) publishInfo.payloadLength = 4; MQTT_GetPublishPacketSize_IgnoreAndReturn( MQTTSuccess ); MQTT_SerializePublishHeaderWithoutTopic_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); + MQTT_UpdateDuplicatePublishFlag_ExpectAnyArgsAndReturn( MQTTSuccess ); /* Call the API function under test and expect that it detects a timeout in sending * MQTT packet over the network. */ @@ -4090,6 +4783,72 @@ void test_MQTT_ProcessLoop_handleIncomingAck_Happy_Paths( void ) expectProcessLoopCalls( &context, &expectParams ); } +/** + * @brief This test case covers all calls to the private method, + * handleIncomingAck(...), + * that result in the process loop returning successfully. + */ +void test_MQTT_ProcessLoop_handleIncomingAck_Clear_Publish_Copies( void ) +{ + MQTTStatus_t mqttStatus; + MQTTContext_t context = { 0 }; + TransportInterface_t transport = { 0 }; + MQTTFixedBuffer_t networkBuffer = { 0 }; + ProcessLoopReturns_t expectParams = { 0 }; + + setupTransportInterface( &transport ); + setupNetworkBuffer( &networkBuffer ); + + mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer ); + TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus ); + + mqttStatus = MQTT_InitRetransmits( &context, publishStoreCallbackSuccess, + publishRetrieveCallbackSuccess, + publishClearCallback ); + + context.connectStatus = MQTTConnected; + + modifyIncomingPacketStatus = MQTTSuccess; + + /* Mock the receiving of a PUBACK packet type and expect the appropriate + * calls made from the process loop. */ + currentPacketType = MQTT_PACKET_TYPE_PUBACK; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPublishDone; + expectParams.stateAfterSerialize = MQTTPublishDone; + expectProcessLoopCalls( &context, &expectParams ); + + /* Mock the receiving of a PUBREC packet type and expect the appropriate + * calls made from the process loop. */ + currentPacketType = MQTT_PACKET_TYPE_PUBREC; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPubRelSend; + expectParams.stateAfterSerialize = MQTTPubCompPending; + expectProcessLoopCalls( &context, &expectParams ); + + context.clearFunction = publishClearCallback; + + /* Mock the receiving of a PUBACK packet type and expect the appropriate + * calls made from the process loop. */ + currentPacketType = MQTT_PACKET_TYPE_PUBACK; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPublishDone; + expectParams.stateAfterSerialize = MQTTPublishDone; + expectProcessLoopCalls( &context, &expectParams ); + + /* Mock the receiving of a PUBREC packet type and expect the appropriate + * calls made from the process loop. */ + currentPacketType = MQTT_PACKET_TYPE_PUBREC; + /* Set expected return values in the loop. */ + resetProcessLoopParams( &expectParams ); + expectParams.stateAfterDeserialize = MQTTPubRelSend; + expectParams.stateAfterSerialize = MQTTPubCompPending; + expectProcessLoopCalls( &context, &expectParams ); +} + /** * @brief This test case covers all calls to the private method, * handleIncomingAck(...), @@ -6418,7 +7177,15 @@ void test_MQTT_Status_strerror( void ) str = MQTT_Status_strerror( status ); TEST_ASSERT_EQUAL_STRING( "MQTTStatusDisconnectPending", str ); - status = MQTTStatusDisconnectPending + 1; + status = MQTTPublishStoreFailed; + str = MQTT_Status_strerror( status ); + TEST_ASSERT_EQUAL_STRING( "MQTTPublishStoreFailed", str ); + + status = MQTTPublishRetrieveFailed; + str = MQTT_Status_strerror( status ); + TEST_ASSERT_EQUAL_STRING( "MQTTPublishRetrieveFailed", str ); + + status = MQTTPublishRetrieveFailed + 1; str = MQTT_Status_strerror( status ); TEST_ASSERT_EQUAL_STRING( "Invalid MQTT Status code", str ); } @@ -6570,3 +7337,54 @@ void test_MQTT_InitStatefulQoS_callback_is_null( void ) TEST_ASSERT_EQUAL( MQTTBadParameter, mqttStatus ); } /* ========================================================================== */ + +void test_MQTT_GetBytesInMQTTVec( void ) +{ + TransportOutVector_t pTransportArray[ 10 ] = + { + { .iov_base = NULL, .iov_len = 1 }, + { .iov_base = NULL, .iov_len = 2 }, + { .iov_base = NULL, .iov_len = 3 }, + { .iov_base = NULL, .iov_len = 4 }, + { .iov_base = NULL, .iov_len = 5 }, + { .iov_base = NULL, .iov_len = 6 }, + { .iov_base = NULL, .iov_len = 7 }, + { .iov_base = NULL, .iov_len = 8 }, + { .iov_base = NULL, .iov_len = 9 }, + { .iov_base = NULL, .iov_len = 10 }, + }; + + MQTTVec_t mqttVec; + + mqttVec.pVector = pTransportArray; + mqttVec.vectorLen = 10; + + size_t ret = MQTT_GetBytesInMQTTVec( &mqttVec ); + + TEST_ASSERT_EQUAL( 55, ret ); +} +/* ========================================================================== */ + +void test_MQTT_SerializeMQTTVec( void ) +{ + TransportOutVector_t pTransportArray[ 6 ] = + { + { .iov_base = "This ", .iov_len = strlen( "This " ) }, + { .iov_base = "is ", .iov_len = strlen( "is " ) }, + { .iov_base = "a ", .iov_len = strlen( "a " ) }, + { .iov_base = "coreMQTT ", .iov_len = strlen( "coreMQTT " ) }, + { .iov_base = "unit-test ", .iov_len = strlen( "unit-test " ) }, + { .iov_base = "string.", .iov_len = strlen( "string." ) } + }; + + uint8_t array[ 50 ] = { 0 }; + MQTTVec_t mqttVec; + + mqttVec.pVector = pTransportArray; + mqttVec.vectorLen = 6; + + MQTT_SerializeMQTTVec( array, &mqttVec ); + + TEST_ASSERT_EQUAL_MEMORY( "This is a coreMQTT unit-test string.", array, strlen( "This is a coreMQTT unit-test string." ) ); + TEST_ASSERT_EQUAL_MEMORY( "\0\0\0\0\0\0\0\0\0\0\0\0\0", &array[ 37 ], 13 ); +}