Skip to content

Commit

Permalink
retry for chatroom
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Jul 27, 2023
1 parent c4f761a commit 4edee96
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 43 deletions.
102 changes: 59 additions & 43 deletions lib/repository/chat_room.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import 'dart:async';
import 'dart:convert';
import 'dart:math';

import 'package:mysql_client/exception.dart';
import 'package:sqflite/sqflite.dart';
import 'package:moyubie/utils/tidb.dart';
import 'package:firebase_analytics/firebase_analytics.dart';
import 'package:path/path.dart';
import 'package:mysql_client/mysql_client.dart';
import 'package:sqflite_common_ffi/sqflite_ffi.dart';
import 'dart:convert';
import 'dart:math';

import 'package:uuid/uuid.dart';
import 'package:retry/retry.dart';

class TiDBConnection {
MySQLConnection? connection;
Expand Down Expand Up @@ -194,6 +195,13 @@ class ChatRoomRepository {
static const String _columnMessageSource = 'source';
static const String _columnAskAI = 'ask_ai';

static const RetryOptions _connectRetry =
RetryOptions(delayFactor: Duration(milliseconds: 500), maxAttempts: 5);
static const RetryOptions _queryRetry =
RetryOptions(delayFactor: Duration(milliseconds: 200), maxAttempts: 4);
static const RetryOptions _longRetry =
RetryOptions(delayFactor: Duration(milliseconds: 500), maxAttempts: 4);

static var myTiDBConn = TiDBConnection();
// TODO: We haven't implement connection GC yet!!!
static var connMap = <String, TiDBConnection>{};
Expand Down Expand Up @@ -268,16 +276,17 @@ class ChatRoomRepository {
Future<bool> removeDatabaseRemote() async {
final db = await getRemoteDb(myTiDBConn, true);
if (db != null) {
var res = await db.execute("SHOW DATABASES LIKE 'moyubie';");
var res = await _queryRetry
.retry(() => db.execute("SHOW DATABASES LIKE 'moyubie';"));
if (res.rows.isNotEmpty) {
await db.execute("DROP DATABASE moyubie;");
await _queryRetry.retry(() => db.execute("DROP DATABASE moyubie;"));
}

res = await db.execute(
"SELECT `user` FROM mysql.user where `user` like '${myTiDBConn.userNamePrefix}.MYB_%';");
res = await _queryRetry.retry(() => db.execute(
"SELECT `user` FROM mysql.user where `user` like '${myTiDBConn.userNamePrefix}.MYB_%';"));
for (final row in res.rows) {
final user = row.colByName("user");
await db.execute("DROP USER '$user';");
await _queryRetry.retry(() => db.execute("DROP USER '$user';"));
}

await myTiDBConn.close();
Expand Down Expand Up @@ -309,7 +318,7 @@ class ChatRoomRepository {
password: conn.password);
conn.connection = dbConn;

await dbConn.connect();
await _connectRetry.retry(() => dbConn.connect());

dbConn.onClose(() {
// I haven't check the client carefully.
Expand All @@ -318,23 +327,26 @@ class ChatRoomRepository {
});

if (isHost) {
var res = await dbConn.execute("SHOW DATABASES LIKE 'moyubie';");
var res = await _queryRetry
.retry(() => dbConn.execute("SHOW DATABASES LIKE 'moyubie';"));
if (res.rows.isEmpty) {
FirebaseAnalytics.instance.logEvent(name: "remote_createdb");
await dbConn.execute("CREATE DATABASE IF NOT EXISTS moyubie;");
await _queryRetry.retry(
() => dbConn.execute("CREATE DATABASE IF NOT EXISTS moyubie;"));
}
await dbConn.execute("USE moyubie;");
res = await dbConn.execute("SHOW TABLES LIKE 'chat_room';");
await _queryRetry.retry(() => dbConn.execute("USE moyubie;"));
res = await _queryRetry
.retry(() => dbConn.execute("SHOW TABLES LIKE 'chat_room';"));
if (res.rows.isEmpty) {
await dbConn.execute('''
await _longRetry.retry(() => dbConn.execute('''
CREATE TABLE IF NOT EXISTS $_tableChatRoom (
$_columnChatRoomUuid VARCHAR(36) PRIMARY KEY,
$_columnChatRoomName TEXT,
$_columnChatRoomCreateTime DATETIME(6),
$_columnChatRoomConnectionToken TEXT,
$_columnChatRoomRole TEXT
)
''');
'''));
}
}
}
Expand Down Expand Up @@ -366,7 +378,8 @@ class ChatRoomRepository {
try {
final db = await getRemoteDb(myTiDBConn, true);
if (db == null) return Future(() => []);
var res = await db.execute("SELECT * FROM moyubie.$_tableChatRoom;");
var res = await _longRetry
.retry(() => db.execute("SELECT * FROM moyubie.$_tableChatRoom;"));
return res.rows.map((e) {
var maps = e.assoc();
return ChatRoom(
Expand Down Expand Up @@ -445,7 +458,7 @@ class ChatRoomRepository {
String pwd = genRandomString(20);
String msgTable = "msg_${room.uuid}";

await remoteDB.execute('''
await _longRetry.retry(() => remoteDB.execute('''
CREATE TABLE IF NOT EXISTS moyubie.`$msgTable` (
$_columnMessageUuid VARCHAR(36) PRIMARY KEY,
$_columnMessageUserName TEXT,
Expand All @@ -454,14 +467,14 @@ class ChatRoomRepository {
$_columnMessageSource TEXT,
$_columnAskAI INTEGER
);
''');
'''));

await remoteDB.execute('''
await _longRetry.retry(() => remoteDB.execute('''
BEGIN;
CREATE USER '$user'@'%' IDENTIFIED BY '$pwd';
GRANT INSERT, SELECT on `moyubie`.`$msgTable` TO '$user'@'%';
COMMIT;
''');
'''));

// Update the connection to use the new user.
roomConn.setConnect(
Expand All @@ -470,17 +483,17 @@ class ChatRoomRepository {
// And immediate connection will fail.
}

await remoteDB.execute(
'''
await _longRetry.retry(() => remoteDB.execute(
'''
INSERT IGNORE INTO moyubie.$_tableChatRoom
(`$_columnChatRoomUuid`, `$_columnChatRoomName`, `$_columnChatRoomCreateTime`, `$_columnChatRoomConnectionToken`, `$_columnChatRoomRole`) VALUES
('${room.uuid}', :name, '${room.createTime.toString()}', :token, '${room.role.name}');
''',
{
"name": room.name,
"token": roomConn.toToken(),
},
);
{
"name": room.name,
"token": roomConn.toToken(),
},
));
}

// Use the user who is dedicated for this chat room to chat
Expand Down Expand Up @@ -516,13 +529,13 @@ class ChatRoomRepository {
if (chatRoom.isHost()) {
final remoteDB = await getRemoteDb(myTiDBConn, true);
if (remoteDB != null) {
await remoteDB.execute('''
await _longRetry.retry(() => remoteDB.execute('''
UPDATE moyubie.$_tableChatRoom SET
$_columnChatRoomName = :name,
$_columnChatRoomCreateTime = '${chatRoom.createTime.toString()}',
$_columnChatRoomConnectionToken = :token
WHERE $_columnChatRoomUuid = '${chatRoom.uuid}'
''', {"name": chatRoom.name, "token": chatRoom.connectionToken});
''', {"name": chatRoom.name, "token": chatRoom.connectionToken}));
}
}
}
Expand All @@ -544,13 +557,15 @@ class ChatRoomRepository {
if (conn.hasSet() && room.isHost()) {
final remoteDB = await getRemoteDb(myTiDBConn, true);
if (remoteDB != null) {
await remoteDB.execute(
"DELETE FROM moyubie.$_tableChatRoom WHERE $_columnChatRoomUuid = '$uuid'");
await remoteDB.execute("DROP TABLE IF EXISTS moyubie.`msg_$uuid`");
await _longRetry.retry(() => remoteDB.execute(
"DELETE FROM moyubie.$_tableChatRoom WHERE $_columnChatRoomUuid = '$uuid'"));
await _queryRetry.retry(
() => remoteDB.execute("DROP TABLE IF EXISTS moyubie.`msg_$uuid`"));
// Do some protection, in case bug cause the root user removed.
if (!(conn.userName.split(".").length == 2 &&
conn.userName.split(".")[1] == "root")) {
await remoteDB.execute("DROP USER IF EXISTS '${conn.userName}'");
await _queryRetry.retry(
() => remoteDB.execute("DROP USER IF EXISTS '${conn.userName}'"));
}
}
}
Expand Down Expand Up @@ -592,7 +607,7 @@ class ChatRoomRepository {
try {
sql =
"SELECT * FROM moyubie.`msg_${room.uuid}` $whereClause ORDER BY $_columnMessageCreateTime desc limit $limit;";
res = await db.execute(sql);
res = await _longRetry.retry(() => db.execute(sql));
} catch (e) {
print("catch error: $sql, error: ${e.toString()}");
return Future(() => []);
Expand Down Expand Up @@ -667,16 +682,17 @@ class ChatRoomRepository {
// Must use SQL with param
// TODO This mysql client does not support batch?
for (final m in messages) {
await remoteDB.execute('''INSERT IGNORE INTO moyubie.`msg_${room.uuid}`
await _longRetry.retry(() =>
remoteDB.execute('''INSERT IGNORE INTO moyubie.`msg_${room.uuid}`
($_columnMessageUuid, $_columnMessageUserName, $_columnMessageCreateTime, $_columnMessageMessage, $_columnMessageSource, $_columnAskAI) VALUES
(:uuid, :user, :createTime, :message, :source, :askAI)''', {
"uuid": m.uuid,
"user": m.userName,
"createTime": m.createTime.toString(),
"message": m.message,
"source": m.source.name,
"askAI": m.ask_ai ? 1 : 0
});
"uuid": m.uuid,
"user": m.userName,
"createTime": m.createTime.toString(),
"message": m.message,
"source": m.source.name,
"askAI": m.ask_ai ? 1 : 0
}));
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,14 @@ packages:
url: "https://pub.dev"
source: hosted
version: "1.2.3"
retry:
dependency: "direct main"
description:
name: retry
sha256: "822e118d5b3aafed083109c72d5f484c6dc66707885e07c0fbcb8b986bba7efc"
url: "https://pub.dev"
source: hosted
version: "3.1.2"
scroll_to_index:
dependency: transitive
description:
Expand Down
1 change: 1 addition & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies:
firebase_core: ^2.15.0
timeago: ^3.5.0
stream_transform: ^2.1.0
retry: ^3.1.2

dev_dependencies:
flutter_test:
Expand Down

0 comments on commit 4edee96

Please sign in to comment.