Merge branch 'develop' into valere/call/fix_join_button_on_several_items
This commit is contained in:
commit
af47e2b405
376 changed files with 5383 additions and 2535 deletions
|
|
@ -44,6 +44,8 @@ import io.element.android.libraries.matrix.impl.mapper.map
|
|||
import io.element.android.libraries.matrix.impl.room.history.map
|
||||
import io.element.android.libraries.matrix.impl.room.join.map
|
||||
import io.element.android.libraries.matrix.impl.room.knock.RustKnockRequest
|
||||
import io.element.android.libraries.matrix.impl.room.location.liveLocationSharesFlow
|
||||
import io.element.android.libraries.matrix.impl.room.location.timedByExpiry
|
||||
import io.element.android.libraries.matrix.impl.room.member.RoomMemberListFetcher
|
||||
import io.element.android.libraries.matrix.impl.room.threads.RustThreadsListService
|
||||
import io.element.android.libraries.matrix.impl.roomdirectory.map
|
||||
|
|
@ -511,7 +513,7 @@ class JoinedRustRoom(
|
|||
}
|
||||
|
||||
override fun subscribeToLiveLocationShares(): Flow<List<LiveLocationShare>> {
|
||||
TODO("Not implemented yet")
|
||||
return innerRoom.liveLocationSharesFlow().timedByExpiry(systemClock::epochMillis)
|
||||
}
|
||||
|
||||
override suspend fun startLiveLocationShare(durationMillis: Long): Result<Unit> = withContext(roomDispatcher) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright (c) 2026 Element Creations Ltd.
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
|
||||
* Please see LICENSE files in the repository root for full details.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.room.location
|
||||
|
||||
import io.element.android.libraries.matrix.api.core.UserId
|
||||
import io.element.android.libraries.matrix.api.room.location.LastLocation
|
||||
import io.element.android.libraries.matrix.api.room.location.LiveLocationShare
|
||||
import io.element.android.libraries.matrix.impl.util.cancelAndDestroy
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.buffer
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import org.matrix.rustcomponents.sdk.LiveLocationShareListener
|
||||
import org.matrix.rustcomponents.sdk.LiveLocationShareUpdate
|
||||
import org.matrix.rustcomponents.sdk.RoomInterface
|
||||
import org.matrix.rustcomponents.sdk.LiveLocationShare as RustLiveLocationShare
|
||||
|
||||
fun RoomInterface.liveLocationSharesFlow(): Flow<List<LiveLocationShare>> {
|
||||
fun MutableList<LiveLocationShare>.applyUpdate(update: LiveLocationShareUpdate) {
|
||||
when (update) {
|
||||
is LiveLocationShareUpdate.Append -> addAll(update.values.map { it.into() })
|
||||
is LiveLocationShareUpdate.Clear -> clear()
|
||||
is LiveLocationShareUpdate.Insert -> add(update.index.toInt(), update.value.into())
|
||||
is LiveLocationShareUpdate.PopBack -> if (isNotEmpty()) removeAt(lastIndex)
|
||||
is LiveLocationShareUpdate.PopFront -> if (isNotEmpty()) removeAt(0)
|
||||
is LiveLocationShareUpdate.PushBack -> add(update.value.into())
|
||||
is LiveLocationShareUpdate.PushFront -> add(0, update.value.into())
|
||||
is LiveLocationShareUpdate.Remove -> removeAt(update.index.toInt())
|
||||
is LiveLocationShareUpdate.Reset -> {
|
||||
clear()
|
||||
addAll(update.values.map { it.into() })
|
||||
}
|
||||
is LiveLocationShareUpdate.Set -> set(update.index.toInt(), update.value.into())
|
||||
is LiveLocationShareUpdate.Truncate -> subList(update.length.toInt(), size).clear()
|
||||
}
|
||||
}
|
||||
return callbackFlow {
|
||||
val liveLocationShares = liveLocationShares()
|
||||
val shares: MutableList<LiveLocationShare> = ArrayList()
|
||||
val taskHandle = liveLocationShares.subscribe(object : LiveLocationShareListener {
|
||||
override fun onUpdate(updates: List<LiveLocationShareUpdate>) {
|
||||
for (update in updates) {
|
||||
shares.applyUpdate(update)
|
||||
}
|
||||
trySend(shares)
|
||||
}
|
||||
})
|
||||
awaitClose {
|
||||
taskHandle.cancelAndDestroy()
|
||||
liveLocationShares.destroy()
|
||||
}
|
||||
}.buffer(Channel.UNLIMITED)
|
||||
}
|
||||
|
||||
private fun RustLiveLocationShare.into(): LiveLocationShare {
|
||||
return LiveLocationShare(
|
||||
userId = UserId(userId),
|
||||
lastLocation = lastLocation?.let {
|
||||
LastLocation(
|
||||
geoUri = it.location.geoUri,
|
||||
timestamp = it.ts.toLong(),
|
||||
assetType = it.location.asset.into(),
|
||||
)
|
||||
},
|
||||
startTimestamp = startTs.toLong(),
|
||||
endTimestamp = (startTs + timeout).toLong()
|
||||
)
|
||||
}
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Copyright (c) 2026 Element Creations Ltd.
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
|
||||
* Please see LICENSE files in the repository root for full details.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.room.location
|
||||
|
||||
import io.element.android.libraries.matrix.api.room.location.LiveLocationShare
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.channelFlow
|
||||
import kotlinx.coroutines.launch
|
||||
|
||||
/**
|
||||
* Makes sure to filter and emit live location based on the endTimestamp.
|
||||
*/
|
||||
internal fun Flow<List<LiveLocationShare>>.timedByExpiry(
|
||||
currentTimeMillis: () -> Long = System::currentTimeMillis,
|
||||
): Flow<List<LiveLocationShare>> = channelFlow {
|
||||
var timerJob: Job? = null
|
||||
|
||||
fun List<LiveLocationShare>.nextExpiryAfter(timestamp: Long): Long? {
|
||||
return this
|
||||
.asSequence()
|
||||
.map { it.endTimestamp }
|
||||
.filter { it > timestamp }
|
||||
.minOrNull()
|
||||
}
|
||||
|
||||
fun List<LiveLocationShare>.filterLive(): List<LiveLocationShare> {
|
||||
val currentTimeMillis = currentTimeMillis()
|
||||
return filter { it.endTimestamp > currentTimeMillis }
|
||||
}
|
||||
|
||||
fun reschedule(shares: List<LiveLocationShare>) {
|
||||
timerJob?.cancel()
|
||||
timerJob = launch {
|
||||
val currentTimeMillis = currentTimeMillis()
|
||||
val nextExpiry = shares.nextExpiryAfter(currentTimeMillis) ?: return@launch
|
||||
delay((nextExpiry - currentTimeMillis).coerceAtLeast(0))
|
||||
val liveShares = shares.filterLive()
|
||||
send(liveShares)
|
||||
reschedule(liveShares)
|
||||
}
|
||||
}
|
||||
|
||||
collect { shares ->
|
||||
val liveShares = shares.filterLive()
|
||||
send(liveShares)
|
||||
reschedule(liveShares)
|
||||
}
|
||||
}
|
||||
|
|
@ -12,6 +12,7 @@ import io.element.android.libraries.architecture.AsyncData
|
|||
import io.element.android.libraries.matrix.api.core.ThreadId
|
||||
import io.element.android.libraries.matrix.api.core.UserId
|
||||
import io.element.android.libraries.matrix.api.notification.CallIntent
|
||||
import io.element.android.libraries.matrix.api.room.location.LiveLocationInfo
|
||||
import io.element.android.libraries.matrix.api.timeline.item.EmbeddedEventInfo
|
||||
import io.element.android.libraries.matrix.api.timeline.item.EventThreadInfo
|
||||
import io.element.android.libraries.matrix.api.timeline.item.ThreadSummary
|
||||
|
|
@ -20,6 +21,7 @@ import io.element.android.libraries.matrix.api.timeline.item.event.EventContent
|
|||
import io.element.android.libraries.matrix.api.timeline.item.event.FailedToParseMessageLikeContent
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.FailedToParseStateContent
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.LegacyCallInviteContent
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.LiveLocationContent
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.MembershipChange
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.OtherState
|
||||
import io.element.android.libraries.matrix.api.timeline.item.event.PollContent
|
||||
|
|
@ -34,8 +36,10 @@ import io.element.android.libraries.matrix.api.timeline.item.event.UtdCause
|
|||
import io.element.android.libraries.matrix.impl.media.map
|
||||
import io.element.android.libraries.matrix.impl.poll.map
|
||||
import io.element.android.libraries.matrix.impl.room.join.map
|
||||
import io.element.android.libraries.matrix.impl.room.location.into
|
||||
import kotlinx.collections.immutable.toImmutableList
|
||||
import kotlinx.collections.immutable.toImmutableMap
|
||||
import org.matrix.rustcomponents.sdk.BeaconInfo
|
||||
import org.matrix.rustcomponents.sdk.EmbeddedEventDetails
|
||||
import org.matrix.rustcomponents.sdk.MsgLikeContent
|
||||
import org.matrix.rustcomponents.sdk.MsgLikeKind
|
||||
|
|
@ -109,8 +113,14 @@ class TimelineEventContentMapper(
|
|||
)
|
||||
}
|
||||
is MsgLikeKind.LiveLocation -> {
|
||||
// Live location messages are a special kind of message that we want to treat as unknown content for now
|
||||
UnknownContent
|
||||
LiveLocationContent(
|
||||
isLive = kind.content.isLive,
|
||||
startTimestamp = kind.content.ts.toLong(),
|
||||
description = kind.content.description,
|
||||
timeout = kind.content.timeoutMs.toLong(),
|
||||
assetType = kind.content.assetType.into(),
|
||||
locations = kind.content.locations.map { location -> location.map() }
|
||||
)
|
||||
}
|
||||
is MsgLikeKind.Other -> UnknownContent
|
||||
}
|
||||
|
|
@ -266,3 +276,11 @@ private fun RustEncryptedMessage.map(): UnableToDecryptContent.Data {
|
|||
RustEncryptedMessage.Unknown -> UnableToDecryptContent.Data.Unknown
|
||||
}
|
||||
}
|
||||
|
||||
private fun BeaconInfo.map(): LiveLocationInfo {
|
||||
return LiveLocationInfo(
|
||||
description = description,
|
||||
geoUri = geoUri,
|
||||
timestamp = ts.toLong(),
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,147 @@
|
|||
/*
|
||||
* Copyright (c) 2026 Element Creations Ltd.
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial.
|
||||
* Please see LICENSE files in the repository root for full details.
|
||||
*/
|
||||
|
||||
package io.element.android.libraries.matrix.impl.room.location
|
||||
|
||||
import app.cash.turbine.test
|
||||
import com.google.common.truth.Truth.assertThat
|
||||
import io.element.android.libraries.matrix.api.core.UserId
|
||||
import io.element.android.libraries.matrix.api.room.location.LiveLocationShare
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.emptyFlow
|
||||
import kotlinx.coroutines.flow.flowOf
|
||||
import kotlinx.coroutines.test.advanceTimeBy
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.junit.Test
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
class TimedLiveLocationSharesFlowTest {
|
||||
@Test
|
||||
fun `it keeps emitting shares for subsequent expiries without upstream changes`() = runTest {
|
||||
val shares = listOf(
|
||||
aLiveLocationShare(userId = "@alice:server", endTimestamp = 1_000),
|
||||
aLiveLocationShare(userId = "@bob:server", endTimestamp = 2_000),
|
||||
aLiveLocationShare(userId = "@carol:server", endTimestamp = 3_000),
|
||||
)
|
||||
|
||||
flowOf(shares)
|
||||
.timedByExpiry(currentTimeMillis = { testScheduler.currentTime })
|
||||
.test {
|
||||
assertThat(awaitItem()).isEqualTo(shares)
|
||||
|
||||
advanceTimeBy(1_000)
|
||||
assertThat(awaitItem()).isEqualTo(shares.drop(1))
|
||||
|
||||
advanceTimeBy(999)
|
||||
expectNoEvents()
|
||||
|
||||
advanceTimeBy(1)
|
||||
assertThat(awaitItem()).isEqualTo(shares.drop(2))
|
||||
|
||||
advanceTimeBy(999)
|
||||
expectNoEvents()
|
||||
|
||||
advanceTimeBy(1)
|
||||
assertThat(awaitItem()).isEmpty()
|
||||
|
||||
awaitComplete()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `it does not double-emit when a share is already expired on receipt`() = runTest {
|
||||
val shares = listOf(
|
||||
aLiveLocationShare(userId = "@alice:server", endTimestamp = 500),
|
||||
aLiveLocationShare(userId = "@bob:server", endTimestamp = 2_000),
|
||||
)
|
||||
|
||||
flowOf(shares)
|
||||
.timedByExpiry(currentTimeMillis = { 1_000 + testScheduler.currentTime })
|
||||
.test {
|
||||
assertThat(awaitItem()).isEqualTo(shares.drop(1))
|
||||
expectNoEvents()
|
||||
|
||||
advanceTimeBy(999)
|
||||
expectNoEvents()
|
||||
|
||||
advanceTimeBy(1)
|
||||
assertThat(awaitItem()).isEmpty()
|
||||
|
||||
awaitComplete()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `it reschedules timed emission when upstream shares change`() = runTest {
|
||||
val upstream = MutableSharedFlow<List<LiveLocationShare>>(extraBufferCapacity = 1)
|
||||
val initialShares = listOf(aLiveLocationShare(endTimestamp = 10_000))
|
||||
val updatedShares = listOf(
|
||||
aLiveLocationShare(userId = "@alice:server", endTimestamp = 10_000),
|
||||
aLiveLocationShare(userId = "@bob:server", endTimestamp = 6_000),
|
||||
)
|
||||
|
||||
upstream
|
||||
.timedByExpiry(currentTimeMillis = { testScheduler.currentTime })
|
||||
.test {
|
||||
upstream.emit(initialShares)
|
||||
assertThat(awaitItem()).isEqualTo(initialShares)
|
||||
|
||||
advanceTimeBy(5_000)
|
||||
upstream.emit(updatedShares)
|
||||
assertThat(awaitItem()).isEqualTo(updatedShares)
|
||||
|
||||
advanceTimeBy(999)
|
||||
expectNoEvents()
|
||||
|
||||
advanceTimeBy(1)
|
||||
assertThat(awaitItem()).isEqualTo(updatedShares.take(1))
|
||||
|
||||
advanceTimeBy(3_999)
|
||||
expectNoEvents()
|
||||
|
||||
advanceTimeBy(1)
|
||||
assertThat(awaitItem()).isEmpty()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `it completes after the last scheduled re-emission when upstream completes`() = runTest {
|
||||
val shares = listOf(aLiveLocationShare(endTimestamp = 1_000))
|
||||
flowOf(shares)
|
||||
.timedByExpiry(currentTimeMillis = { testScheduler.currentTime })
|
||||
.test {
|
||||
assertThat(awaitItem()).isEqualTo(shares)
|
||||
|
||||
advanceTimeBy(1_000)
|
||||
assertThat(awaitItem()).isEmpty()
|
||||
|
||||
awaitComplete()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `it completes immediately when upstream emits nothing`() = runTest {
|
||||
emptyFlow<List<LiveLocationShare>>()
|
||||
.timedByExpiry(currentTimeMillis = { testScheduler.currentTime })
|
||||
.test {
|
||||
awaitComplete()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun aLiveLocationShare(
|
||||
userId: String = "@user:server",
|
||||
endTimestamp: Long,
|
||||
): LiveLocationShare {
|
||||
return LiveLocationShare(
|
||||
userId = UserId(userId),
|
||||
lastLocation = null,
|
||||
startTimestamp = 0L,
|
||||
endTimestamp = endTimestamp,
|
||||
)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue