From 08b834760234cb37ea4ce90193eec29547b240fd Mon Sep 17 00:00:00 2001 From: mahjonp Date: Fri, 26 Feb 2021 10:49:16 +0800 Subject: [PATCH] *: add a flag to retry commit when data loading fails on prepare stage (#72) Signed-off-by: mahjonp --- Makefile | 11 ++++++--- ch/workload.go | 6 ++--- cmd/go-tpc/main.go | 1 + cmd/go-tpc/misc.go | 3 +++ cmd/go-tpc/tpcc.go | 3 +++ cmd/go-tpc/versioninfo.go | 25 ++++++++++++++++++++ pkg/load/batch_loader.go | 35 +++++++++++++++++++++++----- pkg/util/versioninfo.go | 8 +++++++ tpcc/load.go | 18 +++++++------- tpcc/new_order.go | 2 +- tpcc/workload.go | 4 ++++ tpch/loader.go | 49 ++++++++++++++++++++------------------- tpch/workload.go | 18 +++++++------- 13 files changed, 127 insertions(+), 56 deletions(-) create mode 100644 cmd/go-tpc/versioninfo.go create mode 100644 pkg/util/versioninfo.go diff --git a/Makefile b/Makefile index 12a8504..7bceb55 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,16 @@ -GOOS := $(if $(GOOS),$(GOOS),linux) GOARCH := $(if $(GOARCH),$(GOARCH),amd64) -GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) GO111MODULE=on go +GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOARCH=$(GOARCH) GO111MODULE=on go PACKAGE_LIST := go list ./...| grep -vE "cmd" PACKAGES := $$($(PACKAGE_LIST)) FILES_TO_FMT := $(shell find . -path -prune -o -name '*.go' -print) +LDFLAGS += -X "github.com/pingcap/go-tpc/pkg/util.ReleaseVersion=$(shell git describe --tags --dirty --always)" +LDFLAGS += -X "github.com/pingcap/go-tpc/pkg/util.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')" +LDFLAGS += -X "github.com/pingcap/go-tpc/pkg/util.BuildHash=$(shell git rev-parse HEAD)" + +GOBUILD=$(GO) build -ldflags '$(LDFLAGS)' + # Image URL to use all building/pushing image targets IMG ?= go-tpc:latest @@ -22,7 +27,7 @@ test: go test ./... -cover $(PACKAGES) build: mod - go build -o ./bin/go-tpc cmd/go-tpc/* + $(GOBUILD) -o ./bin/go-tpc cmd/go-tpc/* vet: go vet ./... 
diff --git a/ch/workload.go b/ch/workload.go index 7bdbcbc..447aa2b 100644 --- a/ch/workload.go +++ b/ch/workload.go @@ -106,9 +106,9 @@ func (w Workloader) Prepare(ctx context.Context, threadID int) error { return err } sqlLoader := map[dbgen.Table]dbgen.Loader{ - dbgen.TSupp: tpch.NewSuppLoader(ctx, s.Conn), - dbgen.TNation: tpch.NewNationLoader(ctx, s.Conn), - dbgen.TRegion: tpch.NewRegionLoader(ctx, s.Conn), + dbgen.TSupp: tpch.NewSuppLoader(ctx, w.db), + dbgen.TNation: tpch.NewNationLoader(ctx, w.db), + dbgen.TRegion: tpch.NewRegionLoader(ctx, w.db), } dbgen.InitDbGen(1) if err := dbgen.DbGen(sqlLoader, []dbgen.Table{dbgen.TNation, dbgen.TRegion, dbgen.TSupp}); err != nil { diff --git a/cmd/go-tpc/main.go b/cmd/go-tpc/main.go index ca2669a..0f11ca9 100644 --- a/cmd/go-tpc/main.go +++ b/cmd/go-tpc/main.go @@ -109,6 +109,7 @@ func main() { cobra.EnablePrefixMatching = true + registerVersionInfo(rootCmd) registerTpcc(rootCmd) registerTpch(rootCmd) registerCHBenchmark(rootCmd) diff --git a/cmd/go-tpc/misc.go b/cmd/go-tpc/misc.go index b6c5bb1..2da90d8 100644 --- a/cmd/go-tpc/misc.go +++ b/cmd/go-tpc/misc.go @@ -105,6 +105,9 @@ func executeWorkload(ctx context.Context, w workload.Workloader, threads int, ac go func(index int) { defer wg.Done() if err := execute(ctx, w, action, threads, index); err != nil { + if action == "prepare" { + panic(fmt.Sprintf("a fatal occurred when preparing data: %v", err)) + } fmt.Printf("execute %s failed, err %v\n", action, err) return } diff --git a/cmd/go-tpc/tpcc.go b/cmd/go-tpc/tpcc.go index 5f6a699..2300bb3 100644 --- a/cmd/go-tpc/tpcc.go +++ b/cmd/go-tpc/tpcc.go @@ -7,6 +7,7 @@ import ( _ "net/http/pprof" "os" "runtime" + "time" "github.com/pingcap/go-tpc/pkg/measurement" "github.com/pingcap/go-tpc/pkg/workload" @@ -90,6 +91,8 @@ func registerTpcc(root *cobra.Command) { cmdPrepare.PersistentFlags().StringVar(&tpccConfig.OutputDir, "output-dir", "", "Output directory for generating file if specified") 
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.SpecifiedTables, "tables", "", "Specified tables for "+ "generating file, separated by ','. Valid only if output is set. If this flag is not set, generate all tables by default") + cmdPrepare.PersistentFlags().IntVar(&tpccConfig.PrepareReCommitCount, "retry-count", 50, "Retry count when errors occur") + cmdPrepare.PersistentFlags().DurationVar(&tpccConfig.PrepareReCommitDuration, "retry-duration", 10*time.Second, "The duration for each retry") var cmdRun = &cobra.Command{ Use: "run", diff --git a/cmd/go-tpc/versioninfo.go b/cmd/go-tpc/versioninfo.go new file mode 100644 index 0000000..ab94c3f --- /dev/null +++ b/cmd/go-tpc/versioninfo.go @@ -0,0 +1,25 @@ +package main + +import ( + "fmt" + + "github.com/spf13/cobra" + + "github.com/pingcap/go-tpc/pkg/util" +) + +func printVersion() { + fmt.Println("Git Commit Hash:", util.BuildHash) + fmt.Println("UTC Build Time:", util.BuildTS) + fmt.Println("Release version:", util.ReleaseVersion) +} + +func registerVersionInfo(root *cobra.Command) { + cmd := &cobra.Command{ + Use: "version", + Run: func(cmd *cobra.Command, args []string) { + printVersion() + }, + } + root.AddCommand(cmd) +} diff --git a/pkg/load/batch_loader.go b/pkg/load/batch_loader.go index 1d4a1ce..e81c968 100644 --- a/pkg/load/batch_loader.go +++ b/pkg/load/batch_loader.go @@ -5,7 +5,10 @@ import ( "context" "database/sql" "encoding/csv" + "fmt" "os" + "strings" + "time" ) const ( @@ -20,17 +23,23 @@ type BatchLoader interface { // SQLBatchLoader helps us insert in batch type SQLBatchLoader struct { insertHint string - conn *sql.Conn + db *sql.DB buf bytes.Buffer count int + + // loader retry + retryCount int + retryDuration time.Duration } // NewSQLBatchLoader creates a batch loader for database connection -func NewSQLBatchLoader(conn *sql.Conn, hint string) *SQLBatchLoader { +func NewSQLBatchLoader(db *sql.DB, hint string, retryCount int, retryDuration time.Duration) *SQLBatchLoader { return 
&SQLBatchLoader{ - count: 0, - insertHint: hint, - conn: conn, + count: 0, + insertHint: hint, + db: db, + retryCount: retryCount, + retryDuration: retryDuration, } } @@ -59,7 +68,21 @@ func (b *SQLBatchLoader) Flush(ctx context.Context) error { return nil } - _, err := b.conn.ExecContext(ctx, b.buf.String()) + var err error + for i := 0; i < 1+b.retryCount; i++ { + _, err = b.db.ExecContext(ctx, b.buf.String()) + if err == nil || (strings.Contains(err.Error(), "Error 1062: Duplicate entry") && i == 0) { + break + } + if i < b.retryCount { + fmt.Printf("exec statement error: %v, may try again later...\n", err) + time.Sleep(b.retryDuration) + } + } + if err != nil { + return fmt.Errorf("exec statement error: %v", err) + } + b.count = 0 b.buf.Reset() diff --git a/pkg/util/versioninfo.go b/pkg/util/versioninfo.go new file mode 100644 index 0000000..28b63e6 --- /dev/null +++ b/pkg/util/versioninfo.go @@ -0,0 +1,8 @@ +package util + +// Version information +var ( + ReleaseVersion string + BuildTS string + BuildHash string +) diff --git a/tpcc/load.go b/tpcc/load.go index 41834c6..32621f3 100644 --- a/tpcc/load.go +++ b/tpcc/load.go @@ -25,7 +25,7 @@ func (w *Workloader) loadItem(ctx context.Context) error { s := getTPCCState(ctx) hint := "INSERT INTO item (i_id, i_im_id, i_name, i_price, i_data) VALUES " - l := load.NewSQLBatchLoader(s.Conn, hint) + l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration) for i := 0; i < maxItems; i++ { s.Buf.Reset() @@ -50,7 +50,7 @@ func (w *Workloader) loadWarehouse(ctx context.Context, warehouse int) error { s := getTPCCState(ctx) hint := "INSERT INTO warehouse (w_id, w_name, w_street_1, w_street_2, w_city, w_state, w_zip, w_tax, w_ytd) VALUES " - l := load.NewSQLBatchLoader(s.Conn, hint) + l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration) wName := randChars(s.R, s.Buf, 6, 10) wStree1 := randChars(s.R, s.Buf, 10, 20) @@ -80,7 +80,7 @@ func (w 
*Workloader) loadStock(ctx context.Context, warehouse int) error { s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06, s_dist_07, s_dist_08, s_dist_09, s_dist_10, s_ytd, s_order_cnt, s_remote_cnt, s_data) VALUES ` - l := load.NewSQLBatchLoader(s.Conn, hint) + l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration) for i := 0; i < stockPerWarehouse; i++ { s.Buf.Reset() @@ -122,7 +122,7 @@ func (w *Workloader) loadDistrict(ctx context.Context, warehouse int) error { hint := `INSERT INTO district (d_id, d_w_id, d_name, d_street_1, d_street_2, d_city, d_state, d_zip, d_tax, d_ytd, d_next_o_id) VALUES ` - l := load.NewSQLBatchLoader(s.Conn, hint) + l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration) for i := 0; i < districtPerWarehouse; i++ { s.Buf.Reset() @@ -158,7 +158,7 @@ func (w *Workloader) loadCustomer(ctx context.Context, warehouse int, district i c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_since, c_credit, c_credit_lim, c_discount, c_balance, c_ytd_payment, c_payment_cnt, c_delivery_cnt, c_data) VALUES ` - l := load.NewSQLBatchLoader(s.Conn, hint) + l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration) for i := 0; i < customerPerDistrict; i++ { s.Buf.Reset() @@ -212,7 +212,7 @@ func (w *Workloader) loadHistory(ctx context.Context, warehouse int, district in s := getTPCCState(ctx) hint := `INSERT INTO history (h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES ` - l := load.NewSQLBatchLoader(s.Conn, hint) + l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration) // 1 customer has 1 row for i := 0; i < customerPerDistrict; i++ { @@ -245,7 +245,7 @@ func (w *Workloader) loadOrder(ctx context.Context, warehouse int, district int) hint := `INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d, o_carrier_id, o_ol_cnt, o_all_local) 
VALUES ` - l := load.NewSQLBatchLoader(s.Conn, hint) + l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration) cids := rand.Perm(orderPerDistrict) s.R.Shuffle(len(cids), func(i, j int) { @@ -285,7 +285,7 @@ func (w *Workloader) loadNewOrder(ctx context.Context, warehouse int, district i hint := `INSERT INTO new_order (no_o_id, no_d_id, no_w_id) VALUES ` - l := load.NewSQLBatchLoader(s.Conn, hint) + l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration) for i := 0; i < newOrderPerDistrict; i++ { s.Buf.Reset() @@ -312,7 +312,7 @@ func (w *Workloader) loadOrderLine(ctx context.Context, warehouse int, district hint := `INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number, ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES ` - l := load.NewSQLBatchLoader(s.Conn, hint) + l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration) for i := 0; i < orderPerDistrict; i++ { for j := 0; j < olCnts[i]; j++ { diff --git a/tpcc/new_order.go b/tpcc/new_order.go index 2b5ddea..51fd8fc 100644 --- a/tpcc/new_order.go +++ b/tpcc/new_order.go @@ -174,7 +174,7 @@ func (w *Workloader) runNewOrder(ctx context.Context, thread int) error { // Process 1 if err := s.newOrderStmts[newOrderSelectCustomer].QueryRowContext(ctx, d.wID, d.dID, d.cID).Scan(&d.cDiscount, &d.cLast, &d.cCredit, &d.wTax); err != nil { - return fmt.Errorf("exec %s failed %v", newOrderSelectCustomer, err) + return fmt.Errorf("exec %s(wID=%d,dID=%d,cID=%d) failed %v", newOrderSelectCustomer, d.wID, d.dID, d.cID, err) } // Process 2 diff --git a/tpcc/workload.go b/tpcc/workload.go index f7179b1..7f9e985 100644 --- a/tpcc/workload.go +++ b/tpcc/workload.go @@ -64,6 +64,10 @@ type Config struct { OutputType string OutputDir string SpecifiedTables string + + // connection, retry count when committing statement fails, default 0 + PrepareReCommitCount int +
PrepareReCommitDuration time.Duration } // Workloader is TPCC workload diff --git a/tpch/loader.go b/tpch/loader.go index 28b53fd..26c9ba0 100644 --- a/tpch/loader.go +++ b/tpch/loader.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "github.com/pingcap/go-tpc/pkg/load" "github.com/pingcap/go-tpc/tpch/dbgen" ) @@ -173,43 +174,43 @@ func (r *regionLoader) Load(item interface{}) error { return r.InsertValue(v) } -func NewOrderLoader(ctx context.Context, conn *sql.Conn) *orderLoader { - return &orderLoader{sqlLoader{load.NewSQLBatchLoader(conn, - `INSERT INTO orders (O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT) VALUES `), +func NewOrderLoader(ctx context.Context, db *sql.DB) *orderLoader { + return &orderLoader{sqlLoader{load.NewSQLBatchLoader(db, + `INSERT INTO orders (O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT) VALUES `, 0, 0), ctx}} } -func NewLineItemLoader(ctx context.Context, conn *sql.Conn) *lineItemloader { - return &lineItemloader{sqlLoader{load.NewSQLBatchLoader(conn, - `INSERT INTO lineitem (L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT) VALUES `), +func NewLineItemLoader(ctx context.Context, db *sql.DB) *lineItemloader { + return &lineItemloader{sqlLoader{load.NewSQLBatchLoader(db, + `INSERT INTO lineitem (L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT) VALUES `, 0, 0), ctx}} } -func NewCustLoader(ctx context.Context, conn *sql.Conn) *custLoader { - return &custLoader{sqlLoader{load.NewSQLBatchLoader(conn, - `INSERT INTO customer (C_CUSTKEY, C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, 
C_MKTSEGMENT, C_COMMENT) VALUES `), +func NewCustLoader(ctx context.Context, db *sql.DB) *custLoader { + return &custLoader{sqlLoader{load.NewSQLBatchLoader(db, + `INSERT INTO customer (C_CUSTKEY, C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT) VALUES `, 0, 0), ctx}} } -func NewPartLoader(ctx context.Context, conn *sql.Conn) *partLoader { - return &partLoader{sqlLoader{load.NewSQLBatchLoader(conn, - `INSERT INTO part (P_PARTKEY, P_NAME, P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT) VALUES `), +func NewPartLoader(ctx context.Context, db *sql.DB) *partLoader { + return &partLoader{sqlLoader{load.NewSQLBatchLoader(db, + `INSERT INTO part (P_PARTKEY, P_NAME, P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT) VALUES `, 0, 0), ctx}} } -func NewPartSuppLoader(ctx context.Context, conn *sql.Conn) *partSuppLoader { - return &partSuppLoader{sqlLoader{load.NewSQLBatchLoader(conn, - `INSERT INTO partsupp (PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT) VALUES `), +func NewPartSuppLoader(ctx context.Context, db *sql.DB) *partSuppLoader { + return &partSuppLoader{sqlLoader{load.NewSQLBatchLoader(db, + `INSERT INTO partsupp (PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT) VALUES `, 0, 0), ctx}} } -func NewSuppLoader(ctx context.Context, conn *sql.Conn) *suppLoader { - return &suppLoader{sqlLoader{load.NewSQLBatchLoader(conn, - `INSERT INTO supplier (S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT) VALUES `), +func NewSuppLoader(ctx context.Context, db *sql.DB) *suppLoader { + return &suppLoader{sqlLoader{load.NewSQLBatchLoader(db, + `INSERT INTO supplier (S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT) VALUES `, 0, 0), ctx}} } -func NewNationLoader(ctx context.Context, conn *sql.Conn) *nationLoader { - return &nationLoader{sqlLoader{load.NewSQLBatchLoader(conn, - `INSERT INTO nation (N_NATIONKEY, N_NAME, N_REGIONKEY, 
N_COMMENT) VALUES `), +func NewNationLoader(ctx context.Context, db *sql.DB) *nationLoader { + return &nationLoader{sqlLoader{load.NewSQLBatchLoader(db, + `INSERT INTO nation (N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT) VALUES `, 0, 0), ctx}} } -func NewRegionLoader(ctx context.Context, conn *sql.Conn) *regionLoader { - return &regionLoader{sqlLoader{load.NewSQLBatchLoader(conn, - `INSERT INTO region (R_REGIONKEY, R_NAME, R_COMMENT) VALUES `), +func NewRegionLoader(ctx context.Context, db *sql.DB) *regionLoader { + return &regionLoader{sqlLoader{load.NewSQLBatchLoader(db, + `INSERT INTO region (R_REGIONKEY, R_NAME, R_COMMENT) VALUES `, 0, 0), ctx}} } diff --git a/tpch/workload.go b/tpch/workload.go index 1dde9c0..aa2b1c3 100644 --- a/tpch/workload.go +++ b/tpch/workload.go @@ -103,20 +103,18 @@ func (w Workloader) Prepare(ctx context.Context, threadID int) error { if threadID != 0 { return nil } - s := w.getState(ctx) - if err := w.createTables(ctx); err != nil { return err } sqlLoader := map[dbgen.Table]dbgen.Loader{ - dbgen.TOrder: NewOrderLoader(ctx, s.Conn), - dbgen.TLine: NewLineItemLoader(ctx, s.Conn), - dbgen.TPart: NewPartLoader(ctx, s.Conn), - dbgen.TPsupp: NewPartSuppLoader(ctx, s.Conn), - dbgen.TSupp: NewSuppLoader(ctx, s.Conn), - dbgen.TCust: NewCustLoader(ctx, s.Conn), - dbgen.TNation: NewNationLoader(ctx, s.Conn), - dbgen.TRegion: NewRegionLoader(ctx, s.Conn), + dbgen.TOrder: NewOrderLoader(ctx, w.db), + dbgen.TLine: NewLineItemLoader(ctx, w.db), + dbgen.TPart: NewPartLoader(ctx, w.db), + dbgen.TPsupp: NewPartSuppLoader(ctx, w.db), + dbgen.TSupp: NewSuppLoader(ctx, w.db), + dbgen.TCust: NewCustLoader(ctx, w.db), + dbgen.TNation: NewNationLoader(ctx, w.db), + dbgen.TRegion: NewRegionLoader(ctx, w.db), } dbgen.InitDbGen(int64(w.cfg.ScaleFactor)) if err := dbgen.DbGen(sqlLoader, []dbgen.Table{dbgen.TNation, dbgen.TRegion, dbgen.TCust, dbgen.TSupp, dbgen.TPartPsupp, dbgen.TOrderLine}); err != nil {