From a8b20f02ecd0aba95b9b5da8bf6de143aad6494d Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Tue, 22 Aug 2023 11:10:35 +0200 Subject: [PATCH] fix(java): fix cancellation of workmanager workers --- config/android-uploader.yaml | 2 + .../workers/AbstractUploadWorker.mustache | 7 ++- .../workers/ProgressiveUploadWorker.mustache | 38 +++++++++++---- .../work/workers/UploadWorker.mustache | 46 +++++++++++++------ 4 files changed, 69 insertions(+), 24 deletions(-) diff --git a/config/android-uploader.yaml b/config/android-uploader.yaml index 9834c079..7526c847 100644 --- a/config/android-uploader.yaml +++ b/config/android-uploader.yaml @@ -1,4 +1,6 @@ changelog: + - 1.3.1 (2023-08-22): + - Fix cancellation of upload workers for the WorkManager API - 1.3.0 (2023-08-21): - Improve cancel of upload workers for the WorkManager API - 1.2.4 (2023-08-10): diff --git a/templates/java/android/work/workers/AbstractUploadWorker.mustache b/templates/java/android/work/workers/AbstractUploadWorker.mustache index e415c02d..7670fe8d 100644 --- a/templates/java/android/work/workers/AbstractUploadWorker.mustache +++ b/templates/java/android/work/workers/AbstractUploadWorker.mustache @@ -14,6 +14,7 @@ import {{invokerPackage}}.work.stores.NotificationConfigurationStore import {{invokerPackage}}.work.stores.NotificationConfigurationStore.channelId import {{invokerPackage}}.work.stores.NotificationConfigurationStore.notificationColorResourceId import {{invokerPackage}}.work.stores.NotificationConfigurationStore.notificationIconResourceId +import java.util.concurrent.Executors /** * Worker that uploads a video to api.video. @@ -146,7 +147,9 @@ abstract class AbstractUploadWorker( const val ERROR_KEY = "error" const val FILE_PATH_KEY = "filePath" - @OptIn(ExperimentalCoroutinesApi::class) - internal val limitedCoroutineContext = Dispatchers.IO.limitedParallelism(1) + /** + * A single thread executor to be used for upload to avoid parallel uploads. + */ + internal val uploaderExecutor = Executors.newSingleThreadExecutor() } } \ No newline at end of file diff --git a/templates/java/android/work/workers/ProgressiveUploadWorker.mustache b/templates/java/android/work/workers/ProgressiveUploadWorker.mustache index c5cdfee4..c728f471 100644 --- a/templates/java/android/work/workers/ProgressiveUploadWorker.mustache +++ b/templates/java/android/work/workers/ProgressiveUploadWorker.mustache @@ -6,12 +6,15 @@ import androidx.work.Data import androidx.work.WorkInfo import androidx.work.WorkerParameters import androidx.work.workDataOf -import kotlinx.coroutines.withContext +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.suspendCancellableCoroutine import {{invokerPackage}}.JSON import {{invokerPackage}}.upload.UploadPartProgressListener import {{invokerPackage}}.work.stores.ProgressiveUploadSessionStore import java.io.File import java.io.IOException +import kotlin.coroutines.cancellation.CancellationException +import kotlin.coroutines.resumeWithException /** @@ -40,6 +43,7 @@ class ProgressiveUploadWorker( ) : AbstractUploadWorker(context, workerParams), UploadPartProgressListener { + @OptIn(ExperimentalCoroutinesApi::class) override suspend fun doWork(): Result { createNotificationChannel() setForeground(createForegroundInfo(onUploadStarted())) @@ -60,14 +64,27 @@ class ProgressiveUploadWorker( val file = File(filePath) + /** + * Use an executor to make the coroutine cancellable. + */ return try { - val video = withContext(limitedCoroutineContext) { - if (partId != DEFAULT_PART_ID) { - ProgressiveUploadSessionStore.get(sessionIndex)!! - .uploadPart(file, partId, isLastPart, this@ProgressiveUploadWorker) - } else { - ProgressiveUploadSessionStore.get(sessionIndex)!! - .uploadPart(file, isLastPart, this@ProgressiveUploadWorker) + val video = suspendCancellableCoroutine { continuation -> + val future = uploaderExecutor.submit { + try { + val video = if (partId != DEFAULT_PART_ID) { + ProgressiveUploadSessionStore.get(sessionIndex)!! + .uploadPart(file, partId, isLastPart, this@ProgressiveUploadWorker) + } else { + ProgressiveUploadSessionStore.get(sessionIndex)!! + .uploadPart(file, isLastPart, this@ProgressiveUploadWorker) + } + continuation.resume(video, null) + } catch (e: Exception) { + continuation.resumeWithException(e) + } + } + continuation.invokeOnCancellation { + future.cancel(true) } } Result.success( @@ -76,8 +93,11 @@ class ProgressiveUploadWorker( VIDEO_KEY to JSON().serialize(video) ) ) + } catch (e: CancellationException) { + Log.i(TAG, "Upload part $partId cancelled") + Result.failure(workDataOf(ERROR_KEY to e.message)) } catch (e: Exception) { - Log.e(TAG, "Upload failed", e) + Log.e(TAG, "Upload part $partId failed", e) createErrorNotification(e) Result.failure(workDataOf(ERROR_KEY to e.message)) } diff --git a/templates/java/android/work/workers/UploadWorker.mustache b/templates/java/android/work/workers/UploadWorker.mustache index b1884cc3..19bd5ff7 100644 --- a/templates/java/android/work/workers/UploadWorker.mustache +++ b/templates/java/android/work/workers/UploadWorker.mustache @@ -6,7 +6,8 @@ import androidx.work.Data import androidx.work.WorkInfo import androidx.work.WorkerParameters import androidx.work.workDataOf -import kotlinx.coroutines.withContext +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.suspendCancellableCoroutine import {{invokerPackage}}.JSON import {{invokerPackage}}.upload.UploadProgressListener import {{invokerPackage}}.work.stores.VideosApiStore @@ -14,6 +15,8 @@ import {{invokerPackage}}.work.workers.UploadWorker.Companion.TOKEN_KEY import {{invokerPackage}}.work.workers.UploadWorker.Companion.VIDEO_ID_KEY import java.io.File import java.io.IOException +import kotlin.coroutines.cancellation.CancellationException +import kotlin.coroutines.resumeWithException /** * Worker that uploads a video to api.video. @@ -41,6 +44,7 @@ class UploadWorker( ) : AbstractUploadWorker(context, workerParams), UploadProgressListener { + @OptIn(ExperimentalCoroutinesApi::class) override suspend fun doWork(): Result { createNotificationChannel() setForeground(createForegroundInfo(onUploadStarted())) @@ -62,19 +66,32 @@ class UploadWorker( val file = File(filePath) + /** + * Use an executor to make the coroutine cancellable. + */ return try { - val video = withContext(limitedCoroutineContext) { - if (token == null) { - videosApi.upload( - videoId, file, this@UploadWorker - ) - } else { - videosApi.uploadWithUploadToken( - token, - file, - videoId, - this@UploadWorker - ) + val video = suspendCancellableCoroutine { continuation -> + val future = uploaderExecutor.submit { + try { + val video = if (token == null) { + videosApi.upload( + videoId, file, this@UploadWorker + ) + } else { + videosApi.uploadWithUploadToken( + token, + file, + videoId, + this@UploadWorker + ) + } + continuation.resume(video, null) + } catch (e: Exception) { + continuation.resumeWithException(e) + } + } + continuation.invokeOnCancellation { + future.cancel(true) } } Result.success( @@ -83,6 +100,9 @@ class UploadWorker( VIDEO_KEY to JSON().serialize(video) ) ) + } catch (e: CancellationException) { + Log.i(TAG, "Upload cancelled") + Result.failure(workDataOf(ERROR_KEY to e.message)) } catch (e: Exception) { Log.e(TAG, "Upload failed", e) createErrorNotification(e)