Skip to content

Commit

Permalink
feat: add options for datav observability datasource #290
Browse files Browse the repository at this point in the history
  • Loading branch information
sunface committed Oct 17, 2023
1 parent a3908ef commit dc432df
Show file tree
Hide file tree
Showing 11 changed files with 318 additions and 82 deletions.
37 changes: 31 additions & 6 deletions query/internal/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package datasource
import (
"context"
"database/sql"
"encoding/json"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -94,9 +95,16 @@ func SaveDatasource(c *gin.Context) {
}

now := time.Now()
data, err := json.Marshal(ds.Data)
if err != nil {
logger.Warn("Error encode datasource data", "error", err)
c.JSON(500, common.RespError(e.Internal))
return
}

if ds.Id == 0 {
// create
res, err := db.Conn.ExecContext(c.Request.Context(), "INSERT INTO datasource (name,type,url,team_id,created,updated) VALUES (?,?,?,?,?,?)", ds.Name, ds.Type, ds.URL, ds.TeamId, now, now)
res, err := db.Conn.ExecContext(c.Request.Context(), "INSERT INTO datasource (name,type,url,team_id,data,created,updated) VALUES (?,?,?,?,?,?,?)", ds.Name, ds.Type, ds.URL, ds.TeamId, data, now, now)
if err != nil {
if e.IsErrUniqueConstraint(err) {
c.JSON(http.StatusBadRequest, common.RespError("name alread exist"))
Expand All @@ -116,7 +124,7 @@ func SaveDatasource(c *gin.Context) {
return
}
// update
_, err = db.Conn.ExecContext(c.Request.Context(), "UPDATE datasource SET name=?,type=?,url=?,updated=? WHERE id=?", ds.Name, ds.Type, ds.URL, now, ds.Id)
_, err = db.Conn.ExecContext(c.Request.Context(), "UPDATE datasource SET name=?,type=?,url=?,data=?,updated=? WHERE id=?", ds.Name, ds.Type, ds.URL, data, now, ds.Id)
if err != nil {
if e.IsErrUniqueConstraint(err) {
c.JSON(http.StatusBadRequest, common.RespError("name alread exist"))
Expand All @@ -139,9 +147,9 @@ func GetDatasources(c *gin.Context) {
var err error

if strings.TrimSpace(teamId) != "" {
rows, err = db.Conn.QueryContext(c.Request.Context(), "SELECT id,name,type,url,team_id, created FROM datasource WHERE team_id=?", teamId)
rows, err = db.Conn.QueryContext(c.Request.Context(), "SELECT id,name,type,url,team_id,data, created FROM datasource WHERE team_id=?", teamId)
} else {
rows, err = db.Conn.QueryContext(c.Request.Context(), "SELECT id,name,type,url,team_id, created FROM datasource")
rows, err = db.Conn.QueryContext(c.Request.Context(), "SELECT id,name,type,url,team_id,data, created FROM datasource")
}

if err != nil {
Expand All @@ -152,12 +160,22 @@ func GetDatasources(c *gin.Context) {
defer rows.Close()
for rows.Next() {
ds := &models.Datasource{}
err := rows.Scan(&ds.Id, &ds.Name, &ds.Type, &ds.URL, &ds.TeamId, &ds.Created)
var rawdata []byte
err := rows.Scan(&ds.Id, &ds.Name, &ds.Type, &ds.URL, &ds.TeamId, &rawdata, &ds.Created)
if err != nil {
logger.Warn("get datasource error", "error", err)
c.JSON(http.StatusInternalServerError, common.RespInternalError())
return
}
if rawdata != nil {
err = json.Unmarshal(rawdata, &ds.Data)
if err != nil {
logger.Warn("Error decode datasource data", "error", err, "data", rawdata)
c.JSON(http.StatusInternalServerError, common.RespInternalError())
continue
}
}

dss = append(dss, ds)
}

Expand Down Expand Up @@ -212,7 +230,14 @@ func GetDatasource(ctx context.Context, id int64) (*models.Datasource, error) {
ds, ok := datasources[id]
if !ok {
ds := &models.Datasource{}
err := db.Conn.QueryRowContext(ctx, "SELECT name,type,url, created FROM datasource WHERE id=?", id).Scan(&ds.Name, &ds.Type, &ds.URL, &ds.Created)
var rawdata []byte
err := db.Conn.QueryRowContext(ctx, "SELECT name,type,url,data, created FROM datasource WHERE id=?", id).Scan(&ds.Name, &ds.Type, &ds.URL, &rawdata, &ds.Created)
if err != nil {
return nil, err
}
if rawdata != nil {
err = json.Unmarshal(rawdata, &ds.Data)
}
return ds, err
}

Expand Down
5 changes: 5 additions & 0 deletions query/internal/plugins/builtin/observability/api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package api

const (
TestDatasourceAPI = "testDatasource"
)
10 changes: 10 additions & 0 deletions query/internal/plugins/builtin/observability/api/testDatasource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package api

import (
"github.com/DataObserve/datav/query/pkg/models"
"github.com/gin-gonic/gin"
)

func TestDatasource(c *gin.Context, ds *models.Datasource) models.PluginResult {
return models.PluginResult{models.PluginStatusSuccess, "", nil}
}
164 changes: 164 additions & 0 deletions query/internal/plugins/builtin/observability/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package clickhouse

import (
"context"
"net"
"reflect"
"strings"
"sync"
"time"

ch "github.com/ClickHouse/clickhouse-go/v2"
"github.com/DataObserve/datav/query/internal/plugins/builtin/observability/api"
"github.com/DataObserve/datav/query/pkg/colorlog"
"github.com/DataObserve/datav/query/pkg/models"
"github.com/gin-gonic/gin"
)

/* Query plugin for clickhouse database */

var datasourceName = "observability"

type ObservabilityPlugin struct{}

var conns = make(map[int64]ch.Conn)
var connsLock = &sync.Mutex{}

func (p *ObservabilityPlugin) Query(c *gin.Context, ds *models.Datasource) models.PluginResult {
query := c.Query("query")
if query == api.TestDatasourceAPI {
return testDatasource(c)
}

conn, ok := conns[ds.Id]
if !ok {
var err error
conn, err = connectToClickhouse(ds.URL, "default", "default", "")
if err != nil {
colorlog.RootLogger.Warn("connect to clickhouse error:", err, "ds_id", ds.Id, "url", ds.URL)
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)

}
connsLock.Lock()
conns[ds.Id] = conn
connsLock.Unlock()
}

rows, err := conn.Query(c.Request.Context(), query)
if err != nil {
colorlog.RootLogger.Info("Error query clickhouse :", "error", err, "ds_id", ds.Id, "query:", query)
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}
defer rows.Close()

columns := rows.Columns()
columnTypes := rows.ColumnTypes()
types := make(map[string]string)
data := make([][]interface{}, 0)
for rows.Next() {
v := make([]interface{}, len(columns))
for i := range v {
t := columnTypes[i].ScanType()
v[i] = reflect.New(t).Interface()

tp := t.String()
if tp == "time.Time" {
types[columns[i]] = "time"
}
}

err = rows.Scan(v...)
if err != nil {
colorlog.RootLogger.Info("Error scan clickhouse :", "error", err, "ds_id", ds.Id)
continue
}

for i, v0 := range v {
v1, ok := v0.(*time.Time)
if ok {
v[i] = v1.Unix()
}
}

data = append(data, v)
}

return models.PluginResult{
Status: models.PluginStatusSuccess,
Error: "",
Data: map[string]interface{}{
"columns": columns,
"data": data,
"types": types,
}}
}

func init() {
// register datasource
models.RegisterPlugin(datasourceName, &ObservabilityPlugin{})
}

func connectToClickhouse(url, database, username, password string) (ch.Conn, error) {
conn, err := ch.Open(&ch.Options{
Addr: strings.Split(url, ","),
Auth: ch.Auth{
Database: database,
Username: username,
Password: password,
},
DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "tcp", addr)
},
Debug: false,
// Debugf: func(format string, v ...any) {
// fmt.Printf(format, v)
// },
Settings: ch.Settings{
"max_execution_time": 60,
},
Compression: &ch.Compression{
Method: ch.CompressionLZ4,
},
DialTimeout: time.Second * 30,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnMaxLifetime: time.Duration(10) * time.Minute,
ConnOpenStrategy: ch.ConnOpenInOrder,
BlockBufferSize: 10,
MaxCompressionBuffer: 10240,
ClientInfo: ch.ClientInfo{ // optional, please see Client info section in the README.md
Products: []struct {
Name string
Version string
}{
{Name: "my-app", Version: "0.1"},
},
},
})
if err != nil {
return nil, err
}

err = conn.Ping(context.Background())
if err != nil {
return nil, err
}

return conn, nil
}

func testDatasource(c *gin.Context) models.PluginResult {
url := c.Query("url")
conn, err := connectToClickhouse(url, "default", "default", "")
if err != nil {
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}

err = conn.Ping(context.Background())
if err != nil {
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}

return models.GenPluginResult(models.PluginStatusSuccess, "", nil)
}
2 changes: 2 additions & 0 deletions query/internal/plugins/builtin/plugins.go
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
package builtin

import _ "github.com/DataObserve/datav/query/internal/plugins/builtin/observability"
2 changes: 2 additions & 0 deletions query/internal/proxy/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package proxy

import (
"bytes"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -47,6 +48,7 @@ func ProxyDatasource(c *gin.Context) {
}

queryPlugin := models.GetPlugin(ds.Type)
fmt.Println(ds.Type, queryPlugin)
if queryPlugin != nil {
result := queryPlugin.Query(c, ds)
c.JSON(http.StatusOK, result)
Expand Down
8 changes: 8 additions & 0 deletions query/pkg/models/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@ func GetPlugin(name string) Plugin {
func RegisterPlugin(name string, p Plugin) {
plugins[name] = p
}

func GenPluginResult(status, err string, data interface{}) PluginResult {
return PluginResult{
Status: status,
Error: err,
Data: data,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,51 @@
// limitations under the License.
import { Datasource } from "types/datasource"
import React from "react";
import FormItem from "components/form/Item";
import { Input, Text } from "@chakra-ui/react";
interface Props {
datasource: Datasource
onChange: any
}

const HttpDatasourceEditor = ({datasource, onChange}: Props) => {
return (<></>)
}
const defaultUrl = "localhost:9000"
const DatasourceEditor = ({ datasource, onChange }: Props) => {
if (datasource.url === null) {
onChange(d => { d.url = defaultUrl })
return
}

export default HttpDatasourceEditor
if (!datasource.data) {
onChange(d => { d.data = {database: "default", username: "default", password: "" } })
return
}
return (<>
<Text>Clickhouse</Text>
<FormItem title="URL">
<Input value={datasource.url} placeholder={defaultUrl} onChange={e => {
const v = e.currentTarget.value
onChange((d: Datasource) => { d.url = v })
}} />
</FormItem>
<FormItem title="Database">
<Input value={datasource.data.database} onChange={e => {
const v = e.currentTarget.value
onChange((d: Datasource) => { d.data.database = v })
}} placeholder="clickhouse database" />
</FormItem>
<FormItem title="Username">
<Input value={datasource.data.username} placeholder="clickhouse username" onChange={e => {
const v = e.currentTarget.value
onChange((d: Datasource) => { d.data.username = v })
}}/>
</FormItem>
<FormItem title="Password">
<Input type="password" value={datasource.data.password} placeholder="clickhouse password" onChange={e => {
const v = e.currentTarget.value
onChange((d: Datasource) => { d.data.password = v })
}}/>
</FormItem>
</>)
}

export const isHttpDatasourceValid = (ds: Datasource) => {

}
export default DatasourceEditor
Loading

0 comments on commit dc432df

Please sign in to comment.