Skip to content

Commit

Permalink
feat: Add Postgres source
Browse files Browse the repository at this point in the history
  • Loading branch information
blank038 committed Oct 30, 2024
1 parent d89ae72 commit 52d6400
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.aiyostudio.esync.internal.config

import com.aiyostudio.esync.common.repository.impl.MysqlRepositoryImpl
import com.aiyostudio.esync.common.repository.impl.PostgresRepositoryImpl
import com.aiyostudio.esync.internal.api.event.InitModulesEvent
import com.aiyostudio.esync.internal.handler.CacheHandler
import com.aiyostudio.esync.internal.handler.ModuleHandler
Expand Down Expand Up @@ -55,21 +56,16 @@ object SyncConfig {
LoggerUtil.print("&cFailed to initialize repository, check the config.yml file.", true)
return
}
RepositoryHandler.repository = when(type) {
"mysql" -> MysqlRepositoryImpl(
sourceConfig.getString("url"),
sourceConfig.getString("user"),
sourceConfig.getString("password")
)
"mysql-variant" -> MysqlVariantRepositoryImpl(
sourceConfig.getString("url"),
sourceConfig.getString("user"),
sourceConfig.getString("password")
)
val url = sourceConfig.getString("url")
val user = sourceConfig.getString("user")
val password = sourceConfig.getString("password")
RepositoryHandler.repository = when (type) {
"mysql" -> MysqlRepositoryImpl(url, user, password)
"mysql-variant" -> MysqlVariantRepositoryImpl(url, user, password)
"postgres" -> PostgresRepositoryImpl(url, user, password)
else -> throw NullPointerException("Failed to initialize repository.")
}
RepositoryHandler.repository?.run { this.init() }

LoggerUtil.print("&6 * &fSync source: &e${RepositoryHandler.repository?.id ?: "NONE"}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,28 @@ import java.sql.Connection
import java.sql.DriverManager
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

open class MysqlRepositoryImpl(
private val url: String,
private val user: String,
private val password: String
) : IRepository {
protected val esyncDataTable = "`esync_data`"
protected val sql = arrayOf(
protected val esyncDataTable = "esync_data"
protected open val sql = arrayOf(
"""
CREATE TABLE IF NOT EXISTS $esyncDataTable
(
`id` BIGINT AUTO_INCREMENT NOT NULL,
`owner_uuid` VARCHAR(40) NOT NULL,
`module` VARCHAR(100) NOT NULL,
`data` MEDIUMBLOB NOT NULL,
`state` ENUM ('COMPLETE', 'WAITING', 'LOCKED'),
PRIMARY KEY (`id`),
INDEX idx_owner (owner_uuid),
INDEX idx_state (state),
INDEX idx_module (module)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
CREATE TABLE IF NOT EXISTS $esyncDataTable
(
`id` BIGINT AUTO_INCREMENT NOT NULL,
`owner_uuid` VARCHAR(40) NOT NULL,
`module` VARCHAR(100) NOT NULL,
`data` MEDIUMBLOB NOT NULL,
`state` ENUM ('COMPLETE', 'WAITING', 'LOCKED'),
PRIMARY KEY (`id`),
INDEX idx_owner (owner_uuid),
INDEX idx_state (state),
INDEX idx_module (module)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
""".trimIndent()
)
override val id = "MySQL"
Expand All @@ -46,10 +45,12 @@ open class MysqlRepositoryImpl(
val result = AtomicBoolean(false)
this.connect {
val sql = "SELECT state FROM $esyncDataTable WHERE owner_uuid = ? AND module = ?"
it.prepareStatement(sql).use { statement ->
statement.setString(1, uuid.toString())
statement.setString(2, module)
statement.executeQuery().use { rs -> result.set(rs.next()) }
it.use { conn ->
conn.prepareStatement(sql).use { statement ->
statement.setString(1, uuid.toString())
statement.setString(2, module)
statement.executeQuery().use { rs -> result.set(rs.next()) }
}
}
}
return result.get()
Expand Down Expand Up @@ -143,40 +144,36 @@ open class MysqlRepositoryImpl(
override fun disable() {
}

private fun connectTransaction(uuid: UUID, module: String, block: (connect: Connection, id: Int) -> Unit) {
open fun connectTransaction(uuid: UUID, module: String, block: (connect: Connection, id: Int) -> Unit) {
with(this.getConnection()) {
this.autoCommit = false
try {
this.transactionIsolation = Connection.TRANSACTION_READ_COMMITTED
val lockQuery = "SELECT id FROM esync_data WHERE owner_uuid = ? AND module = ? FOR UPDATE;"
val id = AtomicInteger(-1)
this.prepareStatement(lockQuery).use { statement ->
statement.setString(1, uuid.toString())
statement.setString(2, module)
statement.executeQuery().use { rs ->
if (rs.next()) {
id.set(rs.getInt(1))
this.use {
this.autoCommit = false
try {
this.transactionIsolation = Connection.TRANSACTION_READ_COMMITTED
val lockQuery = "SELECT id FROM esync_data WHERE owner_uuid = ? AND module = ? FOR UPDATE;"
var id = -1
this.prepareStatement(lockQuery).use { statement ->
statement.setString(1, uuid.toString())
statement.setString(2, module)
statement.executeQuery().use { rs ->
if (rs.next()) {
id = rs.getInt(1)
}
}
}
block(this, id)
this.commit()
} catch (e: Exception) {
this.rollback()
} finally {
this.autoCommit = true
}
block(this, id.get())
this.commit()
} catch (e: Exception) {
this.rollback()
} finally {
this.close()
}
}
}

private fun connect(block: (connect: Connection) -> Unit) {
with(this.getConnection()) {
try {
block(this)
} finally {
this.close()
}
}
open fun connect(block: (connect: Connection) -> Unit) {
with(this.getConnection()) { this.use { block(this) } }
}

open fun getConnection(): Connection {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.aiyostudio.esync.common.repository.impl

import com.aiyostudio.esync.common.enums.SyncState
import java.sql.Types
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

class PostgresRepositoryImpl(
url: String,
user: String,
password: String
) : MysqlRepositoryImpl(url, user, password) {
override val sql = arrayOf(
"""
DO
${'$'}${'$'}
BEGIN
CREATE TYPE sync_state AS ENUM ('COMPLETE', 'WAITING', 'LOCKED');
EXCEPTION
WHEN duplicate_object THEN null;
END
${'$'}${'$'};
""".trimIndent(),
"""
CREATE TABLE IF NOT EXISTS $esyncDataTable
(
id SERIAL PRIMARY KEY NOT NULL,
owner_uuid CHARACTER(40) NOT NULL,
module CHARACTER(100) NOT NULL,
data BYTEA NOT NULL,
state sync_state NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_owner ON $esyncDataTable (owner_uuid);
CREATE INDEX IF NOT EXISTS idx_state ON $esyncDataTable (state);
CREATE INDEX IF NOT EXISTS idx_module ON $esyncDataTable (module);
""".trimIndent()
)
override val id: String = "PostgreSQL"

init {
Class.forName("org.postgresql.Driver")
}

override fun queryData(uuid: UUID, module: String): ByteArray? {
val result = AtomicReference<ByteArray>()
this.connect { conn ->
val sql = "SELECT data FROM $esyncDataTable WHERE owner_uuid = ? AND module = ?"
conn.prepareStatement(sql).use { statement ->
statement.setString(1, uuid.toString())
statement.setString(2, module)
statement.executeQuery().use rsUse@{ rs ->
if (!rs.next()) {
return@rsUse
}
result.set(rs.getBytes(1))
}
}
}
return result.get()
}

override fun insert(uuid: UUID, module: String, bytea: ByteArray, state: SyncState): Boolean {
if (this.isExists(uuid, module)) {
return this.updateData(uuid, module, bytea) && this.updateState(uuid, module, state)
}
val result = AtomicBoolean(false)
this.connect {
val sql = "INSERT INTO $esyncDataTable(owner_uuid, module, data, state) VALUES (?, ?, ?, ?)"
it.prepareStatement(sql).use { statement ->
statement.setString(1, uuid.toString())
statement.setString(2, module)
statement.setBytes(3, bytea)
statement.setObject(4, state.name, Types.OTHER)
result.set(statement.executeUpdate() > 0)
}
}
return result.get()
}

override fun updateData(uuid: UUID, module: String, bytea: ByteArray): Boolean {
if (!this.isExists(uuid, module)) return false
val result = AtomicBoolean(false)
this.connectTransaction(uuid, module) { conn, id ->
val sql = "UPDATE $esyncDataTable SET data = ? WHERE id = ?"
conn.prepareStatement(sql).use { statement ->
statement.setBytes(1, bytea)
statement.setInt(2, id)
result.set(statement.executeUpdate() > 0)
}
}
return result.get()
}

override fun updateState(uuid: UUID, module: String, state: SyncState): Boolean {
if (!this.isExists(uuid, module)) return false
val result = AtomicBoolean(false)
this.connectTransaction(uuid, module) { conn, id ->
val sql = "UPDATE $esyncDataTable SET state = ? WHERE id = ?"
conn.prepareStatement(sql).use { statement ->
statement.setObject(1, state.name, Types.OTHER)
statement.setInt(2, id)
result.set(statement.executeUpdate() > 0)
}
}
return result.get()
}
}

0 comments on commit 52d6400

Please sign in to comment.