Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement chdb v3.0 Connect method #16

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions chdb/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ func NewConnect(opts map[string]string) (ret *connector, err error) {
if ok {
ret.udfPath = udfPath
}
if ret.session == nil {
ret.session, err = chdb.NewSession()
if err != nil {
return nil, err
}
}
return
}

Expand Down Expand Up @@ -243,7 +249,8 @@ type conn struct {
bufferSize int
useUnsafe bool
session *chdb.Session
QueryFun queryHandle

QueryFun queryHandle
}

func prepareValues(values []driver.Value) []driver.NamedValue {
Expand All @@ -267,6 +274,7 @@ func (c *conn) SetupQueryFun() {
if c.session != nil {
c.QueryFun = c.session.Query
}

}

func (c *conn) Query(query string, values []driver.Value) (driver.Rows, error) {
Expand Down Expand Up @@ -334,7 +342,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
}

buf := result.Buf()
if buf == nil {
if len(buf) == 0 {
return nil, fmt.Errorf("result is nil")
}
return c.driverType.PrepareRows(result, buf, c.bufferSize, c.useUnsafe)
Expand Down
191 changes: 151 additions & 40 deletions chdb/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,39 @@ import (
"github.com/chdb-io/chdb-go/chdb"
)

var (
session *chdb.Session
)

func globalSetup() error {
sess, err := chdb.NewSession()
if err != nil {
return err
}
session = sess
return nil
}

func globalTeardown() {
session.Cleanup()
session.Close()
}

func TestMain(m *testing.M) {
if err := globalSetup(); err != nil {
fmt.Println("Global setup failed:", err)
os.Exit(1)
}
// Run all tests.
exitCode := m.Run()

// Global teardown: clean up any resources here.
globalTeardown()

// Exit with the code returned by m.Run().
os.Exit(exitCode)
}

func TestDb(t *testing.T) {
db, err := sql.Open("chdb", "")
if err != nil {
Expand Down Expand Up @@ -112,37 +145,74 @@ func TestDbWithOpt(t *testing.T) {
}

func TestDbWithSession(t *testing.T) {
sessionDir, err := os.MkdirTemp("", "unittest-sessiondata")

session.Query(
"CREATE TABLE IF NOT EXISTS TestDbWithSession (id UInt32) ENGINE = MergeTree() ORDER BY id;")

session.Query("INSERT INTO TestDbWithSession VALUES (1), (2), (3);")

ret, err := session.Query("SELECT * FROM TestDbWithSession;")
if err != nil {
t.Fatalf("Query fail, err: %s", err)
}
if string(ret.Buf()) != "1\n2\n3\n" {
t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf()))
}
db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr()))
if err != nil {
t.Fatalf("open db fail, err: %s", err)
}
if db.Ping() != nil {
t.Fatalf("ping db fail, err: %s", err)
}
rows, err := db.Query("select * from TestDbWithSession;")
if err != nil {
t.Fatalf("create temp directory fail, err: %s", err)
t.Fatalf("exec create function fail, err: %s", err)
}
defer os.RemoveAll(sessionDir)
session, err := chdb.NewSession(sessionDir)
defer rows.Close()
cols, err := rows.Columns()
if err != nil {
t.Fatalf("new session fail, err: %s", err)
t.Fatalf("get result columns fail, err: %s", err)
}
if len(cols) != 1 {
t.Fatalf("result columns length shoule be 3, actual: %d", len(cols))
}
var bar = 0
var count = 1
for rows.Next() {
err = rows.Scan(&bar)
if err != nil {
t.Fatalf("scan fail, err: %s", err)
}
if bar != count {
t.Fatalf("result is not match, want: %d actual: %d", count, bar)
}
count++
}
defer session.Cleanup()
}

session.Query("CREATE DATABASE IF NOT EXISTS testdb; " +
"CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;")
func TestDbWithConnection(t *testing.T) {

session.Query("USE testdb; INSERT INTO testtable VALUES (1), (2), (3);")
session.Query(
"CREATE TABLE IF NOT EXISTS TestDbWithConnection (id UInt32) ENGINE = MergeTree() ORDER BY id;")

ret, err := session.Query("SELECT * FROM testtable;")
session.Query("INSERT INTO TestDbWithConnection VALUES (1), (2), (3);")

ret, err := session.Query("SELECT * FROM TestDbWithConnection;")
if err != nil {
t.Fatalf("Query fail, err: %s", err)
}
if string(ret.Buf()) != "1\n2\n3\n" {
t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf()))
}
db, err := sql.Open("chdb", fmt.Sprintf("session=%s", sessionDir))
db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr()))
if err != nil {
t.Fatalf("open db fail, err: %s", err)
}
if db.Ping() != nil {
t.Fatalf("ping db fail, err: %s", err)
}
rows, err := db.Query("select * from testtable;")
rows, err := db.Query("select * from TestDbWithConnection;")
if err != nil {
t.Fatalf("exec create function fail, err: %s", err)
}
Expand All @@ -168,37 +238,73 @@ func TestDbWithSession(t *testing.T) {
}
}

