Skip to content

Commit

Permalink
Add clickhouse-replicated dialect
Browse files Browse the repository at this point in the history
  • Loading branch information
chapsuk committed Sep 21, 2024
1 parent cf53a22 commit cbe9e6f
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 11 deletions.
14 changes: 14 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func OpenDBWithDriver(driver string, dbstring string) (*sql.DB, error) {
switch driver {
case "postgres", "pgx", "sqlite3", "sqlite", "mysql", "sqlserver", "clickhouse", "vertica", "azuresql", "ydb", "libsql", "starrocks":
return sql.Open(driver, dbstring)
case "clickhouse-replicated":
db, err := sql.Open("clickhouse", dbstring)
if err != nil {
return nil, fmt.Errorf("open db: %w", err)
}
_, err = db.Exec("SET insert_quorum=2")
if err != nil {
return nil, fmt.Errorf("SET insert_quorum %w", err)
}
_, err = db.Exec("SET select_sequential_consistency=1")
if err != nil {
return nil, fmt.Errorf("SET select_sequential_consistency %w", err)
}
return db, nil
default:
return nil, fmt.Errorf("unsupported driver %s", driver)
}
Expand Down
2 changes: 2 additions & 0 deletions dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func SetDialect(s string) error {
d = dialect.Tidb
case "clickhouse":
d = dialect.Clickhouse
case "clickhouse-replicated":
d = dialect.ClickhouseReplicated
case "vertica":
d = dialect.Vertica
case "ydb":
Expand Down
41 changes: 41 additions & 0 deletions internal/dialect/dialectquery/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,44 @@ func (c *Clickhouse) GetLatestVersion(tableName string) string {
q := `SELECT max(version_id) FROM %s`
return fmt.Sprintf(q, tableName)
}

type ClickhouseReplicated struct{}

var _ Querier = (*ClickhouseReplicated)(nil)

func (c *ClickhouseReplicated) CreateTable(tableName string) string {
q := `CREATE TABLE IF NOT EXISTS %s ON CLUSTER '{cluster}' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime default now()
)
ENGINE = ReplicatedMergeTree()
ORDER BY (date)`
return fmt.Sprintf(q, tableName)
}

func (c *ClickhouseReplicated) InsertVersion(tableName string) string {
q := `INSERT INTO %s (version_id, is_applied) VALUES ($1, $2)`
return fmt.Sprintf(q, tableName)
}

func (c *ClickhouseReplicated) DeleteVersion(tableName string) string {
q := `ALTER TABLE %s DELETE WHERE version_id = $1 SETTINGS mutations_sync = 2`
return fmt.Sprintf(q, tableName)
}

func (c *ClickhouseReplicated) GetMigrationByVersion(tableName string) string {
q := `SELECT tstamp, is_applied FROM %s WHERE version_id = $1 ORDER BY tstamp DESC LIMIT 1`
return fmt.Sprintf(q, tableName)
}

func (c *ClickhouseReplicated) ListMigrations(tableName string) string {
q := `SELECT version_id, is_applied FROM %s ORDER BY version_id DESC`
return fmt.Sprintf(q, tableName)
}

func (c *ClickhouseReplicated) GetLatestVersion(tableName string) string {
q := `SELECT max(version_id) FROM %s`
return fmt.Sprintf(q, tableName)
}
23 changes: 12 additions & 11 deletions internal/dialect/dialects.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ package dialect
type Dialect string

const (
Postgres Dialect = "postgres"
Mysql Dialect = "mysql"
Sqlite3 Dialect = "sqlite3"
Sqlserver Dialect = "sqlserver"
Redshift Dialect = "redshift"
Tidb Dialect = "tidb"
Clickhouse Dialect = "clickhouse"
Vertica Dialect = "vertica"
Ydb Dialect = "ydb"
Turso Dialect = "turso"
Starrocks Dialect = "starrocks"
Postgres Dialect = "postgres"
Mysql Dialect = "mysql"
Sqlite3 Dialect = "sqlite3"
Sqlserver Dialect = "sqlserver"
Redshift Dialect = "redshift"
Tidb Dialect = "tidb"
Clickhouse Dialect = "clickhouse"
ClickhouseReplicated Dialect = "clickhouse-replicated"
Vertica Dialect = "vertica"
Ydb Dialect = "ydb"
Turso Dialect = "turso"
Starrocks Dialect = "starrocks"
)
2 changes: 2 additions & 0 deletions internal/dialect/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func NewStore(d Dialect) (Store, error) {
querier = &dialectquery.Tidb{}
case Clickhouse:
querier = &dialectquery.Clickhouse{}
case ClickhouseReplicated:
querier = &dialectquery.ClickhouseReplicated{}
case Vertica:
querier = &dialectquery.Vertica{}
case Ydb:
Expand Down

0 comments on commit cbe9e6f

Please sign in to comment.