From 905e47d1638bf093e19c7da2d47dfa64f8131384 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Mon, 11 Nov 2024 21:32:50 -0800 Subject: [PATCH 1/8] Fix TSAN errors --- src/source/Ice/ConnectionListener.c | 13 +++++++------ src/source/Signaling/LwsApiCalls.c | 4 +++- tst/IceFunctionalityTest.cpp | 5 +++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/source/Ice/ConnectionListener.c b/src/source/Ice/ConnectionListener.c index 33eb21a37e..f798e828c1 100644 --- a/src/source/Ice/ConnectionListener.c +++ b/src/source/Ice/ConnectionListener.c @@ -62,9 +62,6 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) ATOMIC_STORE_BOOL(&pConnectionListener->terminate, TRUE); if (IS_VALID_MUTEX_VALUE(pConnectionListener->lock)) { - MUTEX_LOCK(pConnectionListener->lock); - threadId = pConnectionListener->receiveDataRoutine; - MUTEX_UNLOCK(pConnectionListener->lock); // TODO add support for windows socketpair // This writes to the socketpair, kicking the POLL() out early, @@ -72,12 +69,16 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) #ifndef _WIN32 socketWrite(pConnectionListener->kickSocket[CONNECTION_LISTENER_KICK_SOCKET_WRITE], msg, STRLEN(msg)); #endif - + // receiveDataRoutine TID should be used under pConnectionListener->lock lock. + MUTEX_LOCK(pConnectionListener->lock); + threadId = pConnectionListener->receiveDataRoutine; // wait for thread to finish. if (IS_VALID_TID_VALUE(threadId)) { - THREAD_JOIN(pConnectionListener->receiveDataRoutine, NULL); + THREAD_JOIN(threadId, NULL); + pConnectionListener->receiveDataRoutine = INVALID_MUTEX_VALUE; } - + + MUTEX_UNLOCK(pConnectionListener->lock); MUTEX_FREE(pConnectionListener->lock); } diff --git a/src/source/Signaling/LwsApiCalls.c b/src/source/Signaling/LwsApiCalls.c index ad6e069a6f..aa1bd6bb3c 100644 --- a/src/source/Signaling/LwsApiCalls.c +++ b/src/source/Signaling/LwsApiCalls.c @@ -2417,10 +2417,12 @@ STATUS wakeLwsServiceEventLoop(PSignalingClient pSignalingClient, UINT32 protoco // Early exit in case we don't need to do anything CHK(pSignalingClient != NULL && pSignalingClient->pLwsContext != NULL, retStatus); + // currentWsi should be used under lwsSerializerLock. + MUTEX_LOCK(pSignalingClient->lwsSerializerLock); if (pSignalingClient->currentWsi[protocolIndex] != NULL) { lws_callback_on_writable(pSignalingClient->currentWsi[protocolIndex]); } - + MUTEX_UNLOCK(pSignalingClient->lwsSerializerLock); CleanUp: LEAVES(); diff --git a/tst/IceFunctionalityTest.cpp b/tst/IceFunctionalityTest.cpp index 3a27b6324b..4ba3250299 100644 --- a/tst/IceFunctionalityTest.cpp +++ b/tst/IceFunctionalityTest.cpp @@ -198,14 +198,15 @@ TEST_F(IceFunctionalityTest, connectionListenerFunctionalityTest) newConnectionCount = pConnectionListener->socketCount; EXPECT_EQ(connectionCount, newConnectionCount); - // Keeping TSAN happy need to lock/unlock when retrieving the value of TID + // receiveDataRoutine TID should be used under pConnectionListener->lock lock. MUTEX_LOCK(pConnectionListener->lock); threadId = pConnectionListener->receiveDataRoutine; - MUTEX_UNLOCK(pConnectionListener->lock); EXPECT_TRUE(IS_VALID_TID_VALUE(threadId)); ATOMIC_STORE_BOOL(&pConnectionListener->terminate, TRUE); THREAD_JOIN(threadId, NULL); + pConnectionListener->receiveDataRoutine = INVALID_MUTEX_VALUE; + MUTEX_UNLOCK(pConnectionListener->lock); EXPECT_EQ(STATUS_SUCCESS, freeConnectionListener(&pConnectionListener)); From f32487e27bd92dcbb805d26c31c3bd0e7c84f44e Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Mon, 11 Nov 2024 21:43:12 -0800 Subject: [PATCH 2/8] Clang format --- src/source/Ice/ConnectionListener.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/source/Ice/ConnectionListener.c b/src/source/Ice/ConnectionListener.c index f798e828c1..3a63f8dfde 100644 --- a/src/source/Ice/ConnectionListener.c +++ b/src/source/Ice/ConnectionListener.c @@ -62,7 +62,6 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) ATOMIC_STORE_BOOL(&pConnectionListener->terminate, TRUE); if (IS_VALID_MUTEX_VALUE(pConnectionListener->lock)) { - // TODO add support for windows socketpair // This writes to the socketpair, kicking the POLL() out early, // otherwise wait for the POLL to timeout @@ -77,7 +76,7 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) THREAD_JOIN(threadId, NULL); pConnectionListener->receiveDataRoutine = INVALID_MUTEX_VALUE; } - + MUTEX_UNLOCK(pConnectionListener->lock); MUTEX_FREE(pConnectionListener->lock); } From 8fa7eb79ac176197bbe67fdb634647484c52389d Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Mon, 11 Nov 2024 23:17:25 -0800 Subject: [PATCH 3/8] Fix RollingBuffer lastIndex race --- src/source/Rtcp/RtpRollingBuffer.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/source/Rtcp/RtpRollingBuffer.c b/src/source/Rtcp/RtpRollingBuffer.c index 123f670a2c..d2d67b0ea6 100644 --- a/src/source/Rtcp/RtpRollingBuffer.c +++ b/src/source/Rtcp/RtpRollingBuffer.c @@ -68,7 +68,11 @@ STATUS rtpRollingBufferAddRtpPacket(PRtpRollingBuffer pRollingBuffer, PRtpPacket pRawPacketCopy = NULL; CHK_STATUS(rollingBufferAppendData(pRollingBuffer->pRollingBuffer, (UINT64) pRtpPacketCopy, &index)); + + CHK(pRollingBuffer->pRollingBuffer != NULL, STATUS_NULL_ARG); + MUTEX_LOCK(pRollingBuffer->pRollingBuffer->lock); pRollingBuffer->lastIndex = index; + MUTEX_UNLOCK(pRollingBuffer->pRollingBuffer->lock); CleanUp: SAFE_MEMFREE(pRawPacketCopy); @@ -90,9 +94,12 @@ STATUS rtpRollingBufferGetValidSeqIndexList(PRtpRollingBuffer pRollingBuffer, PU PUINT64 pCurSeqIndexListPtr; UINT16 seqNum; UINT32 size = 0; + BOOL rollingBufferLocked = FALSE; - CHK(pRollingBuffer != NULL && pValidSeqIndexList != NULL && pSequenceNumberList != NULL, STATUS_NULL_ARG); + CHK(pRollingBuffer != NULL && pRollingBuffer->pRollingBuffer && pValidSeqIndexList != NULL && pSequenceNumberList != NULL, STATUS_NULL_ARG); + MUTEX_LOCK(pRollingBuffer->pRollingBuffer->lock); + rollingBufferLocked = TRUE; CHK_STATUS(rollingBufferGetSize(pRollingBuffer->pRollingBuffer, &size)); // Empty buffer, just return CHK(size > 0, retStatus); @@ -124,6 +131,10 @@ STATUS rtpRollingBufferGetValidSeqIndexList(PRtpRollingBuffer pRollingBuffer, PU } CleanUp: + if (rollingBufferLocked) { + MUTEX_UNLOCK(pRollingBuffer->pRollingBuffer->lock); + } + CHK_LOG_ERR(retStatus); if (pValidIndexListLen != NULL) { From 7c3a5f0dd70649dfad6b1e7c2e37f404704623c4 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Nov 2024 00:30:05 -0800 Subject: [PATCH 4/8] Protect socket close() and recvfrom() calls from data race --- src/source/Ice/ConnectionListener.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/source/Ice/ConnectionListener.c b/src/source/Ice/ConnectionListener.c index 3a63f8dfde..6e747ee990 100644 --- a/src/source/Ice/ConnectionListener.c +++ b/src/source/Ice/ConnectionListener.c @@ -81,6 +81,8 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) MUTEX_FREE(pConnectionListener->lock); } + MUTEX_LOCK(pConnectionListener->lock); + // TODO add support for windows socketpair #ifndef _WIN32 if (pConnectionListener->kickSocket[CONNECTION_LISTENER_KICK_SOCKET_LISTEN] != -1) { @@ -91,6 +93,8 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) } #endif + MUTEX_UNLOCK(pConnectionListener->lock); + MEMFREE(pConnectionListener); *ppConnectionListener = NULL; @@ -332,8 +336,11 @@ PVOID connectionListenerReceiveDataRoutine(PVOID arg) if (canReadFd(localSocket, rfds, nfds)) { iterate = TRUE; while (iterate) { + MUTEX_LOCK(pConnectionListener->lock); readLen = recvfrom(localSocket, pConnectionListener->pBuffer, pConnectionListener->bufferLen, 0, (struct sockaddr*) &srcAddrBuff, &srcAddrBuffLen); + MUTEX_UNLOCK(pConnectionListener->lock); + if (readLen < 0) { switch (getErrorCode()) { case EWOULDBLOCK: From 9264d43f89bfa14ba42a3f36c1c23e53a739c871 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Nov 2024 01:04:44 -0800 Subject: [PATCH 5/8] Fix too early freeing of mutex --- src/source/Ice/ConnectionListener.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/source/Ice/ConnectionListener.c b/src/source/Ice/ConnectionListener.c index 6e747ee990..04bdb06ebd 100644 --- a/src/source/Ice/ConnectionListener.c +++ b/src/source/Ice/ConnectionListener.c @@ -78,7 +78,6 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) } MUTEX_UNLOCK(pConnectionListener->lock); - MUTEX_FREE(pConnectionListener->lock); } MUTEX_LOCK(pConnectionListener->lock); @@ -94,6 +93,7 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) #endif MUTEX_UNLOCK(pConnectionListener->lock); + MUTEX_FREE(pConnectionListener->lock); MEMFREE(pConnectionListener); From d7def8faa35a01af442d7df44a9e6ab1fbc28013 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Nov 2024 12:31:00 -0800 Subject: [PATCH 6/8] Add logs to debug CI tests --- src/source/Ice/ConnectionListener.c | 8 ++++++++ src/source/Rtcp/RtpRollingBuffer.c | 5 +++++ src/source/Signaling/LwsApiCalls.c | 3 +++ tst/IceFunctionalityTest.cpp | 2 ++ 4 files changed, 18 insertions(+) diff --git a/src/source/Ice/ConnectionListener.c b/src/source/Ice/ConnectionListener.c index 04bdb06ebd..2d40bb2ab2 100644 --- a/src/source/Ice/ConnectionListener.c +++ b/src/source/Ice/ConnectionListener.c @@ -68,6 +68,8 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) #ifndef _WIN32 socketWrite(pConnectionListener->kickSocket[CONNECTION_LISTENER_KICK_SOCKET_WRITE], msg, STRLEN(msg)); #endif + + DLOGW("[TESTING] LOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine."); // receiveDataRoutine TID should be used under pConnectionListener->lock lock. MUTEX_LOCK(pConnectionListener->lock); threadId = pConnectionListener->receiveDataRoutine; @@ -78,8 +80,10 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) } MUTEX_UNLOCK(pConnectionListener->lock); + DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine."); } + DLOGW("[TESTING] LOCKING pConnectionListener->lock for closeSocket."); MUTEX_LOCK(pConnectionListener->lock); // TODO add support for windows socketpair @@ -93,6 +97,8 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) #endif MUTEX_UNLOCK(pConnectionListener->lock); + DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for closeSocket."); + MUTEX_FREE(pConnectionListener->lock); MEMFREE(pConnectionListener); @@ -336,10 +342,12 @@ PVOID connectionListenerReceiveDataRoutine(PVOID arg) if (canReadFd(localSocket, rfds, nfds)) { iterate = TRUE; while (iterate) { + DLOGW("[TESTING] LOCKING pConnectionListener->lock for recvfrom."); MUTEX_LOCK(pConnectionListener->lock); readLen = recvfrom(localSocket, pConnectionListener->pBuffer, pConnectionListener->bufferLen, 0, (struct sockaddr*) &srcAddrBuff, &srcAddrBuffLen); MUTEX_UNLOCK(pConnectionListener->lock); + DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for recvfrom."); if (readLen < 0) { switch (getErrorCode()) { diff --git a/src/source/Rtcp/RtpRollingBuffer.c b/src/source/Rtcp/RtpRollingBuffer.c index d2d67b0ea6..c349297c36 100644 --- a/src/source/Rtcp/RtpRollingBuffer.c +++ b/src/source/Rtcp/RtpRollingBuffer.c @@ -70,9 +70,12 @@ STATUS rtpRollingBufferAddRtpPacket(PRtpRollingBuffer pRollingBuffer, PRtpPacket CHK_STATUS(rollingBufferAppendData(pRollingBuffer->pRollingBuffer, (UINT64) pRtpPacketCopy, &index)); CHK(pRollingBuffer->pRollingBuffer != NULL, STATUS_NULL_ARG); + + DLOGW("[TESTING] LOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex."); MUTEX_LOCK(pRollingBuffer->pRollingBuffer->lock); pRollingBuffer->lastIndex = index; MUTEX_UNLOCK(pRollingBuffer->pRollingBuffer->lock); + DLOGW("[TESTING] UNLOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex."); CleanUp: SAFE_MEMFREE(pRawPacketCopy); @@ -98,6 +101,7 @@ STATUS rtpRollingBufferGetValidSeqIndexList(PRtpRollingBuffer pRollingBuffer, PU CHK(pRollingBuffer != NULL && pRollingBuffer->pRollingBuffer && pValidSeqIndexList != NULL && pSequenceNumberList != NULL, STATUS_NULL_ARG); + DLOGW("[TESTING] LOCKING pRollingBuffer->lock for pRollingBuffer size."); MUTEX_LOCK(pRollingBuffer->pRollingBuffer->lock); rollingBufferLocked = TRUE; CHK_STATUS(rollingBufferGetSize(pRollingBuffer->pRollingBuffer, &size)); @@ -133,6 +137,7 @@ STATUS rtpRollingBufferGetValidSeqIndexList(PRtpRollingBuffer pRollingBuffer, PU CleanUp: if (rollingBufferLocked) { MUTEX_UNLOCK(pRollingBuffer->pRollingBuffer->lock); + DLOGW("[TESTING] UNLOCKING pRollingBuffer->lock for pRollingBuffer size."); } CHK_LOG_ERR(retStatus); diff --git a/src/source/Signaling/LwsApiCalls.c b/src/source/Signaling/LwsApiCalls.c index aa1bd6bb3c..4b72e84092 100644 --- a/src/source/Signaling/LwsApiCalls.c +++ b/src/source/Signaling/LwsApiCalls.c @@ -2417,12 +2417,15 @@ STATUS wakeLwsServiceEventLoop(PSignalingClient pSignalingClient, UINT32 protoco // Early exit in case we don't need to do anything CHK(pSignalingClient != NULL && pSignalingClient->pLwsContext != NULL, retStatus); + DLOGW("[TESTING] LOCKING pSignalingClient->lwsSerializerLock for pSignalingClient->currentWsi."); // currentWsi should be used under lwsSerializerLock. MUTEX_LOCK(pSignalingClient->lwsSerializerLock); if (pSignalingClient->currentWsi[protocolIndex] != NULL) { lws_callback_on_writable(pSignalingClient->currentWsi[protocolIndex]); } MUTEX_UNLOCK(pSignalingClient->lwsSerializerLock); + DLOGW("[TESTING] UNLOCKING pSignalingClient->lwsSerializerLock for pSignalingClient->currentWsi."); + CleanUp: LEAVES(); diff --git a/tst/IceFunctionalityTest.cpp b/tst/IceFunctionalityTest.cpp index 4ba3250299..d6f7e59515 100644 --- a/tst/IceFunctionalityTest.cpp +++ b/tst/IceFunctionalityTest.cpp @@ -199,6 +199,7 @@ TEST_F(IceFunctionalityTest, connectionListenerFunctionalityTest) EXPECT_EQ(connectionCount, newConnectionCount); // receiveDataRoutine TID should be used under pConnectionListener->lock lock. + DLOGW("[TESTING] LOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine."); MUTEX_LOCK(pConnectionListener->lock); threadId = pConnectionListener->receiveDataRoutine; EXPECT_TRUE(IS_VALID_TID_VALUE(threadId)); @@ -207,6 +208,7 @@ TEST_F(IceFunctionalityTest, connectionListenerFunctionalityTest) THREAD_JOIN(threadId, NULL); pConnectionListener->receiveDataRoutine = INVALID_MUTEX_VALUE; MUTEX_UNLOCK(pConnectionListener->lock); + DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine."); EXPECT_EQ(STATUS_SUCCESS, freeConnectionListener(&pConnectionListener)); From 5890840d5b9efdc82ef3b4d5bcf3a8bbc3940846 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:28:50 -0800 Subject: [PATCH 7/8] INVALID_MUTEX_VALUE -> INVALID_TID_VALUE --- src/source/Ice/ConnectionListener.c | 2 +- tst/IceFunctionalityTest.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/source/Ice/ConnectionListener.c b/src/source/Ice/ConnectionListener.c index 2d40bb2ab2..aef83ed945 100644 --- a/src/source/Ice/ConnectionListener.c +++ b/src/source/Ice/ConnectionListener.c @@ -76,7 +76,7 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) // wait for thread to finish. if (IS_VALID_TID_VALUE(threadId)) { THREAD_JOIN(threadId, NULL); - pConnectionListener->receiveDataRoutine = INVALID_MUTEX_VALUE; + pConnectionListener->receiveDataRoutine = INVALID_TID_VALUE; } MUTEX_UNLOCK(pConnectionListener->lock); diff --git a/tst/IceFunctionalityTest.cpp b/tst/IceFunctionalityTest.cpp index d6f7e59515..24945e12d6 100644 --- a/tst/IceFunctionalityTest.cpp +++ b/tst/IceFunctionalityTest.cpp @@ -206,7 +206,7 @@ TEST_F(IceFunctionalityTest, connectionListenerFunctionalityTest) ATOMIC_STORE_BOOL(&pConnectionListener->terminate, TRUE); THREAD_JOIN(threadId, NULL); - pConnectionListener->receiveDataRoutine = INVALID_MUTEX_VALUE; + pConnectionListener->receiveDataRoutine = INVALID_TID_VALUE; MUTEX_UNLOCK(pConnectionListener->lock); DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine."); From 42f7f87f53420b2dd7729f3e01ad09f2adb4b621 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Nov 2024 11:33:44 -0800 Subject: [PATCH 8/8] Comment out verbose testing log --- src/source/Rtcp/RtpRollingBuffer.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/source/Rtcp/RtpRollingBuffer.c b/src/source/Rtcp/RtpRollingBuffer.c index c349297c36..f5cbec1108 100644 --- a/src/source/Rtcp/RtpRollingBuffer.c +++ b/src/source/Rtcp/RtpRollingBuffer.c @@ -71,11 +71,11 @@ STATUS rtpRollingBufferAddRtpPacket(PRtpRollingBuffer pRollingBuffer, PRtpPacket CHK(pRollingBuffer->pRollingBuffer != NULL, STATUS_NULL_ARG); - DLOGW("[TESTING] LOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex."); + // DLOGW("[TESTING] LOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex."); MUTEX_LOCK(pRollingBuffer->pRollingBuffer->lock); pRollingBuffer->lastIndex = index; MUTEX_UNLOCK(pRollingBuffer->pRollingBuffer->lock); - DLOGW("[TESTING] UNLOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex."); + // DLOGW("[TESTING] UNLOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex."); CleanUp: SAFE_MEMFREE(pRawPacketCopy);