diff --git a/stream-video-android-core/api/stream-video-android-core.api b/stream-video-android-core/api/stream-video-android-core.api index add60884b4..04eaf5ccb7 100644 --- a/stream-video-android-core/api/stream-video-android-core.api +++ b/stream-video-android-core/api/stream-video-android-core.api @@ -297,6 +297,7 @@ public final class io/getstream/video/android/core/ConnectionState$Disconnected public final class io/getstream/video/android/core/ConnectionState$Failed : io/getstream/video/android/core/ConnectionState { public fun (Ljava/lang/Error;)V + public final fun getError ()Ljava/lang/Error; } public final class io/getstream/video/android/core/ConnectionState$Loading : io/getstream/video/android/core/ConnectionState { @@ -824,7 +825,6 @@ public final class io/getstream/video/android/core/StreamVideoBuilder { public fun (Landroid/content/Context;Ljava/lang/String;Lio/getstream/video/android/core/GEO;Lio/getstream/video/android/model/User;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lio/getstream/video/android/core/logging/LoggingLevel;Lio/getstream/video/android/core/notifications/NotificationConfig;Lkotlin/jvm/functions/Function1;JZLjava/lang/String;ZLjava/lang/String;Lio/getstream/video/android/core/sounds/Sounds;ZLio/getstream/video/android/core/permission/android/StreamPermissionCheck;I)V public synthetic fun (Landroid/content/Context;Ljava/lang/String;Lio/getstream/video/android/core/GEO;Lio/getstream/video/android/model/User;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lio/getstream/video/android/core/logging/LoggingLevel;Lio/getstream/video/android/core/notifications/NotificationConfig;Lkotlin/jvm/functions/Function1;JZLjava/lang/String;ZLjava/lang/String;Lio/getstream/video/android/core/sounds/Sounds;ZLio/getstream/video/android/core/permission/android/StreamPermissionCheck;IILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun build ()Lio/getstream/video/android/core/StreamVideo; - public final fun getScope ()Lkotlinx/coroutines/CoroutineScope; } public abstract interface class io/getstream/video/android/core/StreamVideoConfig { @@ -4295,7 +4295,7 @@ public final class io/getstream/video/android/core/socket/ErrorResponse$Companio } public class io/getstream/video/android/core/socket/PersistentSocket : okhttp3/WebSocketListener { - public field connected Lkotlinx/coroutines/CancellableContinuation; + public field connectContinuation Lkotlinx/coroutines/CancellableContinuation; public fun (Ljava/lang/String;Lokhttp3/OkHttpClient;Lio/getstream/video/android/core/internal/network/NetworkStateProvider;Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function1;)V public synthetic fun (Ljava/lang/String;Lokhttp3/OkHttpClient;Lio/getstream/video/android/core/internal/network/NetworkStateProvider;Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V protected final fun ackHealthMonitor ()V @@ -4304,7 +4304,7 @@ public class io/getstream/video/android/core/socket/PersistentSocket : okhttp3/W public fun connect (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun connect$default (Lio/getstream/video/android/core/socket/PersistentSocket;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun disconnect (Lio/getstream/video/android/core/socket/PersistentSocket$DisconnectReason;)V - public final fun getConnected ()Lkotlinx/coroutines/CancellableContinuation; + public final fun getConnectContinuation ()Lkotlinx/coroutines/CancellableContinuation; public final fun getConnectionId ()Lkotlinx/coroutines/flow/StateFlow; public final fun getConnectionState ()Lkotlinx/coroutines/flow/StateFlow; protected final fun getDestroyed ()Z @@ -4319,7 +4319,7 @@ public class io/getstream/video/android/core/socket/PersistentSocket : okhttp3/W public fun onOpen (Lokhttp3/WebSocket;Lokhttp3/Response;)V public final fun reconnect (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun reconnect$default (Lio/getstream/video/android/core/socket/PersistentSocket;JLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; - public final fun setConnected (Lkotlinx/coroutines/CancellableContinuation;)V + public final fun setConnectContinuation (Lkotlinx/coroutines/CancellableContinuation;)V protected final fun setConnectedStateAndContinue (Lorg/openapitools/client/models/VideoEvent;)V protected final fun setDestroyed (Z)V public final fun setReconnectTimeout (J)V diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ClientState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ClientState.kt index 0fe0daf8eb..d76d8ac040 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ClientState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ClientState.kt @@ -28,6 +28,7 @@ import org.openapitools.client.models.CallCreatedEvent import org.openapitools.client.models.CallRingEvent import org.openapitools.client.models.ConnectedEvent import org.openapitools.client.models.VideoEvent +import java.net.ConnectException @Stable public sealed interface ConnectionState { @@ -36,7 +37,7 @@ public sealed interface ConnectionState { public data object Connected : ConnectionState public data object Reconnecting : ConnectionState public data object Disconnected : ConnectionState - public class Failed(error: Error) : ConnectionState + public class Failed(val error: Error) : ConnectionState } @Stable @@ -57,11 +58,12 @@ class ClientState(client: StreamVideo) { private val _user: MutableStateFlow = MutableStateFlow(client.user) public val user: StateFlow = _user - /** - * connectionState shows if we've established a connection with the coordinator - */ private val _connection: MutableStateFlow = MutableStateFlow(ConnectionState.PreConnect) + + /** + * Shows the Coordinator connection state + */ public val connection: StateFlow = _connection /** @@ -102,6 +104,12 @@ class ClientState(client: StreamVideo) { } } + internal fun handleError(error: Throwable) { + if (error is ConnectException) { + _connection.value = ConnectionState.Failed(error = Error(error)) + } + } + fun setActiveCall(call: Call) { removeRingingCall() maybeStartForegroundService(call, CallService.TRIGGER_ONGOING_CALL) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoBuilder.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoBuilder.kt index 73f9b3160c..d93bd1a5a1 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoBuilder.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoBuilder.kt @@ -38,6 +38,8 @@ import io.getstream.video.android.model.UserToken import io.getstream.video.android.model.UserType import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch +import java.lang.RuntimeException +import java.net.ConnectException /** * The [StreamVideoBuilder] is used to create a new instance of the [StreamVideo] client. This is the @@ -50,28 +52,33 @@ import kotlinx.coroutines.launch * geo = GEO.GlobalEdgeNetwork, * user = user, * token = token, - * loggingLevel = LoggingLevel.BODY - * ) + * loggingLevel = LoggingLevel.BODY, + * // ... + * ).build() *``` * * @property context Android [Context] to be used for initializing Android resources. - * @property apiKey Your Stream API Key, you can find it in the dashboard. - * @property geo Your GEO routing policy, supports geofencing for privacy concerns. - * @property user The user object, can be a regular user, guest user or anonymous. - * @property token The token for this user generated using your API secret on your server. - * @property tokenProvider If a token is expired, the token provider makes a request to your backend for a new token. + * @property apiKey Your Stream API Key. You can find it in the dashboard. + * @property geo Your GEO routing policy. Supports geofencing for privacy concerns. + * @property user The user object. Can be an authenticated user, guest user or anonymous. + * @property token The token for this user, generated using your API secret on your server. + * @property tokenProvider Used to make a request to your backend for a new token when the token has expired. * @property loggingLevel Represents and wraps the HTTP logging level for our API service. * @property notificationConfig The configurations for handling push notification. * @property ringNotification Overwrite the default notification logic for incoming calls. * @property connectionTimeoutInMs Connection timeout in seconds. - * @property ensureSingleInstance Verify that only 1 version of the video client exists, prevents integration mistakes. + * @property ensureSingleInstance Verify that only 1 version of the video client exists. Prevents integration mistakes. * @property videoDomain URL overwrite to allow for testing against a local instance of video. * @property runForegroundServiceForCalls If set to true, when there is an active call the SDK will run a foreground service to keep the process alive. (default: true) * @property localSfuAddress Local SFU address (IP:port) to be used for testing. Leave null if not needed. * @property sounds Overwrite the default SDK sounds. See [Sounds]. - * @property crashOnMissingPermission if [permissionCheck] returns false there will be an exception. - * @property permissionCheck used to check for system permission based on call capabilities. See [StreamPermissionCheck]. - * @property audioUsage used to signal to the system how to treat the audio tracks (voip or media). + * @property crashOnMissingPermission If [permissionCheck] returns false there will be an exception. + * @property permissionCheck Used to check for system permission based on call capabilities. See [StreamPermissionCheck]. + * @property audioUsage Used to signal to the system how to treat the audio tracks (voip or media). + * + * @see build + * @see ClientState.connection + * */ public class StreamVideoBuilder @JvmOverloads constructor( context: Context, @@ -94,21 +101,31 @@ public class StreamVideoBuilder @JvmOverloads constructor( private val audioUsage: Int = defaultAudioUsage, ) { private val context: Context = context.applicationContext - - val scope = CoroutineScope(DispatcherProvider.IO) - + private val scope = CoroutineScope(DispatcherProvider.IO) + + /** + * Builds the [StreamVideo] client. + * + * @return The [StreamVideo] client. + * + * @throws RuntimeException If an instance of the client already exists and [ensureSingleInstance] is set to true. + * @throws IllegalArgumentException If [apiKey] is blank. + * @throws IllegalArgumentException If [user] type is [UserType.Authenticated] and the [user] id is blank. + * @throws IllegalArgumentException If [user] type is [UserType.Authenticated] and both [token] and [tokenProvider] are empty. + * @throws ConnectException If the WebSocket connection fails. + */ public fun build(): StreamVideo { val lifecycle = ProcessLifecycleOwner.get().lifecycle val existingInstance = StreamVideo.instanceOrNull() if (existingInstance != null && ensureSingleInstance) { - throw IllegalArgumentException( + throw RuntimeException( "Creating 2 instance of the video client will cause bugs with call.state. Before creating a new client, please remove the old one. You can remove the old client using StreamVideo.removeClient()", ) } if (apiKey.isBlank()) { - throw IllegalArgumentException("The API key can not be empty") + throw IllegalArgumentException("The API key cannot be blank") } if (token.isBlank() && tokenProvider == null && user.type == UserType.Authenticated) { @@ -117,9 +134,9 @@ public class StreamVideoBuilder @JvmOverloads constructor( ) } - if (user.type == UserType.Authenticated && user.id.isEmpty()) { + if (user.type == UserType.Authenticated && user.id.isBlank()) { throw IllegalArgumentException( - "Please specify the user id for authenticated users", + "The user ID cannot be empty for authenticated users", ) } @@ -127,11 +144,11 @@ public class StreamVideoBuilder @JvmOverloads constructor( user = user.copy(role = "user") } - /** initialize Stream internal loggers. */ + // Initialize Stream internal loggers StreamLog.install(AndroidStreamLogger()) StreamLog.setValidator { priority, _ -> priority.level >= loggingLevel.priority.level } - /** android JSR-310 backport backport. */ + // Android JSR-310 backport backport AndroidThreeTen.init(context) // This connection module class exposes the connections to the various retrofit APIs. @@ -148,7 +165,7 @@ public class StreamVideoBuilder @JvmOverloads constructor( val deviceTokenStorage = DeviceTokenStorage(context) - // install the StreamNotificationManager to configure push notifications. + // Install the StreamNotificationManager to configure push notifications. val streamNotificationManager = StreamNotificationManager.install( context = context, scope = scope, @@ -157,7 +174,7 @@ public class StreamVideoBuilder @JvmOverloads constructor( deviceTokenStorage = deviceTokenStorage, ) - // create the client + // Create the client val client = StreamVideoImpl( context = context, _scope = scope, @@ -183,25 +200,30 @@ public class StreamVideoBuilder @JvmOverloads constructor( connectionModule.updateAuthType("anonymous") } - // establish a ws connection with the coordinator (we don't support this for anonymous users) + // Establish a WS connection with the coordinator (we don't support this for anonymous users) if (user.type != UserType.Anonymous) { scope.launch { - val result = client.connectAsync().await() - result.onSuccess { - streamLog { "connection succeed! (duration: ${result.getOrNull()})" } - }.onError { - streamLog { it.message } + try { + val result = client.connectAsync().await() + result.onSuccess { + streamLog { "Connection succeeded! (duration: ${result.getOrNull()})" } + }.onError { + streamLog { it.message } + } + } catch (e: Exception) { + // If the connect continuation was resumed with an exception, we catch it here. + streamLog { e.message.orEmpty() } } } } - // see which location is best to connect to + // See which location is best to connect to scope.launch { val location = client.loadLocationAsync().await() streamLog { "location initialized: ${location.getOrNull()}" } } - // installs Stream Video instance + // Installs Stream Video instance StreamVideo.install(client) // Needs to be started after the client is initialised because the VideoPushDelegate diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoImpl.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoImpl.kt index 817595ab71..71f0a15af1 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoImpl.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoImpl.kt @@ -121,6 +121,7 @@ import org.openapitools.client.models.UserRequest import org.openapitools.client.models.VideoEvent import org.openapitools.client.models.WSCallEvent import retrofit2.HttpException +import java.net.ConnectException import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.resumeWithException @@ -356,8 +357,12 @@ internal class StreamVideoImpl internal constructor( } scope.launch { connectionModule.coordinatorSocket.errors.collect { throwable -> - (throwable as? ErrorResponse)?.let { - if (it.code == VideoErrorCode.TOKEN_EXPIRED.code) refreshToken(it) + if (throwable is ConnectException) { + state.handleError(throwable) + } else { + (throwable as? ErrorResponse)?.let { + if (it.code == VideoErrorCode.TOKEN_EXPIRED.code) refreshToken(it) + } } } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/PersistentSocket.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/PersistentSocket.kt index 064a1b27b0..e874d9d51e 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/PersistentSocket.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/PersistentSocket.kt @@ -39,6 +39,7 @@ import org.openapitools.client.models.VideoEvent import stream.video.sfu.event.HealthCheckRequest import java.io.IOException import java.io.InterruptedIOException +import java.net.ConnectException import java.net.SocketTimeoutException import java.net.UnknownHostException import java.util.concurrent.Executors @@ -90,12 +91,12 @@ public open class PersistentSocket( val connectionId: StateFlow = _connectionId /** Continuation if the socket successfully connected and we've authenticated */ - lateinit var connected: CancellableContinuation + lateinit var connectContinuation: CancellableContinuation - internal var socket: WebSocket? = null + // Controls when we can resume the continuation + private var connectContinuationCompleted: Boolean = false - // prevent us from resuming the continuation twice - private var continuationCompleted: Boolean = false + internal var socket: WebSocket? = null // True if cleanup was called and socket is completely destroyed (intentionally). // You need to create a new instance (this is mainly used for the SfuSocket which is tied @@ -105,7 +106,11 @@ public open class PersistentSocket( internal var reconnectionAttempts = 0 /** - * Connect the socket, authenticate, start the healthmonitor and see if the network is online + * Connect the socket, authenticate, start the health monitor and see if the network is online + * @param invocation Provides a way to extend the [connect] method with additional behavior. + * This can be useful in cases where additional setup or checks need to be performed + * once the socket is connected but before the [connect] method returns. + * To return from [connect] and to resume the enclosing coroutine, use the provided [CancellableContinuation] parameter. */ open suspend fun connect( invocation: (CancellableContinuation) -> Unit = {}, @@ -115,9 +120,9 @@ public open class PersistentSocket( return null } - return suspendCancellableCoroutine { connectedContinuation -> + return suspendCancellableCoroutine { continuation -> logger.i { "[connect]" } - connected = connectedContinuation + connectContinuation = continuation _connectionState.value = SocketState.Connecting @@ -138,7 +143,7 @@ public open class PersistentSocket( networkStateProvider.subscribe(networkStateListener) // run the invocation - invocation.invoke(connectedContinuation) + invocation.invoke(continuation) } } } @@ -171,7 +176,7 @@ public open class PersistentSocket( is DisconnectReason.PermanentError -> SocketState.DisconnectedPermanently(disconnectReason.error) } - continuationCompleted = false + connectContinuationCompleted = false disconnectSocket() healthMonitor.stop() @@ -215,7 +220,16 @@ public open class PersistentSocket( reconnectionAttempts++ - connect() + tryConnect() + } + + private suspend fun tryConnect() { + try { + connectContinuationCompleted = false + connect() + } catch (e: Exception) { + logger.e { "[reconnect] failed to reconnect: $e" } + } } open fun authenticate() { } @@ -270,17 +284,41 @@ public open class PersistentSocket( protected fun setConnectedStateAndContinue(message: VideoEvent) { _connectionState.value = SocketState.Connected(message) - if (!continuationCompleted) { - continuationCompleted = true - connected.resume(message as T) + if (!connectContinuationCompleted) { + connectContinuationCompleted = true + connectContinuation.resume(message as T) } } - private fun setPermanentFailureAndContinue(error: Throwable) { - _connectionState.value = SocketState.DisconnectedPermanently(error) - if (!continuationCompleted) { - continuationCompleted = true - connected.resumeWithException(error) + internal fun handleError(error: Throwable) { + // onFailure, onClosed and the 2 onMessage can all generate errors + // temporary errors should be logged and retried + // permanent errors should be emitted so the app can decide how to handle it + if (destroyed) { + logger.d { "[handleError] Ignoring socket error - already closed $error" } + return + } + + val permanentError = isPermanentError(error) + if (permanentError) { + logger.e { "[handleError] Permanent error: $error" } + + _connectionState.value = SocketState.DisconnectedPermanently(error) + + // If the connect continuation is not completed, it means the error happened during the connection phase. + connectContinuationCompleted.not().let { isConnectionPhaseError -> + if (isConnectionPhaseError) { + emitError(error, isConnectionPhaseError = true) + resumeConnectionPhaseWithException(error) + } else { + emitError(error, isConnectionPhaseError = false) + } + } + } else { + logger.w { "[handleError] Temporary error: $error" } + + _connectionState.value = SocketState.DisconnectedTemporarily(error) + scope.launch { reconnect(reconnectTimeout) } } } @@ -306,31 +344,25 @@ public open class PersistentSocket( return isPermanent } - internal fun handleError(error: Throwable) { - // onFailure, onClosed and the 2 onMessage can all generate errors - // temporary errors should be logged and retried - // permanent errors should be emitted so the app can decide how to handle it - if (destroyed) { - logger.d { "[handleError] Ignoring socket error - already closed $error" } - return - } - - val permanentError = isPermanentError(error) - if (permanentError) { - // close the connection loop - setPermanentFailureAndContinue(error) - logger.e { "[handleError] permanent error: $error" } - // mark us permanently disconnected - scope.launch { + private fun emitError(error: Throwable, isConnectionPhaseError: Boolean) { + scope.launch { + if (isConnectionPhaseError) { + errors.emit( + ConnectException( + "Failed to establish WebSocket connection. Will try to reconnect. Cause: ${error.message}", + ), + ) + } else { errors.emit(error) } - } else { - logger.w { "[handleError] temporary error: $error" } - _connectionState.value = SocketState.DisconnectedTemporarily(error) - scope.launch { reconnect(reconnectTimeout) } } } + private fun resumeConnectionPhaseWithException(error: Throwable) { + connectContinuationCompleted = true + connectContinuation.resumeWithException(error) + } + /** * Invoked when the remote peer has indicated that no more incoming messages will be transmitted. */ diff --git a/stream-video-android-core/src/main/kotlin/org/openapitools/client/models/UserResponse.kt b/stream-video-android-core/src/main/kotlin/org/openapitools/client/models/UserResponse.kt index 315887a767..8ce9cf47e2 100644 --- a/stream-video-android-core/src/main/kotlin/org/openapitools/client/models/UserResponse.kt +++ b/stream-video-android-core/src/main/kotlin/org/openapitools/client/models/UserResponse.kt @@ -59,7 +59,7 @@ import org.openapitools.client.infrastructure.Serializer data class UserResponse ( @Json(name = "banned") - val banned: kotlin.Boolean, + val banned: kotlin.Boolean = false, /* Date/time of creation */ @Json(name = "created_at") diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ClientAndAuthTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ClientAndAuthTest.kt index f91b09df0c..c9c043a715 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ClientAndAuthTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ClientAndAuthTest.kt @@ -208,7 +208,7 @@ class ClientAndAuthTest : TestBase() { // } } - @Test(expected = IllegalArgumentException::class) + @Test(expected = RuntimeException::class) fun `two clients is not allowed`() = runTest { val builder = StreamVideoBuilder( context = context, diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/SocketTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/SocketTest.kt index 678d3cd308..1440dbec9b 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/SocketTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/SocketTest.kt @@ -197,7 +197,7 @@ class CoordinatorSocketTest : SocketTestBase() { ) socket.connect() - socket.connected.cancel() + socket.connectContinuation.cancel() // create a VideoEvent type that resembles a real one, but doesn't contain the necessary fields val testJson = "{\"type\":\"health.check\"}"