Media upload cancellation (#1058)

* Initial implementation of media upload cancellation

* Add tests

* Add changelog

* Update screenshots

* Add documentation

* Fix lint issues

* Fix review comments

---------

Co-authored-by: ElementBot <benoitm+elementbot@element.io>
This commit is contained in:
Jorge Martin Espinosa 2023-08-17 11:02:03 +02:00 committed by GitHub
parent 4a630f141d
commit 983b83a56f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 414 additions and 65 deletions

View file

@ -0,0 +1,28 @@
/*
* Copyright (c) 2023 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.api.media
/**
* This is an abstraction over the Rust SDK's `SendAttachmentJoinHandle` which allows us to either [await] the upload process or [cancel] it.
*/
interface MediaUploadHandler {
/** Await the upload process to finish. */
suspend fun await(): Result<Unit>
/** Cancel the upload process. */
fun cancel()
}

View file

@ -25,6 +25,7 @@ import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.media.AudioInfo
import io.element.android.libraries.matrix.api.media.FileInfo
import io.element.android.libraries.matrix.api.media.ImageInfo
import io.element.android.libraries.matrix.api.media.MediaUploadHandler
import io.element.android.libraries.matrix.api.media.VideoInfo
import io.element.android.libraries.matrix.api.room.location.AssetType
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
@ -81,13 +82,13 @@ interface MatrixRoom : Closeable {
suspend fun redactEvent(eventId: EventId, reason: String? = null): Result<Unit>
suspend fun sendImage(file: File, thumbnailFile: File, imageInfo: ImageInfo, progressCallback: ProgressCallback?): Result<Unit>
suspend fun sendImage(file: File, thumbnailFile: File, imageInfo: ImageInfo, progressCallback: ProgressCallback?): Result<MediaUploadHandler>
suspend fun sendVideo(file: File, thumbnailFile: File, videoInfo: VideoInfo, progressCallback: ProgressCallback?): Result<Unit>
suspend fun sendVideo(file: File, thumbnailFile: File, videoInfo: VideoInfo, progressCallback: ProgressCallback?): Result<MediaUploadHandler>
suspend fun sendAudio(file: File, audioInfo: AudioInfo, progressCallback: ProgressCallback?): Result<Unit>
suspend fun sendAudio(file: File, audioInfo: AudioInfo, progressCallback: ProgressCallback?): Result<MediaUploadHandler>
suspend fun sendFile(file: File, fileInfo: FileInfo, progressCallback: ProgressCallback?): Result<Unit>
suspend fun sendFile(file: File, fileInfo: FileInfo, progressCallback: ProgressCallback?): Result<MediaUploadHandler>
suspend fun toggleReaction(emoji: String, eventId: EventId): Result<Unit>

View file

@ -0,0 +1,42 @@
/*
* Copyright (c) 2023 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.impl.media
import io.element.android.libraries.androidutils.file.safeDelete
import io.element.android.libraries.matrix.api.media.MediaUploadHandler
import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle
import java.io.File
class MediaUploadHandlerImpl(
private val filesToUpload: List<File>,
private val sendAttachmentJoinHandle: SendAttachmentJoinHandle,
) : MediaUploadHandler {
override suspend fun await(): Result<Unit> =
runCatching {
sendAttachmentJoinHandle.join()
}
.also { cleanUpFiles() }
override fun cancel() {
sendAttachmentJoinHandle.cancel()
cleanUpFiles()
}
private fun cleanUpFiles() {
filesToUpload.forEach { file -> file.safeDelete() }
}
}

View file

@ -28,6 +28,7 @@ import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.media.AudioInfo
import io.element.android.libraries.matrix.api.media.FileInfo
import io.element.android.libraries.matrix.api.media.ImageInfo
import io.element.android.libraries.matrix.api.media.MediaUploadHandler
import io.element.android.libraries.matrix.api.media.VideoInfo
import io.element.android.libraries.matrix.api.room.MatrixRoom
import io.element.android.libraries.matrix.api.room.MatrixRoomMembersState
@ -38,6 +39,7 @@ import io.element.android.libraries.matrix.api.room.roomMembers
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
import io.element.android.libraries.matrix.api.timeline.item.event.EventType
import io.element.android.libraries.matrix.impl.core.toProgressWatcher
import io.element.android.libraries.matrix.impl.media.MediaUploadHandlerImpl
import io.element.android.libraries.matrix.impl.media.map
import io.element.android.libraries.matrix.impl.room.location.toInner
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
@ -268,26 +270,26 @@ class RustMatrixRoom(
}
}
override suspend fun sendImage(file: File, thumbnailFile: File, imageInfo: ImageInfo, progressCallback: ProgressCallback?): Result<Unit> {
return sendAttachment {
override suspend fun sendImage(file: File, thumbnailFile: File, imageInfo: ImageInfo, progressCallback: ProgressCallback?): Result<MediaUploadHandler> {
return sendAttachment(listOf(file, thumbnailFile)) {
innerRoom.sendImage(file.path, thumbnailFile.path, imageInfo.map(), progressCallback?.toProgressWatcher())
}
}
override suspend fun sendVideo(file: File, thumbnailFile: File, videoInfo: VideoInfo, progressCallback: ProgressCallback?): Result<Unit> {
return sendAttachment {
override suspend fun sendVideo(file: File, thumbnailFile: File, videoInfo: VideoInfo, progressCallback: ProgressCallback?): Result<MediaUploadHandler> {
return sendAttachment(listOf(file, thumbnailFile)) {
innerRoom.sendVideo(file.path, thumbnailFile.path, videoInfo.map(), progressCallback?.toProgressWatcher())
}
}
override suspend fun sendAudio(file: File, audioInfo: AudioInfo, progressCallback: ProgressCallback?): Result<Unit> {
return sendAttachment {
override suspend fun sendAudio(file: File, audioInfo: AudioInfo, progressCallback: ProgressCallback?): Result<MediaUploadHandler> {
return sendAttachment(listOf(file)) {
innerRoom.sendAudio(file.path, audioInfo.map(), progressCallback?.toProgressWatcher())
}
}
override suspend fun sendFile(file: File, fileInfo: FileInfo, progressCallback: ProgressCallback?): Result<Unit> {
return sendAttachment {
override suspend fun sendFile(file: File, fileInfo: FileInfo, progressCallback: ProgressCallback?): Result<MediaUploadHandler> {
return sendAttachment(listOf(file)) {
innerRoom.sendFile(file.path, fileInfo.map(), progressCallback?.toProgressWatcher())
}
}
@ -371,14 +373,9 @@ class RustMatrixRoom(
}
}
//TODO handle cancellation, need refactoring of how we are catching errors
private suspend fun sendAttachment(handle: () -> SendAttachmentJoinHandle): Result<Unit> {
private suspend fun sendAttachment(files: List<File>, handle: () -> SendAttachmentJoinHandle): Result<MediaUploadHandler> {
return runCatching {
handle().use {
it.join()
}
MediaUploadHandlerImpl(files, handle())
}
}
}

View file

@ -0,0 +1,31 @@
/*
* Copyright (c) 2023 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.matrix.test.media
import io.element.android.libraries.matrix.api.media.MediaUploadHandler
import io.element.android.tests.testutils.simulateLongTask
import kotlin.coroutines.cancellation.CancellationException
class FakeMediaUploadHandler(
private var result: Result<Unit> = Result.success(Unit),
) : MediaUploadHandler {
override suspend fun await(): Result<Unit> = simulateLongTask { result }
override fun cancel() {
result = Result.failure(CancellationException())
}
}

View file

@ -25,6 +25,7 @@ import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.media.AudioInfo
import io.element.android.libraries.matrix.api.media.FileInfo
import io.element.android.libraries.matrix.api.media.ImageInfo
import io.element.android.libraries.matrix.api.media.MediaUploadHandler
import io.element.android.libraries.matrix.api.media.VideoInfo
import io.element.android.libraries.matrix.api.room.MatrixRoom
import io.element.android.libraries.matrix.api.room.MatrixRoomMembersState
@ -34,6 +35,7 @@ import io.element.android.libraries.matrix.api.room.location.AssetType
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
import io.element.android.libraries.matrix.test.A_ROOM_ID
import io.element.android.libraries.matrix.test.A_SESSION_ID
import io.element.android.libraries.matrix.test.media.FakeMediaUploadHandler
import io.element.android.libraries.matrix.test.timeline.FakeMatrixTimeline
import io.element.android.tests.testutils.simulateLongTask
import kotlinx.coroutines.delay
@ -70,7 +72,7 @@ class FakeMatrixRoom(
private var canRedactResult = Result.success(canRedact)
private val canSendStateResults = mutableMapOf<StateEventType, Result<Boolean>>()
private val canSendEventResults = mutableMapOf<MessageEventType, Result<Boolean>>()
private var sendMediaResult = Result.success(Unit)
private var sendMediaResult = Result.success(FakeMediaUploadHandler())
private var setNameResult = Result.success(Unit)
private var setTopicResult = Result.success(Unit)
private var updateAvatarResult = Result.success(Unit)
@ -226,21 +228,34 @@ class FakeMatrixRoom(
thumbnailFile: File,
imageInfo: ImageInfo,
progressCallback: ProgressCallback?
): Result<Unit> = fakeSendMedia(progressCallback)
): Result<MediaUploadHandler> = fakeSendMedia(progressCallback)
override suspend fun sendVideo(file: File, thumbnailFile: File, videoInfo: VideoInfo, progressCallback: ProgressCallback?): Result<Unit> = fakeSendMedia(
override suspend fun sendVideo(
file: File,
thumbnailFile: File,
videoInfo: VideoInfo,
progressCallback: ProgressCallback?
): Result<MediaUploadHandler> = fakeSendMedia(
progressCallback
)
override suspend fun sendAudio(file: File, audioInfo: AudioInfo, progressCallback: ProgressCallback?): Result<Unit> = fakeSendMedia(progressCallback)
override suspend fun sendAudio(
file: File,
audioInfo: AudioInfo,
progressCallback: ProgressCallback?
): Result<MediaUploadHandler> = fakeSendMedia(progressCallback)
override suspend fun sendFile(file: File, fileInfo: FileInfo, progressCallback: ProgressCallback?): Result<Unit> = fakeSendMedia(progressCallback)
override suspend fun sendFile(
file: File,
fileInfo: FileInfo,
progressCallback: ProgressCallback?
): Result<MediaUploadHandler> = fakeSendMedia(progressCallback)
override suspend fun forwardEvent(eventId: EventId, roomIds: List<RoomId>): Result<Unit> = simulateLongTask {
forwardEventResult
}
private suspend fun fakeSendMedia(progressCallback: ProgressCallback?): Result<Unit> = simulateLongTask {
private suspend fun fakeSendMedia(progressCallback: ProgressCallback?): Result<MediaUploadHandler> = simulateLongTask {
sendMediaResult.onSuccess {
progressCallbackValues.forEach { (current, total) ->
progressCallback?.onProgress(current, total)
@ -338,7 +353,7 @@ class FakeMatrixRoom(
unignoreResult = result
}
fun givenSendMediaResult(result: Result<Unit>) {
fun givenSendMediaResult(result: Result<FakeMediaUploadHandler>) {
sendMediaResult = result
}

View file

@ -38,5 +38,12 @@ android {
api(projects.libraries.matrix.api)
implementation(libs.inject)
implementation(libs.coroutines.core)
testImplementation(projects.libraries.matrix.test)
testImplementation(projects.libraries.mediaupload.test)
testImplementation(libs.test.junit)
testImplementation(libs.test.truth)
testImplementation(libs.coroutines.test)
testImplementation(libs.test.robolectric)
}
}

View file

@ -17,9 +17,13 @@
package io.element.android.libraries.mediaupload.api
import android.net.Uri
import io.element.android.libraries.core.extensions.flatMap
import io.element.android.libraries.core.extensions.flatMapCatching
import io.element.android.libraries.matrix.api.core.ProgressCallback
import io.element.android.libraries.matrix.api.media.MediaUploadHandler
import io.element.android.libraries.matrix.api.room.MatrixRoom
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Inject
class MediaSender @Inject constructor(
@ -27,6 +31,9 @@ class MediaSender @Inject constructor(
private val room: MatrixRoom,
) {
private val ongoingUploadJobs = ConcurrentHashMap<Job.Key, MediaUploadHandler>()
val hasOngoingMediaUploads get() = ongoingUploadJobs.isNotEmpty()
suspend fun sendMedia(
uri: Uri,
mimeType: String,
@ -40,16 +47,25 @@ class MediaSender @Inject constructor(
deleteOriginal = true,
compressIfPossible = compressIfPossible
)
.flatMap { info ->
.flatMapCatching { info ->
room.sendMedia(info, progressCallback)
}
.onFailure { error ->
val job = ongoingUploadJobs.remove(Job)
if (error !is CancellationException) {
job?.cancel()
}
}
.onSuccess {
ongoingUploadJobs.remove(Job)
}
}
private suspend fun MatrixRoom.sendMedia(
uploadInfo: MediaUploadInfo,
progressCallback: ProgressCallback?
progressCallback: ProgressCallback?,
): Result<Unit> {
return when (uploadInfo) {
val handler = when (uploadInfo) {
is MediaUploadInfo.Image -> {
sendImage(
file = uploadInfo.file,
@ -83,5 +99,11 @@ class MediaSender @Inject constructor(
)
}
}
return handler
.flatMapCatching { uploadHandler ->
ongoingUploadJobs[Job] = uploadHandler
uploadHandler.await()
}
}
}

View file

@ -0,0 +1,116 @@
/*
* Copyright (c) 2023 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.libraries.mediaupload.api
import android.net.Uri
import com.google.common.truth.Truth.assertThat
import io.element.android.libraries.matrix.api.room.MatrixRoom
import io.element.android.libraries.matrix.test.room.FakeMatrixRoom
import io.element.android.libraries.mediaupload.test.FakeMediaPreProcessor
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runTest
import org.junit.Test
import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner
@RunWith(RobolectricTestRunner::class)
class MediaSenderTests {
@Test
fun `given an attachment when sending it the preprocessor always runs`() = runTest {
val preProcessor = FakeMediaPreProcessor()
val sender = aMediaSender(preProcessor)
val uri = Uri.parse("content://image.jpg")
sender.sendMedia(uri = uri, mimeType = "image/jpeg", compressIfPossible = true)
assertThat(preProcessor.processCallCount).isEqualTo(1)
}
@Test
fun `given an attachment when sending it the MatrixRoom will call sendMedia`() = runTest {
val room = FakeMatrixRoom()
val sender = aMediaSender(room = room)
val uri = Uri.parse("content://image.jpg")
sender.sendMedia(uri = uri, mimeType = "image/jpeg", compressIfPossible = true)
assertThat(room.sendMediaCount).isEqualTo(1)
}
@Test
fun `given a failure in the preprocessor when sending the whole process fails`() = runTest {
val preProcessor = FakeMediaPreProcessor().apply {
givenResult(Result.failure(Exception()))
}
val sender = aMediaSender(preProcessor)
val uri = Uri.parse("content://image.jpg")
val result = sender.sendMedia(uri = uri, mimeType = "image/jpeg", compressIfPossible = true)
assertThat(result.exceptionOrNull()).isNotNull()
}
@Test
fun `given a failure in the media upload when sending the whole process fails`() = runTest {
val room = FakeMatrixRoom().apply {
givenSendMediaResult(Result.failure(Exception()))
}
val sender = aMediaSender(room = room)
val uri = Uri.parse("content://image.jpg")
val result = sender.sendMedia(uri = uri, mimeType = "image/jpeg", compressIfPossible = true)
assertThat(result.exceptionOrNull()).isNotNull()
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `given a cancellation in the media upload when sending the job is cancelled`() = runTest(StandardTestDispatcher()) {
val room = FakeMatrixRoom()
val sender = aMediaSender(room = room)
val sendJob = launch {
val uri = Uri.parse("content://image.jpg")
sender.sendMedia(uri = uri, mimeType = "image/jpeg", compressIfPossible = true)
}
// Wait until several internal tasks run and the file is being uploaded
advanceTimeBy(3L)
// Assert the file is being uploaded
assertThat(sender.hasOngoingMediaUploads).isTrue()
// Cancel the coroutine
sendJob.cancel()
// Wait for the coroutine cleanup to happen
advanceTimeBy(1L)
// Assert the file is not being uploaded anymore
assertThat(sender.hasOngoingMediaUploads).isFalse()
}
private fun aMediaSender(
preProcessor: MediaPreProcessor = FakeMediaPreProcessor(),
room: MatrixRoom = FakeMatrixRoom(),
) = MediaSender(
preProcessor,
room,
)
}

View file

@ -25,6 +25,9 @@ import java.io.File
class FakeMediaPreProcessor : MediaPreProcessor {
var processCallCount = 0
private set
private var result: Result<MediaUploadInfo> = Result.success(
MediaUploadInfo.AnyFile(
File("test"),
@ -43,6 +46,7 @@ class FakeMediaPreProcessor : MediaPreProcessor {
deleteOriginal: Boolean,
compressIfPossible: Boolean
): Result<MediaUploadInfo> = simulateLongTask {
processCallCount++
result
}