-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c4f761a
commit 98a713a
Showing
3 changed files
with
135 additions
and
92 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,97 +1,116 @@ | ||
import 'dart:developer'; | ||
|
||
import 'package:flutter/widgets.dart'; | ||
import 'package:sqflite/sqflite.dart'; | ||
import 'package:get/get.dart'; | ||
import 'package:get/get_rx/get_rx.dart'; | ||
import 'package:moyubie/controller/settings.dart'; | ||
import 'package:moyubie/utils/tidb.dart'; | ||
import 'package:moyubie/repository/chat_room.dart'; | ||
import 'package:mysql_client/mysql_client.dart'; | ||
import 'package:stream_transform/stream_transform.dart'; | ||
|
||
class _CachedConnection { | ||
MySQLConnection? _last; | ||
class Tag { | ||
String name; | ||
DateTime time; | ||
|
||
_CachedConnection(Stream<MySQLConnection> stream) { | ||
stream.listen((event) { | ||
if (_last != null && _last != event) { | ||
log("Update connection. [old=$_last, new=$event]", | ||
name: "moyubie::_CachedConnection"); | ||
_last!.close(); | ||
} | ||
_last = event; | ||
}, | ||
onError: (err) => | ||
log("ERROR [$err]", name: "moyubie::_CachedConnection"), | ||
onDone: () => log("APP CLOSED", name: "moyubie::_CachedConnection"), | ||
cancelOnError: false); | ||
} | ||
Tag({ | ||
required this.name, | ||
required this.time, | ||
}); | ||
|
||
MySQLConnection? get value => _last; | ||
Map<String, dynamic> toSQLMap() { | ||
return { | ||
'name': name, | ||
'added_at': time.toIso8601String(), | ||
}; | ||
} | ||
} | ||
|
||
// TODO refactor TagsRepository and ChatRoomRepository | ||
class TagsRepository { | ||
factory TagsRepository.byConfig(SettingsController ctl, | ||
{bool forceInit = false}) { | ||
final stream = ctl.serverlessCmd.stream.switchMap((p0) { | ||
final (host, port, user, password, msgTable) = | ||
parseTiDBConnectionText(p0); | ||
if (host.isEmpty || port == 0 || user.isEmpty) { | ||
return Stream<MySQLConnection>.error(Exception("Invalid DB")); | ||
} | ||
return Stream.fromFuture(() async { | ||
final conn = await MySQLConnection.createConnection( | ||
host: host, port: port, userName: user, password: password); | ||
await conn.connect(); | ||
await prepareTables(conn); | ||
return conn; | ||
}()); | ||
}); | ||
return TagsRepository(_CachedConnection(stream)); | ||
} | ||
static const tableTags = "tags"; | ||
static const columnTagsName = "name"; | ||
// UTC time zone. SQLite: Text, TiDB: DateTime | ||
static const columnTagsAddedAt = "added_at"; | ||
|
||
final _CachedConnection _conn; | ||
|
||
TagsRepository(this._conn); | ||
Future<void> addNewTags(List<String> tags) async { | ||
var now = DateTime.now().toUtc(); | ||
return await _addNewTags(tags.map((n) => Tag(name: n, time: now)).toList()); | ||
} | ||
|
||
static const _tagName = "name"; | ||
static const _tagAddedAt = "added_at"; | ||
static const _table = "tags"; | ||
static const _db = "moyubie"; | ||
// Note that we don't wait for database operations | ||
Future<void> _addNewTags(List<Tag> tags) async { | ||
final db = ChatRoomRepository().getLocalDb(); | ||
db.then((db_) async { | ||
final batch = db_.batch(); | ||
for (final tag in tags) { | ||
db_.insert( | ||
tableTags, | ||
tag.toSQLMap(), | ||
conflictAlgorithm: ConflictAlgorithm.replace, | ||
); | ||
} | ||
await batch.commit(); | ||
}); | ||
|
||
static Future<void> prepareTables(MySQLConnection conn) async { | ||
await conn.execute("CREATE DATABASE IF NOT EXISTS $_db"); | ||
await conn.execute("CREATE TABLE IF NOT EXISTS `$_db`.$_table(" | ||
"$_tagName TEXT," | ||
"$_tagAddedAt DATETIME," | ||
"INDEX sand_of_time(added_at)" | ||
");"); | ||
} | ||
final remoteDB = ChatRoomRepository().getMyRemoteDb(); | ||
remoteDB.then((remoteDB_) async { | ||
if (remoteDB_ == null) { | ||
return; | ||
} | ||
|
||
Future<void> addNewTags(List<String> tags) async { | ||
final now = DateTime.now(); | ||
if (_conn.value == null) { | ||
throw Exception("The connection isn't ready for now..."); | ||
} | ||
final insert = await _conn.value!.prepare( | ||
"INSERT INTO $_db.$_table($_tagName, $_tagAddedAt) VALUES (?, ?);"); | ||
await Future.wait(tags.map((e) => insert.execute([e, now])), | ||
eagerError: true); | ||
for (final tag in tags) { | ||
await remoteDB_.execute( | ||
"INSERT IGNORE INTO moyubie.`$tableTags` VALUES (:name, :time)", | ||
{"name": tag.name, "time": tag.time.toString()}); | ||
} | ||
}); | ||
} | ||
|
||
Future<List<String>> fetchMostPopularTags(int limit) async { | ||
if (_conn.value == null) { | ||
throw Exception("The connection isn't ready for now..."); | ||
} | ||
final res = await _conn.value!.execute( | ||
"SELECT t1.NAME AS $_tagName FROM " | ||
"(SELECT COUNT($_tagName) AS CNT, $_tagName AS NAME FROM $_db.$_table GROUP BY $_tagName) AS t1" | ||
" ORDER BY t1.CNT DESC LIMIT :limit; ", | ||
{"limit": limit}); | ||
final output = <String>[]; | ||
for (final rs in res) { | ||
for (final row in rs.rows) { | ||
output.add(row.colByName(_tagName) as String); | ||
// Firt return what we have in local | ||
final db = await ChatRoomRepository().getLocalDb(); | ||
final List<Map<String, dynamic>> maps = await db.rawQuery(''' | ||
SELECT t1.NAME AS $columnTagsName FROM | ||
(SELECT COUNT($columnTagsName) AS CNT, $columnTagsName AS NAME FROM $tableTags GROUP BY $columnTagsName) AS t1 | ||
ORDER BY t1.CNT DESC LIMIT $limit; | ||
'''); | ||
|
||
final localTags = maps.map((e) { | ||
String tag_ = e[columnTagsName]; | ||
return tag_; | ||
}).toList(); | ||
|
||
// Then start async task to synchronize remote to local | ||
final remoteDB = ChatRoomRepository().getMyRemoteDb(); | ||
remoteDB.then((remoteDB_) async { | ||
if (remoteDB_ == null) { | ||
return; | ||
} | ||
} | ||
return output; | ||
final List<Map<String, dynamic>> maps = await db.query(tableTags, | ||
orderBy: "$columnTagsAddedAt DESC", limit: 1); | ||
DateTime? last; | ||
if (maps.isNotEmpty) { | ||
last = DateTime.parse(maps.first[columnTagsAddedAt]); | ||
} | ||
var sql = last == null | ||
? "SELECT * FROM moyubie.`$tableTags` ORDER BY `$columnTagsAddedAt` DESC LIMIT 100" | ||
: "SELECT * FROM moyubie.`$tableTags` WHERE `$columnTagsAddedAt` > '${last.toString()}' ORDER BY `$columnTagsAddedAt` DESC LIMIT 100"; | ||
final res = await remoteDB_.execute(sql); | ||
if (res.rows.isEmpty) { | ||
return; | ||
} | ||
final remoteTags = res.rows.map((e) { | ||
var map = e.assoc(); | ||
return Tag( | ||
name: map[columnTagsName]!, | ||
time: DateTime.parse(map[columnTagsAddedAt]!)); | ||
}).toList(); | ||
|
||
await _addNewTags(remoteTags); | ||
}); | ||
|
||
return localTags; | ||
} | ||
} |