diff --git a/db.go b/db.go index 7dfb7615a..281945fca 100644 --- a/db.go +++ b/db.go @@ -40,6 +40,20 @@ func OpenDBWithDriver(driver string, dbstring string) (*sql.DB, error) { switch driver { case "postgres", "pgx", "sqlite3", "sqlite", "mysql", "sqlserver", "clickhouse", "vertica", "azuresql", "ydb", "libsql", "starrocks": return sql.Open(driver, dbstring) + case "clickhouse-replicated": + db, err := sql.Open("clickhouse", dbstring) + if err != nil { + return nil, fmt.Errorf("open db: %w", err) + } + _, err = db.Exec("SET insert_quorum=2") + if err != nil { + return nil, fmt.Errorf("SET insert_quorum %w", err) + } + _, err = db.Exec("SET select_sequential_consistency=1") + if err != nil { + return nil, fmt.Errorf("SET select_sequential_consistency %w", err) + } + return db, nil default: return nil, fmt.Errorf("unsupported driver %s", driver) } diff --git a/dialect.go b/dialect.go index ecebd144f..4527307bb 100644 --- a/dialect.go +++ b/dialect.go @@ -47,6 +47,8 @@ func SetDialect(s string) error { d = dialect.Tidb case "clickhouse": d = dialect.Clickhouse + case "clickhouse-replicated": + d = dialect.ClickhouseReplicated case "vertica": d = dialect.Vertica case "ydb": diff --git a/internal/dialect/dialectquery/clickhouse.go b/internal/dialect/dialectquery/clickhouse.go index 723efd4cc..8c046fb56 100644 --- a/internal/dialect/dialectquery/clickhouse.go +++ b/internal/dialect/dialectquery/clickhouse.go @@ -42,3 +42,44 @@ func (c *Clickhouse) GetLatestVersion(tableName string) string { q := `SELECT max(version_id) FROM %s` return fmt.Sprintf(q, tableName) } + +type ClickhouseReplicated struct{} + +var _ Querier = (*ClickhouseReplicated)(nil) + +func (c *ClickhouseReplicated) CreateTable(tableName string) string { + q := `CREATE TABLE IF NOT EXISTS %s ON CLUSTER '{cluster}' ( + version_id Int64, + is_applied UInt8, + date Date default now(), + tstamp DateTime default now() + ) + ENGINE = ReplicatedMergeTree() + ORDER BY (date)` + return fmt.Sprintf(q, tableName) +} + +func (c *ClickhouseReplicated) InsertVersion(tableName string) string { + q := `INSERT INTO %s (version_id, is_applied) VALUES ($1, $2)` + return fmt.Sprintf(q, tableName) +} + +func (c *ClickhouseReplicated) DeleteVersion(tableName string) string { + q := `ALTER TABLE %s DELETE WHERE version_id = $1 SETTINGS mutations_sync = 2` + return fmt.Sprintf(q, tableName) +} + +func (c *ClickhouseReplicated) GetMigrationByVersion(tableName string) string { + q := `SELECT tstamp, is_applied FROM %s WHERE version_id = $1 ORDER BY tstamp DESC LIMIT 1` + return fmt.Sprintf(q, tableName) +} + +func (c *ClickhouseReplicated) ListMigrations(tableName string) string { + q := `SELECT version_id, is_applied FROM %s ORDER BY version_id DESC` + return fmt.Sprintf(q, tableName) +} + +func (c *ClickhouseReplicated) GetLatestVersion(tableName string) string { + q := `SELECT max(version_id) FROM %s` + return fmt.Sprintf(q, tableName) +} diff --git a/internal/dialect/dialects.go b/internal/dialect/dialects.go index acf064258..ff60af7e6 100644 --- a/internal/dialect/dialects.go +++ b/internal/dialect/dialects.go @@ -4,15 +4,16 @@ package dialect type Dialect string const ( - Postgres Dialect = "postgres" - Mysql Dialect = "mysql" - Sqlite3 Dialect = "sqlite3" - Sqlserver Dialect = "sqlserver" - Redshift Dialect = "redshift" - Tidb Dialect = "tidb" - Clickhouse Dialect = "clickhouse" - Vertica Dialect = "vertica" - Ydb Dialect = "ydb" - Turso Dialect = "turso" - Starrocks Dialect = "starrocks" + Postgres Dialect = "postgres" + Mysql Dialect = "mysql" + Sqlite3 Dialect = "sqlite3" + Sqlserver Dialect = "sqlserver" + Redshift Dialect = "redshift" + Tidb Dialect = "tidb" + Clickhouse Dialect = "clickhouse" + ClickhouseReplicated Dialect = "clickhouse-replicated" + Vertica Dialect = "vertica" + Ydb Dialect = "ydb" + Turso Dialect = "turso" + Starrocks Dialect = "starrocks" ) diff --git a/internal/dialect/store.go b/internal/dialect/store.go index 5179cabaf..cac7b244d 100644 --- a/internal/dialect/store.go +++ b/internal/dialect/store.go @@ -63,6 +63,8 @@ func NewStore(d Dialect) (Store, error) { querier = &dialectquery.Tidb{} case Clickhouse: querier = &dialectquery.Clickhouse{} + case ClickhouseReplicated: + querier = &dialectquery.ClickhouseReplicated{} case Vertica: querier = &dialectquery.Vertica{} case Ydb: