Skip to content

Commit

Permalink
refactor(indexer/postgres)!: use schema/indexer API and hide types/me…
Browse files Browse the repository at this point in the history
…thods (#21363)
  • Loading branch information
aaronc authored Aug 21, 2024
1 parent fb7775a commit e88c138
Show file tree
Hide file tree
Showing 17 changed files with 266 additions and 144 deletions.
4 changes: 2 additions & 2 deletions indexer/postgres/base_sql.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package postgres

// BaseSQL is the base SQL that is always included in the schema.
const BaseSQL = `
// baseSQL is the base SQL that is always included in the schema.
const baseSQL = `
CREATE OR REPLACE FUNCTION nanos_to_timestamptz(nanos bigint) RETURNS timestamptz AS $$
SELECT to_timestamp(nanos / 1000000000) + (nanos / 1000000000) * INTERVAL '1 microsecond'
$$ LANGUAGE SQL IMMUTABLE;
Expand Down
4 changes: 2 additions & 2 deletions indexer/postgres/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// createColumnDefinition writes a column definition within a CREATE TABLE statement for the field.
func (tm *ObjectIndexer) createColumnDefinition(writer io.Writer, field schema.Field) error {
func (tm *objectIndexer) createColumnDefinition(writer io.Writer, field schema.Field) error {
_, err := fmt.Fprintf(writer, "%q ", field.Name)
if err != nil {
return err
Expand Down Expand Up @@ -110,7 +110,7 @@ func simpleColumnType(kind schema.Kind) string {
// updatableColumnName is the name of the insertable/updatable column name for the field.
// This is the field name in most cases, except for time columns which are stored as nanos
// and then converted to timestamp generated columns.
func (tm *ObjectIndexer) updatableColumnName(field schema.Field) (name string, err error) {
func (tm *objectIndexer) updatableColumnName(field schema.Field) (name string, err error) {
name = field.Name
if field.Kind == schema.TimeKind {
name = fmt.Sprintf("%s_nanos", name)
Expand Down
4 changes: 2 additions & 2 deletions indexer/postgres/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"database/sql"
)

// DBConn is an interface that abstracts the *sql.DB, *sql.Tx and *sql.Conn types.
type DBConn interface {
// dbConn is an interface that abstracts the *sql.DB, *sql.Tx and *sql.Conn types.
type dbConn interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
Expand Down
20 changes: 10 additions & 10 deletions indexer/postgres/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ import (
"strings"
)

// CreateTable creates the table for the object type.
func (tm *ObjectIndexer) CreateTable(ctx context.Context, conn DBConn) error {
// createTable creates the table for the object type.
func (tm *objectIndexer) createTable(ctx context.Context, conn dbConn) error {
buf := new(strings.Builder)
err := tm.CreateTableSql(buf)
err := tm.createTableSql(buf)
if err != nil {
return err
}

sqlStr := buf.String()
if tm.options.Logger != nil {
tm.options.Logger(fmt.Sprintf("Creating table %s", tm.TableName()), sqlStr)
if tm.options.logger != nil {
tm.options.logger.Debug("Creating table %s", "table", tm.tableName(), "sql", sqlStr)
}
_, err = conn.ExecContext(ctx, sqlStr)
return err
}

// CreateTableSql generates a CREATE TABLE statement for the object type.
func (tm *ObjectIndexer) CreateTableSql(writer io.Writer) error {
_, err := fmt.Fprintf(writer, "CREATE TABLE IF NOT EXISTS %q (\n\t", tm.TableName())
// createTableSql generates a CREATE TABLE statement for the object type.
func (tm *objectIndexer) createTableSql(writer io.Writer) error {
_, err := fmt.Fprintf(writer, "CREATE TABLE IF NOT EXISTS %q (\n\t", tm.tableName())
if err != nil {
return err
}
Expand Down Expand Up @@ -53,7 +53,7 @@ func (tm *ObjectIndexer) CreateTableSql(writer io.Writer) error {
}

// add _deleted column when we have RetainDeletions set and enabled
if !tm.options.DisableRetainDeletions && tm.typ.RetainDeletions {
if !tm.options.disableRetainDeletions && tm.typ.RetainDeletions {
_, err = fmt.Fprintf(writer, "_deleted BOOLEAN NOT NULL DEFAULT FALSE,\n\t")
if err != nil {
return err
Expand Down Expand Up @@ -87,7 +87,7 @@ func (tm *ObjectIndexer) CreateTableSql(writer io.Writer) error {
// we GRANT SELECT on the table to PUBLIC so that the table is automatically available
// for querying using off-the-shelf tools like pg_graphql, Postgrest, Postgraphile, etc.
// without any login permissions
_, err = fmt.Fprintf(writer, "GRANT SELECT ON TABLE %q TO PUBLIC;", tm.TableName())
_, err = fmt.Fprintf(writer, "GRANT SELECT ON TABLE %q TO PUBLIC;", tm.tableName())
if err != nil {
return err
}
Expand Down
17 changes: 9 additions & 8 deletions indexer/postgres/create_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (

"cosmossdk.io/indexer/postgres/internal/testdata"
"cosmossdk.io/schema"
"cosmossdk.io/schema/logutil"
)

func ExampleObjectIndexer_CreateTableSql_allKinds() {
func Example_objectIndexer_createTableSql_allKinds() {
exampleCreateTable(testdata.AllKindsObject)
// Output:
// CREATE TABLE IF NOT EXISTS "test_all_kinds" (
Expand Down Expand Up @@ -40,7 +41,7 @@ func ExampleObjectIndexer_CreateTableSql_allKinds() {
// GRANT SELECT ON TABLE "test_all_kinds" TO PUBLIC;
}

func ExampleObjectIndexer_CreateTableSql_singleton() {
func Example_objectIndexer_createTableSql_singleton() {
exampleCreateTable(testdata.SingletonObject)
// Output:
// CREATE TABLE IF NOT EXISTS "test_singleton" (
Expand All @@ -53,7 +54,7 @@ func ExampleObjectIndexer_CreateTableSql_singleton() {
// GRANT SELECT ON TABLE "test_singleton" TO PUBLIC;
}

func ExampleObjectIndexer_CreateTableSql_vote() {
func Example_objectIndexer_createTableSql_vote() {
exampleCreateTable(testdata.VoteObject)
// Output:
// CREATE TABLE IF NOT EXISTS "test_vote" (
Expand All @@ -66,7 +67,7 @@ func ExampleObjectIndexer_CreateTableSql_vote() {
// GRANT SELECT ON TABLE "test_vote" TO PUBLIC;
}

func ExampleObjectIndexer_CreateTableSql_vote_no_retain_delete() {
func Example_objectIndexer_createTableSql_vote_no_retain_delete() {
exampleCreateTableOpt(testdata.VoteObject, true)
// Output:
// CREATE TABLE IF NOT EXISTS "test_vote" (
Expand All @@ -83,11 +84,11 @@ func exampleCreateTable(objectType schema.ObjectType) {
}

func exampleCreateTableOpt(objectType schema.ObjectType, noRetainDelete bool) {
tm := NewObjectIndexer("test", objectType, Options{
Logger: func(msg, sql string, params ...interface{}) {},
DisableRetainDeletions: noRetainDelete,
tm := newObjectIndexer("test", objectType, options{
logger: logutil.NoopLogger{},
disableRetainDeletions: noRetainDelete,
})
err := tm.CreateTableSql(os.Stdout)
err := tm.createTableSql(os.Stdout)
if err != nil {
panic(err)
}
Expand Down
14 changes: 7 additions & 7 deletions indexer/postgres/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"cosmossdk.io/schema"
)

// CreateEnumType creates an enum type in the database.
func (m *ModuleIndexer) CreateEnumType(ctx context.Context, conn DBConn, enum schema.EnumType) error {
// createEnumType creates an enum type in the database.
func (m *moduleIndexer) createEnumType(ctx context.Context, conn dbConn, enum schema.EnumType) error {
typeName := enumTypeName(m.moduleName, enum)
row := conn.QueryRowContext(ctx, "SELECT 1 FROM pg_type WHERE typname = $1", typeName)
var res interface{}
Expand All @@ -25,21 +25,21 @@ func (m *ModuleIndexer) CreateEnumType(ctx context.Context, conn DBConn, enum sc
}

buf := new(strings.Builder)
err := CreateEnumTypeSql(buf, m.moduleName, enum)
err := createEnumTypeSql(buf, m.moduleName, enum)
if err != nil {
return err
}

sqlStr := buf.String()
if m.options.Logger != nil {
m.options.Logger("Creating enum type", sqlStr)
if m.options.logger != nil {
m.options.logger.Debug("Creating enum type", "sql", sqlStr)
}
_, err = conn.ExecContext(ctx, sqlStr)
return err
}

// CreateEnumTypeSql generates a CREATE TYPE statement for the enum definition.
func CreateEnumTypeSql(writer io.Writer, moduleName string, enum schema.EnumType) error {
// createEnumTypeSql generates a CREATE TYPE statement for the enum definition.
func createEnumTypeSql(writer io.Writer, moduleName string, enum schema.EnumType) error {
_, err := fmt.Fprintf(writer, "CREATE TYPE %q AS ENUM (", enumTypeName(moduleName, enum))
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions indexer/postgres/enum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"cosmossdk.io/indexer/postgres/internal/testdata"
)

func ExampleCreateEnumTypeSql() {
err := CreateEnumTypeSql(os.Stdout, "test", testdata.MyEnum)
func Example_createEnumTypeSql() {
err := createEnumTypeSql(os.Stdout, "test", testdata.MyEnum)
if err != nil {
panic(err)
}
Expand Down
93 changes: 58 additions & 35 deletions indexer/postgres/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package postgres
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"

"cosmossdk.io/schema/appdata"
"cosmossdk.io/schema/indexer"
"cosmossdk.io/schema/logutil"
)

type Config struct {
Expand All @@ -22,9 +23,28 @@ type Config struct {

type SqlLogger = func(msg, sql string, params ...interface{})

func StartIndexer(ctx context.Context, logger SqlLogger, config Config) (appdata.Listener, error) {
type indexerImpl struct {
ctx context.Context
db *sql.DB
tx *sql.Tx
opts options
modules map[string]*moduleIndexer
logger logutil.Logger
}

func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) {
config, err := decodeConfig(params.Config.Config)
if err != nil {
return indexer.InitResult{}, err
}

ctx := params.Context
if ctx == nil {
ctx = context.Background()
}

if config.DatabaseURL == "" {
return appdata.Listener{}, errors.New("missing database URL")
return indexer.InitResult{}, errors.New("missing database URL")
}

driver := config.DatabaseDriver
Expand All @@ -34,48 +54,51 @@ func StartIndexer(ctx context.Context, logger SqlLogger, config Config) (appdata

db, err := sql.Open(driver, config.DatabaseURL)
if err != nil {
return appdata.Listener{}, err
return indexer.InitResult{}, err
}

tx, err := db.BeginTx(ctx, nil)
if err != nil {
return appdata.Listener{}, err
return indexer.InitResult{}, err
}

// commit base schema
_, err = tx.Exec(BaseSQL)
_, err = tx.Exec(baseSQL)
if err != nil {
return appdata.Listener{}, err
return indexer.InitResult{}, err
}

moduleIndexers := map[string]*moduleIndexer{}
opts := options{
disableRetainDeletions: config.DisableRetainDeletions,
logger: params.Logger,
}

moduleIndexers := map[string]*ModuleIndexer{}
opts := Options{
DisableRetainDeletions: config.DisableRetainDeletions,
Logger: logger,
idx := &indexerImpl{
ctx: ctx,
db: db,
tx: tx,
opts: opts,
modules: moduleIndexers,
logger: params.Logger,
}

return appdata.Listener{
InitializeModuleData: func(data appdata.ModuleInitializationData) error {
moduleName := data.ModuleName
modSchema := data.Schema
_, ok := moduleIndexers[moduleName]
if ok {
return fmt.Errorf("module %s already initialized", moduleName)
}

mm := NewModuleIndexer(moduleName, modSchema, opts)
moduleIndexers[moduleName] = mm

return mm.InitializeSchema(ctx, tx)
},
Commit: func(data appdata.CommitData) (completionCallback func() error, err error) {
err = tx.Commit()
if err != nil {
return nil, err
}

tx, err = db.BeginTx(ctx, nil)
return nil, err
},
return indexer.InitResult{
Listener: idx.listener(),
}, nil
}

func decodeConfig(rawConfig map[string]interface{}) (*Config, error) {
bz, err := json.Marshal(rawConfig)
if err != nil {
return nil, err
}

var config Config
err = json.Unmarshal(bz, &config)
if err != nil {
return nil, err
}

return &config, nil
}
38 changes: 38 additions & 0 deletions indexer/postgres/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package postgres

import (
"fmt"

"cosmossdk.io/schema/appdata"
)

func (i *indexerImpl) listener() appdata.Listener {
return appdata.Listener{
InitializeModuleData: func(data appdata.ModuleInitializationData) error {
moduleName := data.ModuleName
modSchema := data.Schema
_, ok := i.modules[moduleName]
if ok {
return fmt.Errorf("module %s already initialized", moduleName)
}

mm := newModuleIndexer(moduleName, modSchema, i.opts)
i.modules[moduleName] = mm

return mm.initializeSchema(i.ctx, i.tx)
},
StartBlock: func(data appdata.StartBlockData) error {
_, err := i.tx.Exec("INSERT INTO block (number) VALUES ($1)", data.Height)
return err
},
Commit: func(data appdata.CommitData) (func() error, error) {
err := i.tx.Commit()
if err != nil {
return nil, err
}

i.tx, err = i.db.BeginTx(i.ctx, nil)
return nil, err
},
}
}
Loading

0 comments on commit e88c138

Please sign in to comment.