func TestQueryRow(t *testing.T) {
sessionDir, err := os.MkdirTemp("", "unittest-sessiondata")
func TestDbWithConnectionSqlDriverOnly(t *testing.T) {
db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr()))
if err != nil {
t.Fatalf("open db fail, err: %s", err)
}
if db.Ping() != nil {
t.Fatalf("ping db fail, err: %s", err)
}

_, err = db.Exec(
"CREATE TABLE IF NOT EXISTS TestDbWithConnectionSqlDriverOnly (id UInt32) ENGINE = MergeTree() ORDER BY id;")
if err != nil {
t.Fatalf("create temp directory fail, err: %s", err)
t.Fatalf("could not create database & table: %s", err)
}
defer os.RemoveAll(sessionDir)
session, err := chdb.NewSession(sessionDir)
_, err = db.Exec("INSERT INTO TestDbWithConnectionSqlDriverOnly VALUES (1), (2), (3);")
if err != nil {
t.Fatalf("new session fail, err: %s", err)
t.Fatalf("could not insert rows in the table: %s", err)
}
defer session.Cleanup()
session.Query("CREATE DATABASE IF NOT EXISTS testdb; " +
"CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;")

session.Query("USE testdb; INSERT INTO testtable VALUES (1), (2), (3);")
rows, err := db.Query("select * from TestDbWithConnectionSqlDriverOnly;")
if err != nil {
t.Fatalf("exec create function fail, err: %s", err)
}
defer rows.Close()
cols, err := rows.Columns()
if err != nil {
t.Fatalf("get result columns fail, err: %s", err)
}
if len(cols) != 1 {
t.Fatalf("result columns length shoule be 3, actual: %d", len(cols))
}
var bar = 0
var count = 1
for rows.Next() {
err = rows.Scan(&bar)
if err != nil {
t.Fatalf("scan fail, err: %s", err)
}
if bar != count {
t.Fatalf("result is not match, want: %d actual: %d", count, bar)
}
count++
}
}

func TestQueryRow(t *testing.T) {

session.Query(
"CREATE TABLE IF NOT EXISTS TestQueryRow (id UInt32) ENGINE = MergeTree() ORDER BY id;")

ret, err := session.Query("SELECT * FROM testtable;")
session.Query(" INSERT INTO TestQueryRow VALUES (1), (2), (3);")

ret, err := session.Query("SELECT * FROM TestQueryRow;")
if err != nil {
t.Fatalf("Query fail, err: %s", err)
}
if string(ret.Buf()) != "1\n2\n3\n" {
t.Errorf("Query result should be 1\n2\n3\n, got %s", string(ret.Buf()))
}
db, err := sql.Open("chdb", fmt.Sprintf("session=%s", sessionDir))
db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr()))
if err != nil {
t.Fatalf("open db fail, err: %s", err)
}
if db.Ping() != nil {
t.Fatalf("ping db fail, err: %s", err)
}
rows := db.QueryRow("select * from testtable;")
rows := db.QueryRow("select * from TestQueryRow;")

var bar = 0
var count = 1
Expand All @@ -217,32 +323,37 @@ func TestQueryRow(t *testing.T) {
}

func TestExec(t *testing.T) {
sessionDir, err := os.MkdirTemp("", "unittest-sessiondata")
if err != nil {
t.Fatalf("create temp directory fail, err: %s", err)
}
defer os.RemoveAll(sessionDir)
session, err := chdb.NewSession(sessionDir)
if err != nil {
t.Fatalf("new session fail, err: %s", err)
}
defer session.Cleanup()
session.Query("CREATE DATABASE IF NOT EXISTS testdb; " +
"CREATE TABLE IF NOT EXISTS testdb.testtable (id UInt32) ENGINE = MergeTree() ORDER BY id;")

db, err := sql.Open("chdb", fmt.Sprintf("session=%s", sessionDir))
session.Query(
"CREATE TABLE IF NOT EXISTS TestExec (id UInt32) ENGINE = MergeTree() ORDER BY id;")

db, err := sql.Open("chdb", fmt.Sprintf("session=%s", session.ConnStr()))
if err != nil {
t.Fatalf("open db fail, err: %s", err)
}
if db.Ping() != nil {
t.Fatalf("ping db fail, err: %s", err)
}

_, err = db.Exec("INSERT INTO testdb.testtable VALUES (1), (2), (3);")
tables, err := db.Query("SHOW TABLES;")
if err != nil {
t.Fatalf(err.Error())
}
defer tables.Close()
for tables.Next() {
var tblName string
if err := tables.Scan(&tblName); err != nil {
t.Fatal(err)
}
t.Log(tblName)
fmt.Printf("tblName: %v\n", tblName)
}

_, err = db.Exec("INSERT INTO TestExec VALUES (1), (2), (3);")
if err != nil {
t.Fatalf("exec failed, err: %s", err)
}
rows := db.QueryRow("select * from testdb.testtable;")
rows := db.QueryRow("select * from TestExec;")

var bar = 0
var count = 1
Expand Down
Loading
Loading