Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream SDK 6.0.3 sending a new message with image or file attachments is not storing the message or its attachments #4973

Closed
SchramboVida opened this issue Sep 28, 2023 · 16 comments · Fixed by #5142
Labels
bug Something isn't working

Comments

@SchramboVida
Copy link

Describe the bug
Using Stream SDK io.getstream:stream-chat-android-compose:6.0.3 (we're upgrading from 5.17.10) we have the ChatClient configured to use a custom CDN uploader via .fileUploader(ChatFileUploader()).

Our implemention of ChatFileUploader() which is a subclass of io.getstream.chat.android.client.uploader.FileUploader provides the required sendImage(...) and sendFile(...) implementations and returns a gs:// schemed url that identifies the asset for subsequent access, for example gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime

We have custom image attachment composers that use the gs:// urls to obtain an authorization token that's needed to pull the actual image asset out of our CDN for viewing. All of this has been working perfectly up to SDK 5.17.10 but is not working at all in SDK 6.0.3 and we're a bit stumped why.

SDK version

  • 6.0.3

To Reproduce
Steps to reproduce the behavior:

  1. Configure the ChatClient with a custom uploader that returns a url-like string for the asset, e.g. io.getstream.result.Result.Success(UploadedImage(file = "gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime"))
  2. Create a new message with an image and/or file attachment and send it as usual:
val message = composerViewModel.buildNewMessage(message = text, attachments = attachments)
composerViewModel.sendMessage(message = message)
  1. Notice the new message and its attachments appear in the message list with uploading progress updates as expected.
  2. When uploading is finished, the message and its attachments are visible in the channel message list.
  3. Now try explicitly loading the message by its message id, or quit and re-launch the app to reload the message list.
chatManager.chatClient.getMessage(messageId).enqueue {
    messageResult = it // always io.getstream.result.Result.Failure
}
  1. The message and its uploaded attachment(s) are no longer present, do not exist.

Expected behavior
The new message with freshly uploaded attachment should persist in the channel message list. The message should remain loadable via chatClient.getMessage(messageId) and not return HTTP 400 not-found errors "Message with id *** doesn't exist".

One thing that's a bit odd is the error string — when I search the SDK 6.0.3 sources for the error string, the closest I can find is in io.getstream.chat.android.client.attachment.worker.UploadAttachmentsWorker.uploadAttachmentsForMessage(String) but that does not have the "doesn't" contraction visible in the error screen below, so I'm not sure this is actually the source of the error string.

Device:

  • Android Device, Pixel Fold, API 33
  • Android Emulator, Pixel 5, API 33
  • Android Emulator, Pixel 3a, API 26

Screenshots

New message with attachment ready to send New message with attachment sent successfully (or so it appears…)
Screenshot_20230928_124331 Screenshot_20230928_124347
Attempt to load the new message by its message id results in HTTP 400 not-found Quit the app and re-launch confirms the message and its attachment were not stored
Screenshot_20230928_124356 Screenshot_20230928_124530

Debug log output (with comments)

2023-09-28 10:53:00.404 15710-15710 Chat:MessageComposerController       I  (main:2) [sendMessage] message.attachments.size: 1
2023-09-28 10:53:00.404 15710-15710 Chat:MessageComposerController       I  [sendMessage] message.attachments.size: 1
2023-09-28 10:53:00.405 15710-15710 Chat:MessageComposerController       I  (main:2) [clearData]
2023-09-28 10:53:00.405 15710-15710 Chat:MessageComposerController       I  [clearData]
2023-09-28 10:53:00.408 15710-16435 Chat:AttachmentsSender               D  (DefaultDispatcher-worker-34:517) [sendAttachments] Message 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d has 1 pending attachments
2023-09-28 10:53:00.408 15710-16435 Chat:AttachmentsSender               D  [sendAttachments] Message 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d has 1 pending attachments
2023-09-28 10:53:00.410 15710-16398 Chat:QueryChannelsState              D  (DefaultDispatcher-worker-10:488) Sorting channels: 7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f, ec87d114-bdda-495c-942f-96c7930c48b0, 4e7da3d5-545c-4c65-b8e8-d6560079ffeb, 9e9aec3d-bd65-47be-b2f3-cbb0241db65f, ec87d114-bdda-495c-942f-96c7930c48b0, aac97191-502b-4176-b677-e42a4aa5c126
2023-09-28 10:53:00.410 15710-16398 Chat:QueryChannelsState              D  Sorting channels: 7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f, ec87d114-bdda-495c-942f-96c7930c48b0, 4e7da3d5-545c-4c65-b8e8-d6560079ffeb, 9e9aec3d-bd65-47be-b2f3-cbb0241db65f, ec87d114-bdda-495c-942f-96c7930c48b0, aac97191-502b-4176-b677-e42a4aa5c126
2023-09-28 10:53:00.410 15710-16398 Chat:QueryChannelsState              D  (DefaultDispatcher-worker-10:488) Sorting result: 7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f, ec87d114-bdda-495c-942f-96c7930c48b0, 4e7da3d5-545c-4c65-b8e8-d6560079ffeb, 9e9aec3d-bd65-47be-b2f3-cbb0241db65f, ec87d114-bdda-495c-942f-96c7930c48b0, aac97191-502b-4176-b677-e42a4aa5c126
2023-09-28 10:53:00.412 15710-16398 Chat:QueryChannelsState              D  Sorting result: 7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f, ec87d114-bdda-495c-942f-96c7930c48b0, 4e7da3d5-545c-4c65-b8e8-d6560079ffeb, 9e9aec3d-bd65-47be-b2f3-cbb0241db65f, ec87d114-bdda-495c-942f-96c7930c48b0, aac97191-502b-4176-b677-e42a4aa5c126
2023-09-28 10:53:00.416 15710-16435 Chat:SystemUploadWorker              D  (DefaultDispatcher-worker-34:517) [start] #uploader; Enqueueing attachments upload work for 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d

2023-09-28 10:53:00.416 15710-16435 Chat:SystemUploadWorker              D  [start] #uploader; Enqueueing attachments upload work for 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d

***** THIS SEEMS LIKE A ROOT CAUSE, THE ERROR DOES NOT PROPAGATE, OR ELSE THIS IS A RED HERRING

2023-09-28 10:53:00.419 15710-16435 Chat:AttachmentsSender               I  (DefaultDispatcher-worker-34:517) [waitForAttachmentsToBeSent] Could not upload attachments for message 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d
2023-09-28 10:53:00.419 15710-16435 Chat:AttachmentsSender               I  [waitForAttachmentsToBeSent] Could not upload attachments for message 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d


2023-09-28 10:53:00.447 15710-16435 Chat:Client                          D  (DefaultDispatcher-worker-34:517) [markRead] #doOnStart; cid: care-team:7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f
2023-09-28 10:53:00.447 15710-16435 Chat:Client                          D  [markRead] #doOnStart; cid: care-team:7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f
2023-09-28 10:53:00.450 15710-16441 Chat:Http                            I  (OkHttp https://chat.stream-io-api.com/...:523) --> POST https://chat.stream-io-api.com/channels/care-team/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/read?api_key=kpug7q2fd5sq (17-byte body)

***** LOTS OF OUTPUT CROPPED FOR BREVITY

2023-09-28 10:53:00.600 15710-16418 Chat:UploadWorker                    D  (DefaultDispatcher-worker-30:510) [uploadAttachments] #uploader; uploading attachment upload_id_cfdb5889-9aed-4c82-aebb-7c4680e7e947 for message 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d
2023-09-28 10:53:00.600 15710-16418 Chat:UploadWorker                    D  [uploadAttachments] #uploader; uploading attachment upload_id_cfdb5889-9aed-4c82-aebb-7c4680e7e947 for message 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d


2023-09-28 10:53:00.608 15710-16418 Chat:Uploader                        D  (DefaultDispatcher-worker-30:510) [uploadAttachment] #uploader; uploading upload_id_cfdb5889-9aed-4c82-aebb-7c4680e7e947 as file
2023-09-28 10:53:00.608 15710-16418 Chat:Uploader                        D  [uploadAttachment] #uploader; uploading upload_id_cfdb5889-9aed-4c82-aebb-7c4680e7e947 as file

2023-09-28 10:53:00.608 15710-16418 Chat:Uploader                        D  (DefaultDispatcher-worker-30:510) [uploadFile] #uploader; mimeType: video/quicktime, attachmentType: video, file: /data/user/0/com.vida.healthcoach.debug/cache/STREAM_105249348/Screen_Recording_2023-09-28_at_10.51.54_AM.mov, cid: care-team:$7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f, attachment: Attachment(mimeType="video/quicktime", fileSize=937918, type="video", name=Screen Recording 2023-09-28 at 10.51.54 AM.mov, upload="/data/user/0/com.vida.healthcoach.debug/cache/STREAM_105249348/Screen_Recording_2023-09-28_at_10.51.54_AM.mov", uploadState=Idle, extraData={uploadId=upload_id_cfdb5889-9aed-4c82-aebb-7c4680e7e947})
2023-09-28 10:53:00.608 15710-16418 Chat:Uploader                        D  [uploadFile] #uploader; mimeType: video/quicktime, attachmentType: video, file: /data/user/0/com.vida.healthcoach.debug/cache/STREAM_105249348/Screen_Recording_2023-09-28_at_10.51.54_AM.mov, cid: care-team:$7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f, attachment: Attachment(mimeType="video/quicktime", fileSize=937918, type="video", name=Screen Recording 2023-09-28 at 10.51.54 AM.mov, upload="/data/user/0/com.vida.healthcoach.debug/cache/STREAM_105249348/Screen_Recording_2023-09-28_at_10.51.54_AM.mov", uploadState=Idle, extraData={uploadId=upload_id_cfdb5889-9aed-4c82-aebb-7c4680e7e947})

2023-09-28 10:53:01.799 15710-15710 BAR_PostChatAttachment               V  POST/api/v1/attachment/upload = Success: ChatAttachmentResponse(id=gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime)

2023-09-28 10:53:01.805 15710-16422 Chat:Uploader                        V  (DefaultDispatcher-worker-24:504) [uploadFile] #uploader; result: Success(value=UploadedFile(file=gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime, thumbUrl=null))
2023-09-28 10:53:01.805 15710-16422 Chat:Uploader                        D  (DefaultDispatcher-worker-24:504) [onSuccessfulUpload] #uploader; attachment upload_id_cfdb5889-9aed-4c82-aebb-7c4680e7e947 uploaded successfully
2023-09-28 10:53:01.805 15710-16422 Chat:Uploader                        D  [onSuccessfulUpload] #uploader; attachment upload_id_cfdb5889-9aed-4c82-aebb-7c4680e7e947 uploaded successfully

2023-09-28 10:53:01.805 15710-16422 Chat:UploadWorker                    I  (DefaultDispatcher-worker-24:504) [Progress.onSuccess] #uploader; url: gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime
2023-09-28 10:53:01.805 15710-16422 Chat:UploadWorker                    I  [Progress.onSuccess] #uploader; url: gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime

2023-09-28 10:53:01.806 15710-16422 Chat:UploadWorker                    D  (DefaultDispatcher-worker-24:504) [sendAttachments] #uploader; all attachments for message 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d uploaded
2023-09-28 10:53:01.806 15710-16422 Chat:UploadWorker                    D  [sendAttachments] #uploader; all attachments for message 0a3cabe4-aa8f-4685-8c73-f2500a586b37-87b9723b-dcb3-4d14-bfac-c2fcddf8d65d uploaded

2023-09-28 10:53:01.806 15710-16422 Chat:SystemUploadWorker              D  (DefaultDispatcher-worker-24:504) [doWork] #uploader; attachments uploaded successfully
2023-09-28 10:53:01.806 15710-16422 Chat:SystemUploadWorker              D  [doWork] #uploader; attachments uploaded successfully

2023-09-28 10:53:02.025 15710-15914 BAR_PostChatAttachmentUrl            V  executeAsync

***** DESPITE THE SUCCESS MESSAGES, THE FILE DID NOT ACTUALLY UPLOAD, NO STREAM MESSAGE WAS CREATED WITH THE ATTACHMENT, RELOADING THE CHANNEL'S MESSAGE LIST REMOVES THE PREVIOUS SEEMINGLY-SUCCESSFULLY-UPLOADED MESSAGE FROM THE LIST
@SchramboVida SchramboVida added the bug Something isn't working label Sep 28, 2023
@DanielNovak
Copy link
Contributor

Hi, sorry for the late reply - we are looking into it.

@JcMinarro
Copy link
Member

Hello @SchramboVida
Sending messages with attachments consists of two steps:
-. Upload to the CDN all the attachments
-. Send the message to Stream server.

On the logs, we can see the attachment is properly uploaded to your CDN, and a new attachment with the URL gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime is created.
But the logs end when the message should start to be sent.
For some reason, it seems that the "sending message" step is falling.

Could you provide us with the following logs after the attachment is uploaded?

If you could replicate this error in our Sample App would be awesome, because it would help us to debug+fix it.

I have been doing some tests with our Sample App and it seems to be working fine.
Here are the FileUploader is provided to our ChatClient on the build process:

val client = ChatClient.Builder(apiKey, context)
            .loggerHandler(FirebaseLogger)
            .notifications(notificationConfig, notificationHandler)
            .logLevel(logLevel)
            .fileUploader(object : FileUploader {
                override fun sendFile(
                    channelType: String,
                    channelId: String,
                    userId: String,
                    file: File,
                    callback: ProgressCallback,
                ): Result<UploadedFile> {
                    return Result.Success(UploadedFile(file = "gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime"))
                }

                override fun sendFile(
                    channelType: String,
                    channelId: String,
                    userId: String,
                    file: File,
                ): Result<UploadedFile> {
                    return Result.Success(UploadedFile(file = "gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime"))
                }

                override fun sendImage(
                    channelType: String,
                    channelId: String,
                    userId: String,
                    file: File,
                    callback: ProgressCallback,
                ): Result<UploadedImage> {
                    return Result.Success(UploadedImage(file = "gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime"))
                }

                override fun sendImage(
                    channelType: String,
                    channelId: String,
                    userId: String,
                    file: File,
                ): Result<UploadedImage> {
                    return Result.Success(UploadedImage(file = "gs://vida-stream-attachments-test/7098a6a7-c47d-4d4c-b24e-9d6f7c8d8e7f/8e118e6f-48e8-4812-b8c0-74bcbf765ff7.quicktime"))
                }

                override fun deleteFile(
                    channelType: String,
                    channelId: String,
                    userId: String,
                    url: String,
                ): Result<Unit> {
                    return Result.Success(Unit)
                }

                override fun deleteImage(
                    channelType: String,
                    channelId: String,
                    userId: String,
                    url: String,
                ): Result<Unit> {
                    return Result.Success(Unit)
                }
            })
            .withPlugins(offlinePlugin, statePluginFactory)
            .uploadAttachmentsNetworkType(UploadAttachmentsNetworkType.NOT_ROAMING)
            .apply {
                if (BuildConfig.DEBUG) {
                    this.debugRequests(true)
                        .clientDebugger(CustomChatClientDebugger())
                }
            }
            .build()

@SchramboVida
Copy link
Author

Hi @JcMinarro,

I've tried again using a stubbed-out uploader implementation as suggested which just returns success from the FileUploader methods — no luck.

The failure occurs before any of the custom CDN uploader code executes, specifically in io.getstream.chat.android.client.attachment.AttachmentsSender waitForAttachmentsToBeSent(Message, String, String, RepositoryFacade) when appending the new Job:

1  all jobs not yet completed Screenshot 2023-10-04 at 2 22 23 PM

Before enqueueAttachmentUpload(...) the job exists and is active (not completed or cancelled). But by the time it gets to jobsMap[newMessage.id]?.join() all jobs have completed and there is nothing left to wait for.

That's strange because the repositoryFacade.observeAttachmentsForMessage(newMessage.id) collector body does not execute before the job completes, therefore the allAttachmentsUploaded flag remains false which debug-logs [waitForAttachmentsToBeSent] Could not upload attachments for message «message id» and returns Result.Failure(Error.GenericError(...)) from waitForAttachmentsToBeSent(...)

2  all jobs completed Screenshot 2023-10-04 at 2 24 03 PM

Interestingly, the failure result does not seem to propagate back to io.getstream.chat.android.client.attachment.AttachmentsSender and io.getstream.chat.android.client.attachment.worker.UploadAttachmentsAndroidWorker says the attachments were uploaded successfully.

Look for the CustomChatClientDebugger lines in the attached debug log output to see the failure in context with other debug output:

CustomChatClientDebugger•onStart(): message id = 
CustomChatClientDebugger•onInterceptionStart(): message id = 
CustomChatClientDebugger•onInterceptionUpdate(): message id = 0bc7a9c0-f0f6-48dc-927b-3b1b6260dceb-f2b947c9-ede2-459b-b626-3ff27c01510a
CustomChatClientDebugger•onInterceptionStop(): result = Failure(value=GenericError(message=Could not upload attachments, not sending message with id 0bc7a9c0-f0f6-48dc-927b-3b1b6260dceb-f2b947c9-ede2-459b-b626-3ff27c01510a))
CustomChatClientDebugger•onStop(): result = Failure(value=GenericError(message=Could not upload attachments, not sending message with id 0bc7a9c0-f0f6-48dc-927b-3b1b6260dceb-f2b947c9-ede2-459b-b626-3ff27c01510a))

upload image log

Thanks again for your help and please let me know if you have any questions!

Best Regards,
Jim

@JcMinarro
Copy link
Member

Hello @SchramboVida

Thanks for your detailed reply. I will need more time to debug it properly and try to find why it is failing in your side.
In the meantime, could you share with us how ChatClient is being initialized on your side (Don't post any API Key you use to initialize it)?

@kanat
Copy link
Collaborator

kanat commented Oct 12, 2023

Hi @SchramboVida,

One thing concerns me in your logs:

CustomChatClientDebugger•onInterceptionStop(): result = Failure(value=GenericError(message=Could not upload attachments, not sending message with id 0bc7a9c0-f0f6-48dc-927b-3b1b6260dceb-f2b947c9-ede2-459b-b626-3ff27c01510a))
CustomChatClientDebugger•onStop(): result = Failure(value=GenericError(message=Could not upload attachments, not sending message with id 0bc7a9c0-f0f6-48dc-927b-3b1b6260dceb-f2b947c9-ede2-459b-b626-3ff27c01510a))

SendMessageDebugger.onStop is called from ChatClient.sendMessage. And SendMessageDebugger.onInterceptionStop is called from ChatClient.sendAttachments:

attachmentsSender
    .sendAttachments(preparedMessage, channelType, channelId, isRetrying, repositoryFacade)
    .also { result ->
        debugger.onInterceptionStop(result)
    }

If SendMessageDebugger receives those errors, why you're not able to receive them as well 🤔

@kanat
Copy link
Collaborator

kanat commented Oct 12, 2023

Based on the logs it seems like this line does't suspend

jobsMap[newMessage.id]?.join()

Which makes me to believe that jobsMap[newMessage.id] is null

@DanielNovak
Copy link
Contributor

@SchramboVida were you able to solve the problem?

@k0goto
Copy link

k0goto commented Dec 4, 2023

Hello. I have encountered a similar trouble and conducted my own investigation and findings, so I would like to report them. (Please note that English is not my native language, so my expressions might not sound completely natural. 🙇 )

There seem to be two main issues causing this behavior.

Issue 1: waitForAttachmentsToBeSent(...) always fails

In AttachmentsSender.waitForAttachmentsToBeSent(...), UploadAttachmentsAndroidWorker enqueued through enqueueAttachmentUpload(...) runs asynchronously in the background, and its completion is not awaited. Therefore, when executing jobsMap[newMessage.id]?.join(), the uploadState of newMessage.attachments should either be Idle or InProgress.

/**
* Idle state before attachment starts to upload.
*/
public object Idle : UploadState() { override fun toString(): String = "Idle" }
/**
* State representing attachment upload progress.
*/
public data class InProgress(val bytesUploaded: Long, val totalBytes: Long) : UploadState()

Additionally, since jobsMap[newMessage.id] does not consider any UploadState other than Success and Failed, allAttachmentsUploaded will always be false, resulting in waitForAttachmentsToBeSent(...) always returning Result.Failure.

attachments.all { it.uploadState == Attachment.UploadState.Success } -> {
messageToBeSent = repositoryFacade.selectMessage(newMessage.id) ?: newMessage.copy(
attachments = attachments.toMutableList(),
)
allAttachmentsUploaded = true
jobsMap[newMessage.id]?.cancel()
}
attachments.any { it.uploadState is Attachment.UploadState.Failed } -> {
jobsMap[newMessage.id]?.cancel()
}
else -> Unit

Issue 2: Messaging failures are not reflected, making it appear as if they were successful

The default implementation of the MessageComposer component does not include a callback for sendMessage(...) call. (At the time of the reported in SDK 6.0.3, callback were not designed to be passed in the first place.)
As a result, the Result.Failure returned by waitForAttachmentsToBeSent(...) is not handled, and the messaging process terminates without proper feedback.

public fun sendMessage(
message: Message,
callback: Call.Callback<Message> = Call.Callback { /* no-op */ },
): Unit = messageComposerController.sendMessage(message, callback)

Furthermore, even after the messaging process is completed, the UploadAttachmentsAndroidWorker continues to run in the background and updates the message cache after upload attachments. So, though I haven't delved into it in detail, I suspect that the cached messages are giving the illusion of successful message sending until the app is re-launch.

val attachments = uploadAttachments(message)
updateMessages(message.copy(attachments = attachments))

// RepositoryFacade::insertMessage is implemented as upsert, therefore we need to delete the message first
messageRepository.deleteChannelMessage(updatedMessage)
messageRepository.insertMessage(updatedMessage)

My Solution

To address this issue, I forked and customized the behavior of waitForAttachmentsToBeSent(...). Specifically, after executing enqueueAttachmentUpload(...), I ensured that the worker's completion is waited for synchronously, allowing allAttachmentsUploaded to be evaluated correctly.

ref: main...10llip0p:stream-chat-android:patch-attachments-sender

However, this solution is merely a temporary workaround and not considered the definitive solution. I hope this issue will be resolved in a more comprehensive manner.

Thank you.

@SchramboVida
Copy link
Author

@JcMinarro @kanat @DanielNovak My apologies for this very late reply, I fell ill shortly after my initial report, then once recovered was busy implementing other app features. Now that those tasks have been completed, I've had some time to continue debugging and have a much better understanding of the cause as well as a solution for your review and additional feedback.

Problem Summary

At the core of the problem we have AttachmentsSender waitForAttachmentsToBeSent(...) where it calls jobsMap[newMessage.id]?.join():

var allAttachmentsUploaded = false
...
jobsMap = jobsMap + (
    newMessage.id to scope.launch {
        repositoryFacade.observeAttachmentsForMessage(newMessage.id)
            .filterNot(Collection<Attachment>::isEmpty)
            .collect { attachments ->
                ...
            }
    }
    )
enqueueAttachmentUpload(newMessage, channelType, channelId)
jobsMap[newMessage.id]?.join()
return if (allAttachmentsUploaded) { ...

The call to join() is not waiting for the attachments to be sent because repositoryFacade.observeAttachmentsForMessage(newMessage.id) is now an EmptyFlow by default, and we're using our own custom CDN uploader which returns a special signed URL for each uploaded asset (these URLs have scheme gs: to distinguish them from unsigned https: image and file URLs).

Here's our ChatClient configuration for reference:

private val offlinePluginFactory: PluginFactory = StreamStatePluginFactory(
    config = StatePluginConfig(
        backgroundSyncEnabled = true,
        userPresence = true,
    ),
    appContext = appContext,
)

override val chatClient: ChatClient = ChatClient.Builder(globalConfig.streamApiKey, appContext)
    .logLevel(if (isLoggingEnabled) ChatLogLevel.WARN else ChatLogLevel.NOTHING)
    .loggerHandler(chatLogger)
    .okHttpClient(
        serverManager.serverContext
            .createClient()
            .certificatePinner(CertificatePinner.DEFAULT)
            .build()
    )
    .notifications(
        notificationConfig = NotificationConfig(
            pushNotificationsEnabled = false,
            pushDeviceGenerators = listOf(),
            shouldShowNotificationOnPush = { false },
            requestPermissionOnAppLaunch = { false },
        )
    )
    .withRepositoryFactoryProvider(ChatRepositoryFactory.Provider)
    .withPlugins(offlinePluginFactory)
    .uploadAttachmentsNetworkType(UploadAttachmentsNetworkType.CONNECTED)
    .fileUploader(ChatFileUploader())
    .build()

When the user sends a message with one or more attachments, a WorkManager task is created with a unique work name built from "$channelId$messageId" in io.getstream.chat.android.client.attachment.worker.UploadAttachmentsAndroidWorker — in our case this task interacts with our custom PostChatAttachment FileUploader implementation running on an independent worker thread, which blocks until the task completes then returns io.getstream.result.Result.Success or io.getstream.result.Result.Failure accordingly.

While the attachments are uploading, ProgressCallback is invoked repeatedly as expected to update the UX, then once all of the attachments have been uploaded and the WorkManager task has completed, waitForAttachmentsToBeSent(...) returns Success or Failure based on the value of allAttachmentsUploaded which is initialized to false — note that a Stream message with attachments will not be created unless waitForAttachmentsToBeSent(...) returns Success; if it returns Failure, the UX will appear to have successfully uploaded the attachments, but no Stream message will actually be created remotely, no error message will be shown, and upon fresh app launch the user will discover that nothing was actually stored — the message and its attachments were only stored in the device local cache. This is perhaps the most troubling aspect of this design, that a Failure return from this method does not update the UX to reflect the failure(s) or allow the user to try again.

Here is the relevant section of our io.getstream.chat.android.client.uploader.FileUploader implementation for reference:

override fun sendImage(
    channelType: String,
    channelId: String,
    userId: String,
    file: File,
    callback: ProgressCallback,
): ChatResult<UploadedImage> = uploadImage(
    channelId = channelId,
    file = file,
    callback = callback,
)

private fun uploadImage(
    channelId: String,
    file: File,
    callback: ProgressCallback? = null,
): ChatResult<UploadedImage> = uploadAttachment(channelId = channelId, file = file, callback = callback).let { uploadResult ->
    when (uploadResult.isSuccess()) {
        true -> io.getstream.result.Result.Success(
            UploadedImage(file = uploadResult.getOrThrow().id) // file is a signed gs: schemed URL
        )
        else -> io.getstream.result.Result.Failure(
            io.getstream.result.Error.ThrowableError(
                uploadResult.getFailureOrThrow().toUserMessage(), uploadResult.getFailureOrThrow()
            )
        )
    }
}

@WorkerThread
private fun uploadAttachment(
    channelId: String,
    file: File,
    callback: ProgressCallback?,
): NetworkResult<ChatAttachmentResponse> {
    return PostChatAttachment(channelId = channelId, file = file, callback = callback)
        .toSingleResult() // subscribes on Schedulers.io()
        .observeOn(Schedulers.from(ThreadUtilAndroid.executor))
        .blockingGet()
        .also { uploadResult ->
            ChatAttachmentRepository.onChatAttachmentResponse(file, uploadResult)
        }
}

Note that the UploadedImage(file = ...) result does not update the imageUrl or assetUrl properties in the original Attachment — that appears to be the responsibility of the repositoryFacade.observeAttachmentsForMessage(newMessage.id) collector when assembling the messageToBeSent copy of the attachments returned from waitForAttachmentsToBeSent(...).

At this stage of uploading, the attachments have no uploadState and their imageUrl and assetUrl properties are null. This causes io.getstream.chat.android.client.attachment.AttachmentsSender to return Result.Failure when AttachmentsVerifier verifyAttachments(result) is called — it sees the attachments as "corrupted":

val corruptedAttachment = message.attachments.find {
    it.upload != null && it.imageUrl == null && it.assetUrl == null
}

Solution

To make repositoryFacade.observeAttachmentsForMessage(newMessage.id) properly collect the results of the custom uploader and update the attachments' imageUrl and assetUrl properties with their uploadState and gs: URLs, I've created an io.getstream.chat.android.client.persistance.repository.AttachmentRepository implementation in a custom RepositoryFactory.Provider based on the Stream NoOpAttachmentRepository (and related no-op repositories):

object ChatAttachmentRepository : AttachmentRepository {

    private val uploadScope: CoroutineScope = CoroutineScope(Dispatchers.IO + Job())
    private val uploadMap: ConcurrentHashMap<String, String> = ConcurrentHashMap() // uploaded attachment file path, gs: asset id (signed image url)

    fun onChatAttachmentResponse(upload: File, result: NetworkResult<ChatAttachmentResponse>) {
        when {
            result.isSuccess() -> uploadMap[upload.path] = result.getOrThrow().id
            result.isFail() -> uploadMap.remove(upload.path)
        }
    }

    @OptIn(InternalStreamChatApi::class)
    override fun observeAttachmentsForMessage(messageId: String): Flow<List<Attachment>> =
        MutableSharedFlow<List<Attachment>>().also { emitter ->
            uploadScope.launch {
                ChatClient.instance().getMessageUsingCache(messageId).await().getOrNull()?.also { message ->
                    val channelId = message.cid.cidToTypeAndId().second
                    WorkManager
                        .getInstance(VidaApplication.context)
                        .getWorkInfosForUniqueWorkLiveData("$channelId$messageId")
                        .asFlow()
                        .collectLatest { work ->
                            when (work.firstOrNull()?.state) {
                                WorkInfo.State.SUCCEEDED -> emitter.emit(
                                    message.attachments.map { attachment ->
                                        val signedUrl = attachment.upload?.let { uploadMap.remove(it.path) }
                                        when (attachment.type) {
                                            AttachmentType.IMAGE -> attachment.copy(uploadState = Attachment.UploadState.Success, imageUrl = signedUrl)
                                            AttachmentType.VIDEO,
                                            AttachmentType.FILE,
                                            AttachmentType.AUDIO,
                                            AttachmentType.AUDIO_RECORDING -> attachment.copy(uploadState = Attachment.UploadState.Success, assetUrl = signedUrl)
                                            else -> attachment.copy(uploadState = Attachment.UploadState.Failed(ChatGenericError("unsupported attachment type")))
                                        }
                                    }
                                )
                                WorkInfo.State.FAILED, WorkInfo.State.CANCELLED -> message.attachments.forEach { attachment ->
                                    attachment.upload?.let { uploadMap.remove(it.path) }
                                }
                                else -> Unit
                            }
                        }
                }
            }
        }.asSharedFlow()

    override suspend fun clear() = uploadScope.coroutineContext.cancelChildren()
}

object ChatChannelConfigRepository : ChannelConfigRepository {
    override suspend fun cacheChannelConfigs() = Unit
    override fun selectChannelConfig(channelType: String): ChannelConfig? = null
    override suspend fun insertChannelConfigs(configs: Collection<ChannelConfig>) = Unit
    override suspend fun insertChannelConfig(config: ChannelConfig) = Unit
    override suspend fun clear() = Unit
}

object ChatChannelRepository : ChannelRepository {
    override suspend fun insertChannel(channel: Channel) = Unit
    override suspend fun insertChannels(channels: Collection<Channel>) = Unit
    override suspend fun deleteChannel(cid: String) = Unit
    override suspend fun selectAllCids(): List<String> = emptyList()
    override suspend fun selectChannels(channelCIDs: List<String>): List<Channel> = emptyList()
    override suspend fun selectChannel(cid: String): Channel? = null
    override suspend fun selectChannelCidsBySyncNeeded(limit: Int): List<String> = emptyList()
    override suspend fun selectChannelsSyncNeeded(limit: Int): List<Channel> = emptyList()
    override suspend fun setChannelDeletedAt(cid: String, deletedAt: Date) = Unit
    override suspend fun setHiddenForChannel(cid: String, hidden: Boolean, hideMessagesBefore: Date) = Unit
    override suspend fun setHiddenForChannel(cid: String, hidden: Boolean) = Unit
    override suspend fun selectMembersForChannel(cid: String): List<Member> = emptyList()
    override suspend fun updateMembersForChannel(cid: String, members: List<Member>) = Unit
    override suspend fun updateLastMessageForChannel(cid: String, lastMessage: Message) = Unit
    override suspend fun clear() = Unit
}

object ChatMessageRepository : MessageRepository {
    override suspend fun selectMessages(messageIds: List<String>): List<Message> = emptyList()
    override suspend fun selectMessage(messageId: String): Message? = null
    override suspend fun insertMessages(messages: List<Message>) = Unit
    override suspend fun insertMessage(message: Message) = Unit
    override suspend fun deleteChannelMessagesBefore(cid: String, hideMessagesBefore: Date) = Unit
    override suspend fun deleteChannelMessage(message: Message) = Unit
    override suspend fun selectMessageIdsBySyncState(syncStatus: SyncStatus): List<String> = emptyList()
    override suspend fun selectMessageBySyncState(syncStatus: SyncStatus): List<Message> = emptyList()
    override suspend fun clear() = Unit
    override suspend fun selectMessagesForChannel(cid: String, pagination: AnyChannelPaginationRequest?): List<Message> = emptyList()
    override suspend fun selectMessagesForThread(messageId: String, limit: Int): List<Message> = emptyList()
}

object ChatQueryChannelsRepository : QueryChannelsRepository {
    override suspend fun insertQueryChannels(queryChannelsSpec: QueryChannelsSpec) = Unit
    override suspend fun selectBy(filter: FilterObject, querySort: QuerySorter<Channel>): QueryChannelsSpec? = null
    override suspend fun clear() = Unit
}

object ChatReactionRepository : ReactionRepository {
    override suspend fun insertReaction(reaction: Reaction) = Unit
    override suspend fun selectReactionById(id: Int): Reaction? = null
    override suspend fun selectReactionsByIds(ids: List<Int>): List<Reaction> = emptyList()
    override suspend fun selectReactionIdsBySyncStatus(syncStatus: SyncStatus): List<Int> = emptyList()
    override suspend fun selectReactionsBySyncStatus(syncStatus: SyncStatus): List<Reaction> = emptyList()
    override suspend fun deleteReaction(reaction: Reaction) = Unit
    override suspend fun clear() = Unit
    override suspend fun updateReactionsForMessageByDeletedDate(userId: String, messageId: String, deletedAt: Date) = Unit
    override suspend fun selectUserReactionToMessage(reactionType: String, messageId: String, userId: String): Reaction? = null
    override suspend fun selectUserReactionsToMessage(messageId: String, userId: String): List<Reaction> = emptyList()
}

object ChatSyncStateRepository : SyncStateRepository {
    override suspend fun insertSyncState(syncState: SyncState) = Unit
    override suspend fun selectSyncState(userId: String): SyncState? = null
    override suspend fun clear() = Unit
}

object ChatUserRepository : UserRepository {
    override suspend fun insertUsers(users: Collection<User>) = Unit
    override suspend fun insertUser(user: User) = Unit
    override suspend fun insertCurrentUser(user: User) = Unit
    override suspend fun selectUser(userId: String): User? = null
    override suspend fun selectUsers(ids: List<String>): List<User> = emptyList()
    override fun observeLatestUsers(): StateFlow<Map<String, User>> = MutableStateFlow(emptyMap())
    override suspend fun clear() = Unit
}

object ChatRepositoryFactory : RepositoryFactory {
    override fun createUserRepository(): UserRepository = ChatUserRepository
    override fun createChannelConfigRepository(): ChannelConfigRepository = ChatChannelConfigRepository
    override fun createQueryChannelsRepository(): QueryChannelsRepository = ChatQueryChannelsRepository
    override fun createSyncStateRepository(): SyncStateRepository = ChatSyncStateRepository
    override fun createAttachmentRepository(): AttachmentRepository = ChatAttachmentRepository
    override fun createReactionRepository(getUser: suspend (userId: String) -> User): ReactionRepository = ChatReactionRepository
    override fun createMessageRepository(getUser: suspend (userId: String) -> User): MessageRepository = ChatMessageRepository
    override fun createChannelRepository(getUser: suspend (userId: String) -> User, getMessage: suspend (messageId: String) -> Message?): ChannelRepository = ChatChannelRepository

    object Provider : RepositoryFactory.Provider {
        override fun createRepositoryFactory(user: User): RepositoryFactory = ChatRepositoryFactory
    }
}

My Concerns

The ChatAttachmentRepository observeAttachmentsForMessage(newMessage.id) implementation relies on the internal Stream API ChatClient.instance().getMessageUsingCache(messageId) to retrieve the cached message and its attachments by message id, and on the WorkManager unique work name built from "$channelId$messageId" to locate and monitor the active upload task.

Can we trust that this internal Stream API and related dependencies on the WorkManager unique work name will remain stable as the v6 SDK evolves, and do you have any concerns with this approach or suggestions for improvements?

Thanks!

@JcMinarro
Copy link
Member

JcMinarro commented Jan 3, 2024

Note that the UploadedImage(file = ...) result does not update the imageUrl or assetUrl properties in the original Attachment — that appears to be the responsibility of the repositoryFacade.observeAttachmentsForMessage(newMessage.id) collector when assembling the messageToBeSent copy of the attachments returned from waitForAttachmentsToBeSent(...).

At this stage of uploading, the attachments have no uploadState and their imageUrl and assetUrl properties are null. This causes io.getstream.chat.android.client.attachment.AttachmentsSender to return Result.Failure when AttachmentsVerifier verifyAttachments(result) is called — it sees the attachments as "corrupted":

Hello @SchramboVida
The uploadState, imageUrl and assetUrl is updated into the AttachmentUploader that is the internal class that AttachmentSender uses to upload the attachment.

I just adapted our Sample App to use a custom FileUploader that returns a gs://... URL similar to what your implementation does, and the behavior looks right, sending the message with a custom attachment. The attachment is not properly displayed on the MessageListView because the gs://... URL is a fake one and it doesn't know how to render it.
I just added a new log line to show the URLs and uploadState after the attachment is uploaded.

Could you review this implementation and compare it with yours?
If you can reproduce the error, would be great if you could adapt our Sample App, then we can debug it on our side.

@k0goto
Copy link

k0goto commented Jan 4, 2024

#4973 (comment)

This comment is very helpful for me. The problem summary is likely to be very similar to what I reported, but the solution is more appropriate and smart than my one.

repositoryFacade.observeAttachmentsForMessage(newMessage.id) is now an EmptyFlow by default,

I believe this is the essence of the problem. Without resolving it, even when implemented according to the documentation, it is not possible to upload files (even when using the default Stream CDN as I did).
Therefore, it is necessary to fix the default implementation of NoOpAttachmentRepository.observeAttachmentsForMessage(newMessage.id) to return the appropriate Flow, or describe the need to implement a custom AttachmentRepository (like as @SchramboVida 's solution) in the File Uploads document.

override fun observeAttachmentsForMessage(messageId: String): Flow<List<Attachment>> = emptyFlow()

@JcMinarro
Copy link
Member

Ok.
After reviewing again the code-snippets you shared, I can see you are not using the OfflinePlugin.
On v6 we splitted Offline Support into 2 plugins, StatePlugin that allows subscribe to the state of the Chat Data, and the OfflinePlugin that persists data locally on the device to allow to use Chat while the device doesn't have Internet access.

If the OfflinePlugin is not added during the configuration process, a NoOpRepositoryFacade is causing the attachments not to be uploaded properly.

Aren't you adding OfflinePlugin for any reason? Or it was because you didn't know about it and wasn't added during your migration to v6?

While we fix the issue when the OfflinePlugin is not present, you can add OfflinePlugin to your initialization. It will initialize the RepositoryFacade with a custom implementation we provide using Room as Data Base Enginee, and the issue should be resolved.

val offlinePluginFactory = StreamOfflinePluginFactory(appContext = context)

ChatClient.Builder(apiKey, context)
    [...]
    .withPlugins(offlinePluginFactory)
    .build()

@SchramboVida
Copy link
Author

Hi @JcMinarro, thanks for confirming what I suspected, that StreamOfflinePluginFactory is responsible for handling this aspect of the upload. I was going to mention that you can reproduce the problem in the sample app by removing this plugin from the client builder — then uploads will appear to succeed, but the message will disappear after restarting the app.

The documentation states this plugin is optional, and since our app does not work offline we chose to not include it. However, including it resolves the problem without my ChatAttachmentRepository workaround, so we'll go with that. 👍

Thanks again for the help!
— Jim

@JcMinarro
Copy link
Member

Hi @JcMinarro, thanks for confirming what I suspected, that StreamOfflinePluginFactory is responsible for handling this aspect of the upload. I was going to mention that you can reproduce the problem in the sample app by removing this plugin from the client builder — then uploads will appear to succeed, but the message will disappear after restarting the app.

The documentation states this plugin is optional, and since our app does not work offline we chose to not include it. However, including it resolves the problem without my ChatAttachmentRepository workaround, so we'll go with that. 👍

Thanks again for the help!
— Jim

The idea is StatePlugin works independently of the OfflinePlugin. They both were together in a singular artifact and we split them. We will upgrade the attachment upload process to be sure works without the offline plugin

@k0goto
Copy link

k0goto commented Jan 11, 2024

I found that my problem was caused for the same reason. And I was able to solve it by using the offline plugin too.
Thanks for this issue and everyone!

@JcMinarro
Copy link
Member

BTW.
We already fix the issue to be able to use our SDK without the OfflinePlugin.
I'm closing this Issue now

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants