From 609d9d90067b98c07ccc823b41180c05a55084b1 Mon Sep 17 00:00:00 2001 From: Flowyi Date: Wed, 26 Jul 2023 12:11:25 +0800 Subject: [PATCH] support chat room connection token (#5) --- lib/components/chat.dart | 10 +- lib/components/chat_room.dart | 19 +- lib/components/setting.dart | 5 +- lib/controller/chat_room.dart | 8 +- lib/controller/message.dart | 28 ++- lib/controller/settings.dart | 72 +----- lib/data/glm.dart | 95 -------- lib/main.dart | 4 + lib/repository/chat_room.dart | 433 +++++++++++++++++++++++----------- lib/repository/message.dart | 5 - lib/utils/tidb.dart | 49 ++++ 11 files changed, 399 insertions(+), 329 deletions(-) delete mode 100644 lib/data/glm.dart create mode 100644 lib/utils/tidb.dart diff --git a/lib/components/chat.dart b/lib/components/chat.dart index d632622..2f18cff 100644 --- a/lib/components/chat.dart +++ b/lib/components/chat.dart @@ -132,7 +132,7 @@ class _ChatWindowState extends State { final MessageController messageController = Get.find(); final ChatRoomController chatRoomController = Get.find(); - var chatRoomUuid = chatRoomController.currentChatRoomUuid.value; + final room = chatRoomController.getCurrentRoom(); var first_letters = message.substring(0, min(3, message.length)).toLowerCase(); var ask_ai = first_letters == "@ai"; @@ -150,7 +150,7 @@ class _ChatWindowState extends State { source: MessageSource.user, ask_ai: ask_ai, ); - messageController.addMessage(chatRoomUuid, newMessage, ai_question); + messageController.addMessage(room, newMessage, ai_question); _formKey.currentState!.reset(); } @@ -261,7 +261,7 @@ class _ChatWindowState extends State { var polling = false; _timer = Timer.periodic( const Duration(seconds: 3), - (Timer timer) { + (Timer timer) { if (polling) { return; } @@ -275,8 +275,8 @@ class _ChatWindowState extends State { void _loadMessagesRemote() async { final ChatRoomController chatRoomController = Get.find(); - var chatRoomUuid = chatRoomController.currentChatRoomUuid.value; + final room = chatRoomController.getCurrentRoom(); final MessageController messageController = Get.find(); - messageController.upsertRemoteMessages(chatRoomUuid); + messageController.upsertRemoteMessages(room); } } diff --git a/lib/components/chat_room.dart b/lib/components/chat_room.dart index 2ba5ac7..13d0a6b 100644 --- a/lib/components/chat_room.dart +++ b/lib/components/chat_room.dart @@ -58,10 +58,10 @@ class _ChatRoomState extends State { final comp.ChatRoomController chatRoomController = Get.find(); chatRoomController.setCurrentRoom(index); if (index >= 0) { - String roomUuid = chatRoomController.roomList[index].uuid; - chatRoomController.currentChatRoomUuid(roomUuid); + final room = chatRoomController.getCurrentRoom(); + chatRoomController.currentChatRoomUuid(room.uuid); MessageController controllerMessage = Get.find(); - controllerMessage.loadAllMessages(roomUuid); + controllerMessage.loadAllMessages(room); } } } @@ -219,10 +219,12 @@ class NewChatButton extends StatelessWidget { const uuid = Uuid(); var createTime = DateTime.now().toUtc(); repo.ChatRoom chatRoom = repo.ChatRoom( - uuid: uuid.v1(), - name: "New Chat Room", - createTime: createTime, - connectionToken: ""); + uuid: uuid.v1(), + name: "New Chat Room", + createTime: createTime, + connectionToken: repo.ChatRoomRepository.myTiDBConn.toToken(), + role: repo.Role.host, + ); chatRoomController.addChatRoom(chatRoom); FirebaseAnalytics.instance.logEvent(name: "chat_room_add"); @@ -323,8 +325,9 @@ class _ChatDetailButtonState extends State final comp.ChatRoomController chatRoomController = Get.find(); final MessageController messageController = Get.find(); messageController.messageList.value = []; + final room = chatRoomController.getCurrentRoom(); chatRoomController.setCurrentRoom(-1); - chatRoomController.deleteChatRoom(); + chatRoomController.deleteChatRoom(room); FirebaseAnalytics.instance.logEvent(name: "chat_room_delete"); } diff --git a/lib/components/setting.dart b/lib/components/setting.dart index 9291e09..410fde3 100644 --- a/lib/components/setting.dart +++ b/lib/components/setting.dart @@ -90,7 +90,8 @@ class _SettingPageState extends State { children: [ const Text("AGI"), Tooltip( - message: "Artificial General Intelligence", + message: + "Artificial General Intelligence.\nYou can use @ai to talk to the AI service in any chat room.", child: IconButton( iconSize: 10.0, splashRadius: 10, @@ -256,7 +257,7 @@ class _SettingPageState extends State { const Text("TiDB Serverless"), Tooltip( message: - "TiDB Serverless is an online database service which can be accessed from any where. Get it for free: www.tidbcloud.com", + "TiDB Serverless is an online database service.\nMoyubie can store your chat messages on TiDB Serverless, so that you can access them from anywhere with any devices.\nTiDB Serverless is also required for group chat.", child: IconButton( iconSize: 10.0, splashRadius: 10, diff --git a/lib/controller/chat_room.dart b/lib/controller/chat_room.dart index f6ee2ab..f7efd03 100644 --- a/lib/controller/chat_room.dart +++ b/lib/controller/chat_room.dart @@ -14,6 +14,10 @@ class ChatRoomController extends GetxController { super.onInit(); } + ChatRoom getCurrentRoom() { + return roomList[currentRoomIndex.value.value]; + } + void setCurrentRoom(int index) async { currentRoomIndex.value = IntegerWrapper(index); if (index > 0) { @@ -22,8 +26,8 @@ class ChatRoomController extends GetxController { update(); } - void deleteChatRoom() async { - await ChatRoomRepository().deleteChatRoom(currentChatRoomUuid.value); + void deleteChatRoom(ChatRoom room) async { + await ChatRoomRepository().deleteChatRoom(room); roomList.value = await ChatRoomRepository().getChatRooms(); update(); } diff --git a/lib/controller/message.dart b/lib/controller/message.dart index 534a33d..ab49ebb 100644 --- a/lib/controller/message.dart +++ b/lib/controller/message.dart @@ -11,18 +11,20 @@ class MessageController extends GetxController { final messageList = [].obs; final uuid = const Uuid(); - void loadAllMessages(String chatRoomUuid) async { - final msgList = await ChatRoomRepository().getMessagesByChatRoomUUid(chatRoomUuid); + void loadAllMessages(ChatRoom room) async { + final msgList = await ChatRoomRepository().getMessagesByChatRoomUUid(room); messageList.value = msgList; - final messageListRemote = await ChatRoomRepository().getNewMessagesByChatRoomUuidRemote( - chatRoomUuid, msgList.lastOrNull?.createTime); + final messageListRemote = await ChatRoomRepository() + .getNewMessagesByChatRoomUuidRemote( + room, msgList.lastOrNull?.createTime); messageList.value = [...msgList, ...messageListRemote]; update(); } - void upsertRemoteMessages(String roomUuid) async { + void upsertRemoteMessages(ChatRoom room) async { final lastMsgTime = messageList.lastOrNull?.createTime; - final newMessages = await ChatRoomRepository().getNewMessagesByChatRoomUuidRemote(roomUuid, lastMsgTime); + final newMessages = await ChatRoomRepository() + .getNewMessagesByChatRoomUuidRemote(room, lastMsgTime); bool needUpdate = false; for (var item in newMessages) { if (messageList.where((m) => m.uuid == item.uuid).isEmpty) { @@ -35,13 +37,13 @@ class MessageController extends GetxController { } } - void addMessage( - String chatRoomUuid, Message input, String ai_question) async { + void addMessage(ChatRoom room, Message input, String ai_question) async { // Add user intput to message list - await ChatRoomRepository().addMessage(chatRoomUuid, input); + await ChatRoomRepository().addMessage(room, input); - final messages = - await ChatRoomRepository().getMessagesByChatRoomUUid(chatRoomUuid); + final chatRoomUuid = room.uuid; + + final messages = await ChatRoomRepository().getMessagesByChatRoomUUid(room); if (!input.ask_ai) { messageList.value = messages; return; @@ -65,9 +67,9 @@ class MessageController extends GetxController { }, // (Message res) async { // if streaming is done ,load all the message - ChatRoomRepository().addMessage(chatRoomUuid, res); + ChatRoomRepository().addMessage(room, res); final messages = - await ChatRoomRepository().getMessagesByChatRoomUUid(chatRoomUuid); + await ChatRoomRepository().getMessagesByChatRoomUUid(room); messageList.value = messages; completer.complete(); }); diff --git a/lib/controller/settings.dart b/lib/controller/settings.dart index b247268..83e8ec3 100644 --- a/lib/controller/settings.dart +++ b/lib/controller/settings.dart @@ -4,6 +4,7 @@ import 'package:flutter/foundation.dart'; import 'package:flutter/material.dart'; import 'package:moyubie/repository/chat_room.dart'; import 'package:moyubie/utils/package.dart'; +import 'package:moyubie/utils/tidb.dart'; import 'package:get/get.dart'; import 'package:get_storage/get_storage.dart'; @@ -63,28 +64,6 @@ class SettingsController extends GetxController { version.value = await getAppVersion(); } - // void setGlmBaseUrl(String baseUrl) { - // glmBaseUrl.value = baseUrl; - // GetStorage _box = GetStorage(); - // _box.write('glmBaseUrl', baseUrl); - // } - - // getGlmBaseUrlFromPreferences() async { - // GetStorage _box = GetStorage(); - // String baseUrl = _box.read('glmBaseUrl') ?? "https://api.openai-proxy.com"; - // setGlmBaseUrl(baseUrl); - // } - - Future validateTiDB() async { - var crr = ChatRoomRepository(); - var conn = await crr.getRemoteDb(forceInit: true); - if (conn == null) { - return "Cannot connect to remote database with ${crr.remoteDBToString()}, "; - } - - return null; - } - Future validateLLM() async { if (llm.value == "OpenAI") { if (openAiKey.value.length <= 10) { @@ -141,26 +120,25 @@ class SettingsController extends GetxController { } var res = updateTiDBCmdToRepo(serverlessCmd.value); - res ??= await validateTiDB(); + res ??= await ChatRoomRepository.myTiDBConn.validateRemoteDB(); String popMsg; switch (res) { case null: { - ChatRoomRepository().setRemoteDBValid(true); popMsg = "OK"; break; } case "Empty": { - ChatRoomRepository().setRemoteDBValid(false); + ChatRoomRepository.myTiDBConn.clearConnect(); popMsg = "Warn: Chat messages are only saved to local if you don't specify TiDB Serverless connectin."; break; } default: { - ChatRoomRepository().setRemoteDBValid(false); + ChatRoomRepository.myTiDBConn.clearConnect(); popMsg = "Connect to TiDB Serverless failed:\n$res."; } } @@ -186,46 +164,10 @@ class SettingsController extends GetxController { String? updateTiDBCmdToRepo(String cmd) { if (cmd.isEmpty) return "Empty"; - cmd = cmd.replaceFirst(" -p", " -p "); - final options = cmd.split(" "); - var nextOpts = List.from(options); - nextOpts.removeAt(0); - nextOpts.add(""); - String user = ""; - String host = ""; - int port = 0; - String password = ""; - - try { - for (int i = 0; i < options.length; i += 1) { - final opt = options[i]; - final nextOpt = nextOpts[i]; - switch (opt) { - case "-u": - case "--user": - user = nextOpt.replaceAll("'", ""); - user = user.replaceAll('"', ""); - user = user.replaceAll("'", ""); - break; - case "-h": - case "--host": - host = nextOpt; - break; - case "-P": - case "--port": - port = int.parse(nextOpt); - break; - case "-p": - case "--password": - password = nextOpt; - break; - default: - } - } - } catch (e) { - return e.toString(); + var (host, port, user, password) = parseTiDBConnectionText(cmd); + if (port == 0) { + return user; } - ChatRoomRepository().updateRemoteDBConfig(host, port, user, password); if (user.isEmpty || host.isEmpty || port == 0 || password.isEmpty) { diff --git a/lib/data/glm.dart b/lib/data/glm.dart deleted file mode 100644 index 2ca6914..0000000 --- a/lib/data/glm.dart +++ /dev/null @@ -1,95 +0,0 @@ -import 'package:flutter/foundation.dart'; -import 'package:moyubie/data/llm.dart'; -import 'package:get_storage/get_storage.dart'; -import 'dart:convert'; -import 'package:http/http.dart' as http; -import 'package:uuid/uuid.dart'; - -import '../repository/chat_room.dart'; - -class ChatGlM extends LLM { - final uuid = const Uuid(); - @override - getResponse( - String chatRoomUuid, - String userName, - String question, - AIConversationContext convContext, - ValueChanged onResponse, - ValueChanged errorCallback, - ValueChanged onSuccess) async { - // var messageToBeSend = messages.removeLast(); - // var prompt = messageToBeSend.message; - // var history = messages.length >= 2 ? collectHistory(messages) : []; - // var body = {'query': prompt, 'history': history.isEmpty ? [] : history}; - // var glmBaseUrl = GetStorage().read("glmBaseUrl") ?? ""; - // if (glmBaseUrl.isEmpty) { - // errorCallback(Message( - // uuid: uuid.v4(), - // userName: userName, - // createTime: DateTime.now(), - // message: "glm baseUrl is empty,please set you glmBaseUrl first", - // source: MessageSource.bot, - // )); - // return; - // } - // final request = http.Request("POST", Uri.parse(glmBaseUrl)); - // request.headers.addAll({'Content-Type': 'application/json'}); - // final requestBody = json.encode(body); - // request.body = requestBody; - // try { - // final response = await request.send(); - // /** chunk like this - // * event: delta - // * data: {"delta": "j", "response": "j", "finished": false} - // */ - // await for (final chunk in response.stream.transform(utf8.decoder)) { - // String data = chunk.split('\n').firstWhere( - // (element) => element.startsWith("data:"), - // orElse: () => 'No matching data'); - // if (!data.startsWith("data:")) { - // continue; - // } - // final jsonData = jsonDecode(data.split("data:")[1].trim()); - // if (jsonData["finished"]) { - // onSuccess(Message( - // uuid: uuid.v4(), - // userName: userName, - // createTime: DateTime.now(), - // message: jsonData["response"], - // source: MessageSource.bot)); - // } else { - // onResponse(Message( - // uuid: uuid.v4(), - // userName: userName, - // createTime: DateTime.now(), - // message: jsonData["response"], - // source: MessageSource.bot)); - // } - // } - // } catch (e) { - // errorCallback(Message( - // uuid: uuid.v4(), - // userName: userName, - // createTime: DateTime.now(), - // message: e.toString(), - // source: MessageSource.bot, - // )); - // } - } -} - -List> collectHistory(List list) { - List> result = []; - for (int i = list.length - 1; i >= 0; i -= 2) { - //只添加最近的会话 - if (i - 1 > 0) { - result.insert(0, [list[i - 1].message, list[i].message]); - } - if (result.length > 3) { - //放太多轮次也没啥意思 - break; - } - } - return result; -} diff --git a/lib/main.dart b/lib/main.dart index ba69ea7..3db3994 100644 --- a/lib/main.dart +++ b/lib/main.dart @@ -37,6 +37,10 @@ void main() async { sqfliteFfiInit(); databaseFactory = databaseFactoryFfi; } + + String path = join(await getDatabasesPath(), 'moyubie.db'); + await deleteDatabase(path); + runApp(const MyApp()); } diff --git a/lib/repository/chat_room.dart b/lib/repository/chat_room.dart index e2481cc..205208c 100644 --- a/lib/repository/chat_room.dart +++ b/lib/repository/chat_room.dart @@ -2,29 +2,120 @@ import 'package:font_awesome_flutter/font_awesome_flutter.dart'; import 'package:get_storage/get_storage.dart'; import 'package:mysql_client/exception.dart'; import 'package:sqflite/sqflite.dart'; +import 'package:moyubie/utils/tidb.dart'; import 'package:firebase_analytics/firebase_analytics.dart'; import 'package:path/path.dart'; import 'package:mysql_client/mysql_client.dart'; import 'package:sqflite_common_ffi/sqflite_ffi.dart'; +import 'dart:convert'; +import 'dart:math'; + +class TiDBConnection { + MySQLConnection? connection; + + String host = ""; + int port = 0; + String userName = ""; + String userNamePrefix = ""; + String password = ""; + + close() async { + if (connection != null) { + if (connection!.connected) { + await connection?.close(); + } + } + connection = null; + } + + bool hasSet() { + return host.isNotEmpty && + port != 0 && + userName.isNotEmpty && + password.isNotEmpty; + } + + setConnect(String host, int port, String userName, String password) async { + this.host = host; + this.port = port; + this.userName = userName; + this.password = password; + userNamePrefix = userName.split(".").first; + + close(); + } + + clearConnect() async { + host = ""; + port = 0; + userName = ""; + userNamePrefix = ""; + password = ""; + + close(); + } + + @override + String toString() { + return "hose: $host, port: $port, userName: $userName, password: $password"; + } + + String toToken() { + String connText = hasSet() // + ? "mysql -u '$userName' -h $host -P 4000 -p$password" + : ""; + return base64.encode(utf8.encode(connText)); + } + + Future validateRemoteDB() async { + var dbConn = + await ChatRoomRepository.getRemoteDb(this, true, forceInit: true); + if (dbConn == null) { + return "Cannot connect to remote database with ${toString()}, "; + } + return null; + } + + static TiDBConnection fromToken(String token) { + String str = utf8.decode(base64.decode(token)); + var conn = TiDBConnection(); + var (host, port, userName, password) = parseTiDBConnectionText(str); + conn.setConnect(host, port, userName, password); + return conn; + } +} + +enum Role { + host, + guest, +} class ChatRoom { String uuid; String name; DateTime createTime; // UTC time zone. String connectionToken; - - ChatRoom( - {required this.uuid, - required this.name, - required this.createTime, - required this.connectionToken}); + Role role; + + ChatRoom({ + required this.uuid, + required this.name, + required this.createTime, + required this.connectionToken, + required this.role, + }); + + bool isHost() { + return role == Role.host; + } Map toSQLMap() { return { 'uuid': uuid, 'name': name, 'create_time': createTime.millisecondsSinceEpoch, - 'connection_token': connectionToken, + 'connection': connectionToken, + 'role': role.name, }; } } @@ -78,7 +169,9 @@ class ChatRoomRepository { static const String _columnChatRoomName = 'name'; // UTC time zone. SQLite: Integer(i.e. Unix Time), TiDB: DateTime static const String _columnChatRoomCreateTime = 'create_time'; - static const String _columnChatRoomConnectionToken = 'connection_token'; + static const String _columnChatRoomConnectionToken = 'connection'; + // The user role of this chat room, could be 'host' or 'guest' + static const String _columnChatRoomRole = 'role'; static const String _columnMessageUuid = 'uuid'; static const String _columnMessageUserName = 'user_name'; @@ -88,16 +181,17 @@ class ChatRoomRepository { static const String _columnMessageSource = 'source'; static const String _columnAskAI = 'ask_ai'; - static MySQLConnection? _remoteDatabase; - static bool isRemoteDBValid = false; - static String host = ""; - static int port = 0; - static String userName = ""; - static String password = ""; + static var myTiDBConn = TiDBConnection(); + // TODO: We haven't implement connection GC yet!!! + static var connMap = {}; static Database? _database; static ChatRoomRepository? _instance; + static const _chars = + 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz1234567890'; + final Random _rnd = Random(); + ChatRoomRepository._internal(); factory ChatRoomRepository() { @@ -116,7 +210,8 @@ class ChatRoomRepository { $_columnChatRoomUuid VARCHAR(36) PRIMARY KEY, $_columnChatRoomName TEXT, $_columnChatRoomCreateTime INTEGER, - $_columnChatRoomConnectionToken TEXT + $_columnChatRoomConnectionToken TEXT, + $_columnChatRoomRole TEXT ) '''); }); @@ -124,12 +219,31 @@ class ChatRoomRepository { return _database!; } + TiDBConnection ensureConnection(String token) { + var conn = connMap[token]; + if (conn != null) { + return conn; + } + conn = TiDBConnection.fromToken(token); + connMap[token] = conn; + return conn; + } + updateRemoteDBConfig( - String host, int port, String userName, String password) { - ChatRoomRepository.host = host; - ChatRoomRepository.port = port; - ChatRoomRepository.userName = userName; - ChatRoomRepository.password = password; + String host, + int port, + String userName, + String password, + ) { + myTiDBConn.setConnect(host, port, userName, password); + } + + static Future validateRemoteDB(TiDBConnection conn) async { + var dbConn = await getRemoteDb(conn, true, forceInit: true); + if (dbConn == null) { + return "Cannot connect to remote database with ${conn.toString()}, "; + } + return null; } removeDatabase() async { @@ -139,76 +253,88 @@ class ChatRoomRepository { } Future removeDatabaseRemote() async { - final db = await getRemoteDb(); + final db = await getRemoteDb(myTiDBConn, true); if (db != null) { var res = await db.execute("SHOW DATABASES LIKE 'moyubie';"); if (res.rows.isNotEmpty) { await db.execute("DROP DATABASE moyubie;"); } - await _remoteDatabase?.close(); - _remoteDatabase = null; + res = await db.execute( + "SELECT `user` FROM mysql.user where `user` like '${myTiDBConn.userNamePrefix}.MYB_%';"); + for (final row in res.rows) { + final user = row.colByName("user"); + await db.execute("DROP USER '$user';"); + } + + await myTiDBConn.close(); return true; } else { return false; } } - String remoteDBToString() { - return "hose: $host, port: $port, userName: $userName, password: $password"; - } - - void setRemoteDBValid(bool v) { - isRemoteDBValid = v; + String myRemoteDBToString() { + return "hose: ${myTiDBConn.host}, port: ${myTiDBConn.port}, userName: ${myTiDBConn.userName}, password: ${myTiDBConn.password}"; } - Future getRemoteDb({bool forceInit = false}) async { + static Future getRemoteDb(TiDBConnection conn, bool isHost, + {bool forceInit = false}) async { bool shouldInit = - _remoteDatabase == null || !_remoteDatabase!.connected || forceInit; - if (host.isEmpty || (!isRemoteDBValid && !forceInit)) { + conn.connection == null || !conn.connection!.connected || forceInit; + if (conn.host.isEmpty || + conn.port == 0 || + conn.userName.isEmpty || + conn.password.isEmpty) { shouldInit = false; } try { if (shouldInit) { // Make sure the old connection has been close - _remoteDatabase?.close(); + conn.connection?.close(); - var conn = await MySQLConnection.createConnection( - host: host, port: port, userName: userName, password: password); - _remoteDatabase = conn; - if (_remoteDatabase == null) return Future(() => null); + var dbConn = await MySQLConnection.createConnection( + host: conn.host, + port: conn.port, + userName: conn.userName, + password: conn.password); + conn.connection = dbConn; - await conn.connect(); + await dbConn.connect(); - conn.onClose(() { + dbConn.onClose(() { // I haven't check the client carefully. // Is it enough to handle connection broken or someting bad? - _remoteDatabase = null; + conn.connection = null; }); - var res = await conn.execute("SHOW DATABASES LIKE 'moyubie';"); - if (res.rows.isEmpty) { - FirebaseAnalytics.instance.logEvent(name: "remote_createdb"); - await conn.execute("CREATE DATABASE IF NOT EXISTS moyubie;"); - } - await conn.execute("USE moyubie;"); - res = await conn.execute("SHOW TABLES LIKE 'chat_room';"); - if (res.rows.isEmpty) { - await conn.execute(''' - CREATE TABLE IF NOT EXISTS $_tableChatRoom ( + if (isHost) { + var res = await dbConn.execute("SHOW DATABASES LIKE 'moyubie';"); + if (res.rows.isEmpty) { + FirebaseAnalytics.instance.logEvent(name: "remote_createdb"); + await dbConn.execute("CREATE DATABASE IF NOT EXISTS moyubie;"); + } + await dbConn.execute("USE moyubie;"); + res = await dbConn.execute("SHOW TABLES LIKE 'chat_room';"); + if (res.rows.isEmpty) { + await dbConn.execute(''' + CREATE TABLE IF NOT EXISTS $_tableChatRoom ( $_columnChatRoomUuid VARCHAR(36) PRIMARY KEY, $_columnChatRoomName TEXT, - $_columnChatRoomCreateTime DATETIME, - $_columnChatRoomConnectionToken TEXT - ) - '''); + $_columnChatRoomCreateTime DATETIME(6), + $_columnChatRoomConnectionToken TEXT, + $_columnChatRoomRole TEXT + ) + '''); + } } } } catch (e) { return Future(() => null); } - return _remoteDatabase; + + return conn.connection; } Future> getChatRooms({String? where}) async { @@ -218,18 +344,19 @@ class ChatRoomRepository { return List.generate(maps.length, (i) { var ct = maps[i][_columnChatRoomCreateTime]; return ChatRoom( - uuid: maps[i][_columnChatRoomUuid], - name: maps[i][_columnChatRoomName], - createTime: DateTime.fromMicrosecondsSinceEpoch(ct), - connectionToken: maps[i][_columnChatRoomConnectionToken], - ); + uuid: maps[i][_columnChatRoomUuid], + name: maps[i][_columnChatRoomName], + createTime: DateTime.fromMicrosecondsSinceEpoch(ct), + connectionToken: maps[i][_columnChatRoomConnectionToken], + role: Role.values + .firstWhere((e) => e.name == maps[i][_columnChatRoomRole])); }); } Future> getChatRoomsRemote() async { - final db = await getRemoteDb(); + final db = await getRemoteDb(myTiDBConn, true); if (db == null) return Future(() => []); - var res = await db.execute("SELECT * FROM $_tableChatRoom;"); + var res = await db.execute("SELECT * FROM moyubie.$_tableChatRoom;"); return res.rows.map((e) { var maps = e.assoc(); return ChatRoom( @@ -237,6 +364,8 @@ class ChatRoomRepository { name: maps[_columnChatRoomName]!, createTime: DateTime.parse(maps[_columnChatRoomCreateTime]!), connectionToken: maps[_columnChatRoomConnectionToken]!, + role: + Role.values.firstWhere((e) => e.name == maps[_columnChatRoomRole]), ); }).toList(); } @@ -245,8 +374,9 @@ class ChatRoomRepository { final db = await _getDb(); // await db.execute("DELETE FROM $_tableChatRoom;"); for (var room in rooms) { + // TODO Remote this await db.execute(''' - CREATE TABLE IF NOT EXISTS `${room.uuid}` ( + CREATE TABLE IF NOT EXISTS `msg_${room.uuid}` ( $_columnMessageUuid VARCHAR(36) PRIMARY KEY, $_columnMessageUserName TEXT, $_columnMessageCreateTime INTEGER, @@ -263,44 +393,77 @@ class ChatRoomRepository { } } - Future addChatRoom(ChatRoom chatRoom) async { - final db = await _getDb(); - await db.insert( - _tableChatRoom, - chatRoom.toSQLMap(), - conflictAlgorithm: ConflictAlgorithm.replace, - ); - await db.execute(''' - CREATE TABLE IF NOT EXISTS `${chatRoom.uuid}` ( + String genRandomString(int length) { + return String.fromCharCodes(Iterable.generate( + length, (_) => _chars.codeUnitAt(_rnd.nextInt(_chars.length)))); + } + + Future addChatRoom(ChatRoom room) async { + var roomConn = TiDBConnection.fromToken(room.connectionToken); + + final remoteDB = await getRemoteDb(myTiDBConn, true); + if (remoteDB != null) { + if (room.isHost()) { + String user = "${myTiDBConn.userNamePrefix}.MYB_${genRandomString(10)}"; + String pwd = genRandomString(20); + await remoteDB.execute(''' + CREATE TABLE IF NOT EXISTS moyubie.`msg_${room.uuid}` ( $_columnMessageUuid VARCHAR(36) PRIMARY KEY, $_columnMessageUserName TEXT, - $_columnMessageCreateTime INTEGER, + $_columnMessageCreateTime DATETIME(6), $_columnMessageMessage TEXT, $_columnMessageSource TEXT, $_columnAskAI INTEGER - ) + ); '''); - final remoteDB = await getRemoteDb(); - if (remoteDB != null) { - await remoteDB.execute(''' + + await remoteDB.execute(''' BEGIN; + CREATE USER '$user'@'%' IDENTIFIED BY '$pwd'; + GRANT INSERT, SELECT on `moyubie`.`msg_${room.uuid}` TO '$user'@'%'; + COMMIT; + '''); + + // Update the connection to use the new user. + roomConn.setConnect(myTiDBConn.host, myTiDBConn.port, user, pwd); + // Looks like TiDB Serverless need some time to prepare the new users' connection. + // And immediate connection will fail. + } - INSERT IGNORE INTO $_tableChatRoom - (`$_columnChatRoomUuid`, `$_columnChatRoomName`, `$_columnChatRoomCreateTime`, `$_columnChatRoomConnectionToken`) VALUES - ('${chatRoom.uuid}', :name, '${chatRoom.createTime.toString()}', :token); + await remoteDB.execute( + ''' + INSERT IGNORE INTO moyubie.$_tableChatRoom + (`$_columnChatRoomUuid`, `$_columnChatRoomName`, `$_columnChatRoomCreateTime`, `$_columnChatRoomConnectionToken`, `$_columnChatRoomRole`) VALUES + ('${room.uuid}', :name, '${room.createTime.toString()}', :token, '${room.role.name}'); + ''', + { + "name": room.name, + "token": roomConn.toToken(), + }, + ); + } - CREATE TABLE IF NOT EXISTS `${chatRoom.uuid}` ( + // Use the user who is dedicated for this chat room to chat + room.connectionToken = roomConn.toToken(); + + final db = await _getDb(); + await db.insert( + _tableChatRoom, + room.toSQLMap(), + conflictAlgorithm: ConflictAlgorithm.replace, + ); + await db.execute(''' + CREATE TABLE IF NOT EXISTS `msg_${room.uuid}` ( $_columnMessageUuid VARCHAR(36) PRIMARY KEY, $_columnMessageUserName TEXT, - $_columnMessageCreateTime DATETIME, + $_columnMessageCreateTime INTEGER, $_columnMessageMessage TEXT, $_columnMessageSource TEXT, $_columnAskAI INTEGER - ); + ) + '''); - COMMIT; - ''', {"name": chatRoom.name, "token": chatRoom.connectionToken}); - } + return null; } Future updateChatRoom(ChatRoom chatRoom) async { @@ -311,19 +474,23 @@ class ChatRoomRepository { where: '$_columnChatRoomUuid = ?', whereArgs: [chatRoom.uuid], ); - final remoteDB = await getRemoteDb(); - if (remoteDB != null) { - await remoteDB.execute(''' - UPDATE $_tableChatRoom SET + + if (chatRoom.isHost()) { + final remoteDB = await getRemoteDb(myTiDBConn, true); + if (remoteDB != null) { + await remoteDB.execute(''' + UPDATE moyubie.$_tableChatRoom SET $_columnChatRoomName = :name, - $_columnChatRoomCreateTime = unixepoch('${chatRoom.createTime.toString()}'), + $_columnChatRoomCreateTime = '${chatRoom.createTime.toString()}'), $_columnChatRoomConnectionToken = :token - WHERE $_columnChatRoomUuid = '${chatRoom.uuid}' - ''', {"name": chatRoom.name, "token": chatRoom.connectionToken}); + WHERE $_columnChatRoomUuid = '${chatRoom.uuid}' + ''', {"name": chatRoom.name, "token": chatRoom.connectionToken}); + } } } - Future deleteChatRoom(String uuid) async { + Future deleteChatRoom(ChatRoom room) async { + final uuid = room.uuid; final db = await _getDb(); await db.transaction((txn) async { await txn.delete( @@ -332,21 +499,29 @@ class ChatRoomRepository { whereArgs: [uuid], ); }); - await db.execute('DROP TABLE IF EXISTS `$uuid`'); - - final remoteDB = await getRemoteDb(); - if (remoteDB != null) { - await remoteDB.execute(''' - DELETE FROM $_tableChatRoom WHERE $_columnChatRoomUuid = '$uuid' - '''); - await remoteDB.execute('DROP TABLE IF EXISTS `$uuid`'); + await db.execute('DROP TABLE IF EXISTS `msg_$uuid`'); + + final conn = TiDBConnection.fromToken(room.connectionToken); + + if (conn.hasSet() && room.isHost()) { + final remoteDB = await getRemoteDb(myTiDBConn, true); + if (remoteDB != null) { + await remoteDB.execute( + "DELETE FROM moyubie.$_tableChatRoom WHERE $_columnChatRoomUuid = '$uuid'"); + await remoteDB.execute("DROP TABLE IF EXISTS moyubie.`msg_$uuid`"); + // Do some protection, in case bug cause the root user removed. + if (!(conn.userName.split(".").length == 2 && + conn.userName.split(".")[1] == "root")) { + await remoteDB.execute("DROP USER IF EXISTS '${conn.userName}'"); + } + } } } - Future> getMessagesByChatRoomUUid(String uuid, + Future> getMessagesByChatRoomUUid(ChatRoom room, {int limit = 500}) async { final db = await _getDb(); - final List> maps = await db.query('`$uuid`', + final List> maps = await db.query('`msg_${room.uuid}`', orderBy: "$_columnMessageCreateTime desc", limit: limit); return List.from(maps.reversed.map((m) => Message( uuid: m[_columnMessageUuid], @@ -361,19 +536,22 @@ class ChatRoomRepository { } Future> getNewMessagesByChatRoomUuidRemote( - String uuid, DateTime? from) async { - final db = await getRemoteDb(); + ChatRoom room, DateTime? from) async { + final conn = ensureConnection(room.connectionToken); + final db = + await getRemoteDb(conn, false /* isHost should always be false */); if (db == null) { return Future(() => []); } String whereClause = ""; if (from != null) { - whereClause = "WHERE UNIX_TIMESTAMP($_columnMessageCreateTime) > ${from.millisecondsSinceEpoch ~/ 1000}"; + whereClause = + "WHERE UNIX_TIMESTAMP($_columnMessageCreateTime) > ${from.millisecondsSinceEpoch ~/ 1000}"; } var res; try { res = await db.execute(''' - SELECT * FROM `$uuid` $whereClause ORDER BY $_columnMessageCreateTime ASC; + SELECT * FROM `msg_${room.uuid}` $whereClause ORDER BY $_columnMessageCreateTime ASC; '''); } catch (e) { print("catch error"); @@ -394,16 +572,16 @@ class ChatRoomRepository { }).toList()); } - Future addMessage(String chatRoomUuid, Message message) async { + Future addMessage(ChatRoom room, Message message) async { final db = await _getDb(); await db.insert( - '`$chatRoomUuid`', + '`msg_${room.uuid}`', message.toSQLMap(), conflictAlgorithm: ConflictAlgorithm.replace, ); // Don't wait for remote message finish adding to TiDB. - var addRemote = addMessageRemote(chatRoomUuid, message); + var addRemote = addMessageRemote(room, message); addRemote .then((value) => { // todo @@ -413,19 +591,20 @@ class ChatRoomRepository { }); } - Future addMessageRemote(String chatRoomUuid, Message message) async { + Future addMessageRemote(ChatRoom room, Message message) async { try { - await insertMessageRemote(chatRoomUuid, message); + await insertMessageRemote(room, message); } catch (e) { if (e is MySQLServerException) { if (e.errorCode == 1146) { // Create the chat room. It is possible that user create chat room before adding mysql/TiDB connection. final rooms = await getChatRooms( - where: "$_columnChatRoomUuid = '$chatRoomUuid'"); + where: "$_columnChatRoomUuid = '${room.uuid}}'"); if (rooms.length == 1) { // Add chat room again. - await addChatRoom(rooms.first); - await insertMessageRemote(chatRoomUuid, message); + final newRoom = rooms.first; + await addChatRoom(newRoom); + await insertMessageRemote(newRoom, message); } else { // If it is not, then too weird. I give up! } @@ -441,11 +620,12 @@ class ChatRoomRepository { return null; } - Future insertMessageRemote(String chatRoomUuid, Message message) async { - var remoteDB = await getRemoteDb(); + Future insertMessageRemote(ChatRoom room, Message message) async { + final conn = ensureConnection(room.connectionToken); + final remoteDB = await getRemoteDb(conn, false); if (remoteDB != null) { // Must use SQL with param - await remoteDB.execute('''INSERT IGNORE INTO `$chatRoomUuid` + await remoteDB.execute('''INSERT IGNORE INTO moyubie.`msg_${room.uuid}` ($_columnMessageUuid, $_columnMessageUserName, $_columnMessageCreateTime, $_columnMessageMessage, $_columnMessageSource, $_columnAskAI) VALUES (:uuid, :user, :createTime, :message, :source, :askAI)''', { "uuid": message.uuid, @@ -457,19 +637,4 @@ class ChatRoomRepository { }); } } - - Future deleteMessage(String chatRoomUuid, String messageUuid) async { - final db = await _getDb(); - await db.delete( - '`$chatRoomUuid`', - where: '$_columnMessageUuid = ?', - whereArgs: [messageUuid], - ); - - final remoteDB = await getRemoteDb(); - if (remoteDB != null) { - await remoteDB.execute( - "DELETE FROM `$chatRoomUuid` WHERE $_columnMessageUuid = '$messageUuid'"); - } - } } diff --git a/lib/repository/message.dart b/lib/repository/message.dart index 8f44618..9e5648a 100644 --- a/lib/repository/message.dart +++ b/lib/repository/message.dart @@ -1,7 +1,6 @@ import 'package:dart_openai/dart_openai.dart'; import 'package:flutter/foundation.dart'; import 'package:moyubie/controller/settings.dart'; -import 'package:moyubie/data/glm.dart'; import 'package:moyubie/data/llm.dart'; import 'package:get_storage/get_storage.dart'; import 'package:uuid/uuid.dart'; @@ -74,8 +73,4 @@ class MessageRepository { onResponse, errorCallback, onSuccess); } } - - deleteMessage(String chatRoomUuid, String messageUuid) { - ChatRoomRepository().deleteMessage(chatRoomUuid, messageUuid); - } } diff --git a/lib/utils/tidb.dart b/lib/utils/tidb.dart new file mode 100644 index 0000000..bdb336b --- /dev/null +++ b/lib/utils/tidb.dart @@ -0,0 +1,49 @@ +// Return (host, port, user, password). +// If port == 0, means "host" is the erorr message. +(String, int, String, String) parseTiDBConnectionText(String text) { + if (text.isEmpty) { + return ("", 0, "", ""); + } + + text = text.replaceFirst(" -p", " -p "); + final options = text.split(" "); + var nextOpts = List.from(options); + nextOpts.removeAt(0); + nextOpts.add(""); + String user = ""; + String host = ""; + int port = 0; + String password = ""; + + try { + for (int i = 0; i < options.length; i += 1) { + final opt = options[i]; + final nextOpt = nextOpts[i]; + switch (opt) { + case "-u": + case "--user": + user = nextOpt.replaceAll("'", ""); + user = user.replaceAll('"', ""); + user = user.replaceAll("'", ""); + break; + case "-h": + case "--host": + host = nextOpt; + break; + case "-P": + case "--port": + port = int.parse(nextOpt); + break; + case "-p": + case "--password": + password = nextOpt; + break; + default: + } + } + + return (host, port, user, password); + } catch (e) { + return (e.toString(), 0, "", ""); + } +}