Skip to content

Commit

Permalink
Merge remote-tracking branch 'StageGuard/seq-based-roaming' into prs/…
Browse files Browse the repository at this point in the history
…2549-seq-based-roaming

# Conflicts:
#	mirai-core-api/src/nativeMain/kotlin/contact/roaming/RoamingMessages.kt
  • Loading branch information
Karlatemp committed Jul 26, 2023
2 parents 575bac4 + db420c5 commit 08041f3
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1182,17 +1182,19 @@ public final class net/mamoe/mirai/contact/roaming/RoamingMessageFilter$Companio

public abstract interface class net/mamoe/mirai/contact/roaming/RoamingMessages {
public fun getAllMessages (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lkotlinx/coroutines/flow/Flow;
public fun getAllMessages (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getAllMessages (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun getAllMessages$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun getAllMessages$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun getAllMessages$suspendImpl (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getAllMessagesStream ()Ljava/util/stream/Stream;
public fun getAllMessagesStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getAllMessagesStream (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Ljava/util/stream/Stream;
public fun getAllMessagesStream (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun getAllMessagesStream$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Ljava/util/stream/Stream;
public static synthetic fun getAllMessagesStream$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun getAllMessagesStream$suspendImpl (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getMessagesBefore (Ljava/lang/Integer;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lnet/mamoe/mirai/utils/Streamable;
public abstract fun getMessagesBefore (Ljava/lang/Integer;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun getMessagesBefore$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Ljava/lang/Integer;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public fun getMessagesIn (JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lkotlinx/coroutines/flow/Flow;
public abstract fun getMessagesIn (JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun getMessagesIn$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
6 changes: 4 additions & 2 deletions mirai-core-api/compatibility-validation/jvm/api/jvm.api
Original file line number Diff line number Diff line change
Expand Up @@ -1182,17 +1182,19 @@ public final class net/mamoe/mirai/contact/roaming/RoamingMessageFilter$Companio

public abstract interface class net/mamoe/mirai/contact/roaming/RoamingMessages {
public fun getAllMessages (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lkotlinx/coroutines/flow/Flow;
public fun getAllMessages (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getAllMessages (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun getAllMessages$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun getAllMessages$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun getAllMessages$suspendImpl (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getAllMessagesStream ()Ljava/util/stream/Stream;
public fun getAllMessagesStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getAllMessagesStream (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Ljava/util/stream/Stream;
public fun getAllMessagesStream (Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun getAllMessagesStream$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Ljava/util/stream/Stream;
public static synthetic fun getAllMessagesStream$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun getAllMessagesStream$suspendImpl (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getMessagesBefore (Ljava/lang/Integer;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lnet/mamoe/mirai/utils/Streamable;
public abstract fun getMessagesBefore (Ljava/lang/Integer;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun getMessagesBefore$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;Ljava/lang/Integer;Lnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public fun getMessagesIn (JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;)Lkotlinx/coroutines/flow/Flow;
public abstract fun getMessagesIn (JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun getMessagesIn$default (Lnet/mamoe/mirai/contact/roaming/RoamingMessages;JJLnet/mamoe/mirai/contact/roaming/RoamingMessageFilter;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package net.mamoe.mirai.contact.roaming
import kotlinx.coroutines.flow.Flow
import net.mamoe.mirai.message.data.MessageChain
import net.mamoe.mirai.message.data.MessageSource
import net.mamoe.mirai.message.data.OnlineMessageSource
import net.mamoe.mirai.utils.Streamable

/**
* 漫游消息记录管理器. 可通过 [RoamingSupported.roamingMessages] 获得.
Expand Down Expand Up @@ -48,6 +50,31 @@ public expect interface RoamingMessages {
filter: RoamingMessageFilter? = null
): Flow<MessageChain>

/**
* 查询指定消息之前的消息记录
*
* 返回查询到的漫游消息记录, 顺序为由新到旧. 这些 [MessageChain] 与从事件中收到的消息链相似, 属于在线消息.
* 可从 [MessageChain] 获取 [MessageSource] 来确定发送人等相关信息, 也可以进行引用回复或撤回.
*
* 注意, 返回的消息记录既包含机器人发送给目标用户的消息, 也包含目标用户发送给机器人的消息.
* 可通过 [MessageChain] 获取 [MessageSource] (用法为 `messageChain.source`), 判断 [MessageSource.fromId] (发送人).
* 消息的其他*元数据*信息也要通过 [MessageSource] 获取 (如 [MessageSource.time] 获取时间).
*
* 若只需要获取单向消息 (机器人发送给目标用户的消息或反之), 可使用 [RoamingMessageFilter.SENT] 或 [RoamingMessageFilter.RECEIVED] 作为 [filter] 参数传递.
*
* 性能提示: 请在 [filter] 执行筛选, 若 [filter] 返回 `false` 则不会解析消息链, 这对本函数的处理速度有决定性影响.
*
* @param messageId 消息序列号,请查看 [MessageSource.ids], 一般为 [OnlineMessageSource] 的序列号。
* 为 `null` 时从最近一条消息开始获取且包含该消息.
* @param filter 过滤器.
* @since 2.15
* @see MessageSource
*/
public suspend fun getMessagesBefore(
messageId: Int? = null,
filter: RoamingMessageFilter? = null
): Streamable<MessageChain>

/**
* 查询所有漫游消息记录.
*
Expand All @@ -64,7 +91,7 @@ public expect interface RoamingMessages {
*
* @param filter 过滤器.
*/
public open suspend fun getAllMessages(
public suspend fun getAllMessages(
filter: RoamingMessageFilter? = null
): Flow<MessageChain>
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import kotlinx.coroutines.flow.Flow
import me.him188.kotlin.jvm.blocking.bridge.JvmBlockingBridge
import net.mamoe.mirai.message.data.MessageChain
import net.mamoe.mirai.message.data.MessageSource
import net.mamoe.mirai.message.data.OnlineMessageSource
import net.mamoe.mirai.utils.JavaFriendlyAPI
import net.mamoe.mirai.utils.JdkStreamSupport.toStream
import net.mamoe.mirai.utils.Streamable
import java.util.stream.Stream


Expand Down Expand Up @@ -56,6 +58,31 @@ public actual interface RoamingMessages {
filter: RoamingMessageFilter? = null
): Flow<MessageChain>

/**
* 查询指定消息之前的消息记录
*
* 返回查询到的漫游消息记录, 顺序为由新到旧. 这些 [MessageChain] 与从事件中收到的消息链相似, 属于在线消息.
* 可从 [MessageChain] 获取 [MessageSource] 来确定发送人等相关信息, 也可以进行引用回复或撤回.
*
* 注意, 返回的消息记录既包含机器人发送给目标用户的消息, 也包含目标用户发送给机器人的消息.
* 可通过 [MessageChain] 获取 [MessageSource] (用法为 `messageChain.source`), 判断 [MessageSource.fromId] (发送人).
* 消息的其他*元数据*信息也要通过 [MessageSource] 获取 (如 [MessageSource.time] 获取时间).
*
* 若只需要获取单向消息 (机器人发送给目标用户的消息或反之), 可使用 [RoamingMessageFilter.SENT] 或 [RoamingMessageFilter.RECEIVED] 作为 [filter] 参数传递.
*
* 性能提示: 请在 [filter] 执行筛选, 若 [filter] 返回 `false` 则不会解析消息链, 这对本函数的处理速度有决定性影响.
*
* @param messageId 消息序列号,请查看 [MessageSource.ids], 一般为 [OnlineMessageSource] 的序列号。
* 为 `null` 时从最近一条消息开始获取且包含该消息.
* @param filter 过滤器.
* @since 2.15
* @see MessageSource
*/
public actual suspend fun getMessagesBefore(
messageId: Int?,
filter: RoamingMessageFilter?
): Streamable<MessageChain>

/**
* 查询所有漫游消息记录. Java Stream 方法查看 [getAllMessagesStream].
*
Expand All @@ -75,7 +102,7 @@ public actual interface RoamingMessages {
@Suppress("ACTUAL_FUNCTION_WITH_DEFAULT_ARGUMENTS") // Keep JVM ABI
public actual suspend fun getAllMessages(
filter: RoamingMessageFilter? = null
): Flow<MessageChain> = getMessagesIn(0, Long.MAX_VALUE, filter)
): Flow<MessageChain>

/**
* 查询指定时间段内的漫游消息记录. Kotlin Flow 版本查看 [getMessagesIn].
Expand Down Expand Up @@ -125,5 +152,5 @@ public actual interface RoamingMessages {
@JavaFriendlyAPI
public suspend fun getAllMessagesStream(
filter: RoamingMessageFilter? = null
): Stream<MessageChain> = getMessagesStream(0, Long.MAX_VALUE, filter)
): Stream<MessageChain> = getAllMessages().toStream()
}
7 changes: 7 additions & 0 deletions mirai-core-mock/src/database/MessageDatabase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public interface MessageDatabase {
filter: RoamingMessageFilter
): Sequence<MessageInfo>

public fun queryMessageInfosBy(
subject: Long, kind: MessageSourceKind,
contact: Contact,
sequence: Long,
filter: RoamingMessageFilter
): Sequence<MessageInfo>

/**
* implementation note: 该方法可能同时被多个线程同时调用
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package net.mamoe.mirai.mock.internal.contact.roaming

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import net.mamoe.mirai.contact.Friend
import net.mamoe.mirai.contact.Group
import net.mamoe.mirai.contact.Stranger
Expand All @@ -22,6 +23,7 @@ import net.mamoe.mirai.message.data.MessageSourceKind
import net.mamoe.mirai.mock.internal.MockBotImpl
import net.mamoe.mirai.mock.utils.mock
import net.mamoe.mirai.utils.JavaFriendlyAPI
import net.mamoe.mirai.utils.Streamable
import net.mamoe.mirai.utils.cast
import java.util.stream.Stream
import kotlin.streams.asStream
Expand All @@ -37,6 +39,22 @@ internal class MockRoamingMessages(
return getMsg(timeStart, timeEnd, filter).asFlow()
}

override suspend fun getMessagesBefore(
messageId: Int?,
filter: RoamingMessageFilter?
): Streamable<MessageChain> {
return object : Streamable<MessageChain> {
override fun asFlow(): Flow<MessageChain> {
messageId ?: return emptyFlow()
return getMsg(messageId.toLong(), filter).asFlow()
}
}
}

override suspend fun getAllMessages(filter: RoamingMessageFilter?): Flow<MessageChain> {
return getMsg(0, Long.MAX_VALUE, filter).asFlow()
}

private fun getMsg(
timeStart: Long,
timeEnd: Long,
Expand All @@ -58,6 +76,25 @@ internal class MockRoamingMessages(
).map { it.buildSource(contact.bot.mock()) + it.message }
}

private fun getMsg(
sequence: Long,
filter: RoamingMessageFilter?
): Sequence<MessageChain> {
val msgDb = contact.bot.cast<MockBotImpl>().msgDatabase
return msgDb.queryMessageInfosBy(
contact.id,
when (contact) {
is Friend -> MessageSourceKind.FRIEND
is Group -> MessageSourceKind.GROUP
is Stranger -> MessageSourceKind.STRANGER
else -> error(contact.javaClass.toString())
},
contact,
sequence,
filter ?: RoamingMessageFilter.ANY
).map { it.buildSource(contact.bot.mock()) + it.message }
}

@JavaFriendlyAPI
override suspend fun getMessagesStream(
timeStart: Long,
Expand Down
64 changes: 44 additions & 20 deletions mirai-core-mock/src/internal/db/MsgDatabaseImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,38 +65,62 @@ internal class MsgDatabaseImpl : MessageDatabase {
): Sequence<MessageInfo> {
if (timeEnd < timeStart) return emptySequence()
return sequence<MessageInfo> {
val rm = object : RoamingMessage {
override val contact: Contact get() = contact
override var sender: Long = -1
override var target: Long = -1
override var time: Long = -1
override val ids: IntArray = intArrayOf(-1)
override val internalIds: IntArray = intArrayOf(-1)
}
for (msgInfo in db) {
if (msgInfo.kind != kind) continue
if (msgInfo.time < timeStart) continue
if (msgInfo.time > timeEnd) continue
if (msgInfo.subject != subject) continue

rm.sender = msgInfo.sender
if (kind != MessageSourceKind.GROUP) {
if (msgInfo.sender == contact.id) {
rm.target = contact.bot.id
} else {
rm.target = msgInfo.subject
}
} else {
rm.target = msgInfo.subject
val rm = msgInfo.toRoamingMessage(contact)

if (filter.invoke(rm)) {
yield(msgInfo)
}
rm.time = msgInfo.time
rm.ids[0] = msgInfo.id
rm.internalIds[0] = msgInfo.internal
}
}
}

override fun queryMessageInfosBy(
subject: Long, kind: MessageSourceKind,
contact: Contact,
sequence: Long,
filter: RoamingMessageFilter
): Sequence<MessageInfo> {
return sequence<MessageInfo> {
var emitted = 0
for (msgInfo in db) {
if (msgInfo.kind != kind) continue
if (msgInfo.subject != subject) continue

val rm = msgInfo.toRoamingMessage(contact)

if (filter.invoke(rm)) {
yield(msgInfo)
}
}
}
}

private fun MessageInfo.toRoamingMessage(contact: Contact): RoamingMessage {
val info = this

return object : RoamingMessage {
override val contact: Contact = contact
override val sender: Long = info.sender
override val target: Long =
if (info.kind != MessageSourceKind.GROUP) {
if (info.sender == contact.id) {
contact.bot.id
} else {
info.subject
}
} else {
info.subject
}
override val time: Long = info.time
override val ids: IntArray = IntArray(1) { info.id }
override val internalIds: IntArray = IntArray(1) { info.internal }

}
}
}
Loading

0 comments on commit 08041f3

Please sign in to comment.