From d841247601ad7c49fb1fdf058fc2583c621bbd58 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 15:12:14 +0800 Subject: [PATCH 1/7] ddl: use the correct timezone to encode record for adding index --- ddl/index.go | 12 ++++++++++++ ddl/ingest/integration_test.go | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/ddl/index.go b/ddl/index.go index 2d3b2ef5cd9c4..6963a31eff685 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1656,6 +1656,8 @@ func (w *addIndexIngestWorker) WriteLocal(rs *idxRecResult) (count int, nextKey oprStartTime := time.Now() copCtx := w.copReqSenderPool.copCtx vars := w.sessCtx.GetSessionVars() + restore := setUTCTimezone(vars) + defer restore() cnt, lastHandle, err := writeChunkToLocal(w.writer, w.index, copCtx, vars, rs.chunk) if err != nil || cnt == 0 { return 0, nil, err @@ -1666,6 +1668,16 @@ func (w *addIndexIngestWorker) WriteLocal(rs *idxRecResult) (count int, nextKey return cnt, nextKey, nil } +// setUTCTimezone sets the timezone to UTC for the session variable. +// This is because the time value in coprocessor reader is in UTC. +func setUTCTimezone(vars *variable.SessionVars) (restore func()) { + originTZ := vars.TimeZone + vars.TimeZone = time.UTC + return func() { + vars.TimeZone = originTZ + } +} + func writeChunkToLocal(writer ingest.Writer, index table.Index, copCtx *copContext, vars *variable.SessionVars, copChunk *chunk.Chunk) (int, kv.Handle, error) { diff --git a/ddl/ingest/integration_test.go b/ddl/ingest/integration_test.go index 433a9b15cc65a..77c8742e448d9 100644 --- a/ddl/ingest/integration_test.go +++ b/ddl/ingest/integration_test.go @@ -323,3 +323,16 @@ func TestAddIndexIngestRecoverPartition(t *testing.T) { tk.MustExec("alter table t add index idx(b);") tk.MustExec("admin check table t;") } + +func TestAddIndexIngestTimezone(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + defer injectMockBackendMgr(t, store)() + + tk.MustExec("SET time_zone = '-06:00';") + tk.MustExec("create table t (`src` varchar(48),`t` timestamp,`timezone` varchar(100));") + tk.MustExec("insert into t values('2000-07-29 23:15:30 -0600','2000-07-29 23:15:30 -0600','-6:00');") + tk.MustExec("alter table t add index idx(t);") + tk.MustExec("admin check table t;") +} From b49a7dd05f5bd20b62802a925e0c2a3793fb6c83 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 15:39:26 +0800 Subject: [PATCH 2/7] update bazel --- ddl/ingest/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/ingest/BUILD.bazel b/ddl/ingest/BUILD.bazel index 4f716cfd51771..f18bc20f46c53 100644 --- a/ddl/ingest/BUILD.bazel +++ b/ddl/ingest/BUILD.bazel @@ -65,7 +65,7 @@ go_test( embed = [":ingest"], flaky = True, race = "on", - shard_count = 14, + shard_count = 15, deps = [ "//config", "//ddl", From 58d432c11218646ceb47af23ad96d395dec9d318 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 16:00:22 +0800 Subject: [PATCH 3/7] set statement context instead --- ddl/index.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 6963a31eff685..c8950a31ffc17 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1656,8 +1656,6 @@ func (w *addIndexIngestWorker) WriteLocal(rs *idxRecResult) (count int, nextKey oprStartTime := time.Now() copCtx := w.copReqSenderPool.copCtx vars := w.sessCtx.GetSessionVars() - restore := setUTCTimezone(vars) - defer restore() cnt, lastHandle, err := writeChunkToLocal(w.writer, w.index, copCtx, vars, rs.chunk) if err != nil || cnt == 0 { return 0, nil, err @@ -1670,7 +1668,7 @@ func (w *addIndexIngestWorker) WriteLocal(rs *idxRecResult) (count int, nextKey // setUTCTimezone sets the timezone to UTC for the session variable. // This is because the time value in coprocessor reader is in UTC. -func setUTCTimezone(vars *variable.SessionVars) (restore func()) { +func setUTCTimezone(vars *stmtctx.StatementContext) (restore func()) { originTZ := vars.TimeZone vars.TimeZone = time.UTC return func() { @@ -1682,6 +1680,8 @@ func writeChunkToLocal(writer ingest.Writer, index table.Index, copCtx *copContext, vars *variable.SessionVars, copChunk *chunk.Chunk) (int, kv.Handle, error) { sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs() + restore := setUTCTimezone(sCtx) + defer restore() iter := chunk.NewIterator4Chunk(copChunk) idxDataBuf := make([]types.Datum, len(copCtx.idxColOutputOffsets)) handleDataBuf := make([]types.Datum, len(copCtx.handleOutputOffsets)) From 9c189bc36f6e6ee2583687f4b6d8dbced6084a04 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Aug 2023 18:06:09 +0800 Subject: [PATCH 4/7] address comment --- ddl/export_test.go | 5 +++-- ddl/index.go | 16 ++-------------- ddl/index_cop.go | 22 ++++++++++++++++++++-- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/ddl/export_test.go b/ddl/export_test.go index 633c28c8a9de6..8d06495f5a070 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -60,8 +60,9 @@ func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endK } func ConvertRowToHandleAndIndexDatum(row chunk.Row, copCtx *copContext) (kv.Handle, []types.Datum, error) { - idxData := extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, nil) - handleData := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, nil) + sVars := copCtx.sessCtx.GetSessionVars() + idxData := extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, sVars, nil) + handleData := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, sVars, nil) handle, err := buildHandle(handleData, copCtx.tblInfo, copCtx.pkInfo, &stmtctx.StatementContext{TimeZone: time.Local}) return handle, idxData, err } diff --git a/ddl/index.go b/ddl/index.go index c8950a31ffc17..46e9cc8dd6073 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1666,22 +1666,10 @@ func (w *addIndexIngestWorker) WriteLocal(rs *idxRecResult) (count int, nextKey return cnt, nextKey, nil } -// setUTCTimezone sets the timezone to UTC for the session variable. -// This is because the time value in coprocessor reader is in UTC. -func setUTCTimezone(vars *stmtctx.StatementContext) (restore func()) { - originTZ := vars.TimeZone - vars.TimeZone = time.UTC - return func() { - vars.TimeZone = originTZ - } -} - func writeChunkToLocal(writer ingest.Writer, index table.Index, copCtx *copContext, vars *variable.SessionVars, copChunk *chunk.Chunk) (int, kv.Handle, error) { sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs() - restore := setUTCTimezone(sCtx) - defer restore() iter := chunk.NewIterator4Chunk(copChunk) idxDataBuf := make([]types.Datum, len(copCtx.idxColOutputOffsets)) handleDataBuf := make([]types.Datum, len(copCtx.handleOutputOffsets)) @@ -1691,8 +1679,8 @@ func writeChunkToLocal(writer ingest.Writer, defer unlock() for row := iter.Begin(); row != iter.End(); row = iter.Next() { idxDataBuf, handleDataBuf = idxDataBuf[:0], handleDataBuf[:0] - idxDataBuf = extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, idxDataBuf) - handleDataBuf := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, handleDataBuf) + idxDataBuf = extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, vars, idxDataBuf) + handleDataBuf := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, vars, handleDataBuf) handle, err := buildHandle(handleDataBuf, copCtx.tblInfo, copCtx.pkInfo, sCtx) if err != nil { return 0, nil, errors.Trace(err) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index a220c9a57f7eb..060f787788dd1 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -18,6 +18,7 @@ import ( "context" "encoding/hex" "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -506,10 +508,26 @@ func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, col return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err } -func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.Column, buf []types.Datum) []types.Datum { +func extractDatumByOffsets( + row chunk.Row, + offsets []int, + expCols []*expression.Column, + vars *variable.SessionVars, + buf []types.Datum, +) []types.Datum { for _, offset := range offsets { c := expCols[offset] - rowDt := row.GetDatum(offset, c.GetType()) + ft := c.GetType() + rowDt := row.GetDatum(offset, ft) + if ft.GetType() == mysql.TypeTimestamp && vars.TimeZone != time.UTC { + // Convert from utc to current timezone. + t := rowDt.GetMysqlTime() + err := t.ConvertTimeZone(time.UTC, vars.TimeZone) + if err != nil { + logutil.BgLogger().Warn("convert timestamp timezone failed", zap.Error(err)) + } + rowDt.SetMysqlTime(t) + } buf = append(buf, rowDt) } return buf From 019d1fe069cb680a512c563f83fd8efad8e99f31 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 15 Aug 2023 14:49:40 +0800 Subject: [PATCH 5/7] fix integration test --- ddl/index_cop.go | 16 ++-------------- ddl/ingest/integration_test.go | 8 +++++++- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 060f787788dd1..a1d20abf3ab94 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -18,7 +18,6 @@ import ( "context" "encoding/hex" "sync" - "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -29,7 +28,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -486,7 +484,7 @@ func getRestoreData(tblInfo *model.TableInfo, targetIdx, pkIdx *model.IndexInfo, func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) { dagReq := &tipb.DAGRequest{} - dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location()) + _, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location()) sc := sCtx.GetSessionVars().StmtCtx dagReq.Flags = sc.PushDownFlags() for i := range colInfos { @@ -517,17 +515,7 @@ func extractDatumByOffsets( ) []types.Datum { for _, offset := range offsets { c := expCols[offset] - ft := c.GetType() - rowDt := row.GetDatum(offset, ft) - if ft.GetType() == mysql.TypeTimestamp && vars.TimeZone != time.UTC { - // Convert from utc to current timezone. - t := rowDt.GetMysqlTime() - err := t.ConvertTimeZone(time.UTC, vars.TimeZone) - if err != nil { - logutil.BgLogger().Warn("convert timestamp timezone failed", zap.Error(err)) - } - rowDt.SetMysqlTime(t) - } + rowDt := row.GetDatum(offset, c.GetType()) buf = append(buf, rowDt) } return buf diff --git a/ddl/ingest/integration_test.go b/ddl/ingest/integration_test.go index 77c8742e448d9..088e4bcbee59d 100644 --- a/ddl/ingest/integration_test.go +++ b/ddl/ingest/integration_test.go @@ -332,7 +332,13 @@ func TestAddIndexIngestTimezone(t *testing.T) { tk.MustExec("SET time_zone = '-06:00';") tk.MustExec("create table t (`src` varchar(48),`t` timestamp,`timezone` varchar(100));") - tk.MustExec("insert into t values('2000-07-29 23:15:30 -0600','2000-07-29 23:15:30 -0600','-6:00');") + tk.MustExec("insert into t values('2000-07-29 23:15:30','2000-07-29 23:15:30','-6:00');") + tk.MustExec("alter table t add index idx(t);") + tk.MustExec("admin check table t;") + + tk.MustExec("alter table t drop index idx;") + tk.MustExec("SET time_zone = 'Asia/Shanghai';") + tk.MustExec("insert into t values('2000-07-29 23:15:30','2000-07-29 23:15:30', '+8:00');") tk.MustExec("alter table t add index idx(t);") tk.MustExec("admin check table t;") } From 22cf55f09b6e26c06a343caf66553582a5f5b504 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 15 Aug 2023 14:51:03 +0800 Subject: [PATCH 6/7] revert unnecessary change --- ddl/export_test.go | 5 ++--- ddl/index.go | 4 ++-- ddl/index_cop.go | 1 - 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/ddl/export_test.go b/ddl/export_test.go index 8d06495f5a070..633c28c8a9de6 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -60,9 +60,8 @@ func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endK } func ConvertRowToHandleAndIndexDatum(row chunk.Row, copCtx *copContext) (kv.Handle, []types.Datum, error) { - sVars := copCtx.sessCtx.GetSessionVars() - idxData := extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, sVars, nil) - handleData := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, sVars, nil) + idxData := extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, nil) + handleData := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, nil) handle, err := buildHandle(handleData, copCtx.tblInfo, copCtx.pkInfo, &stmtctx.StatementContext{TimeZone: time.Local}) return handle, idxData, err } diff --git a/ddl/index.go b/ddl/index.go index 46e9cc8dd6073..2d3b2ef5cd9c4 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1679,8 +1679,8 @@ func writeChunkToLocal(writer ingest.Writer, defer unlock() for row := iter.Begin(); row != iter.End(); row = iter.Next() { idxDataBuf, handleDataBuf = idxDataBuf[:0], handleDataBuf[:0] - idxDataBuf = extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, vars, idxDataBuf) - handleDataBuf := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, vars, handleDataBuf) + idxDataBuf = extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, idxDataBuf) + handleDataBuf := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, handleDataBuf) handle, err := buildHandle(handleDataBuf, copCtx.tblInfo, copCtx.pkInfo, sCtx) if err != nil { return 0, nil, errors.Trace(err) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index a1d20abf3ab94..88dabda331951 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -510,7 +510,6 @@ func extractDatumByOffsets( row chunk.Row, offsets []int, expCols []*expression.Column, - vars *variable.SessionVars, buf []types.Datum, ) []types.Datum { for _, offset := range offsets { From 24d00ab4de7f47c313c8efd3e2301a4fdb5e3c37 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 15 Aug 2023 14:52:21 +0800 Subject: [PATCH 7/7] revert unnecessary change --- ddl/index_cop.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 88dabda331951..909981f70f015 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -506,12 +506,7 @@ func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, col return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err } -func extractDatumByOffsets( - row chunk.Row, - offsets []int, - expCols []*expression.Column, - buf []types.Datum, -) []types.Datum { +func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.Column, buf []types.Datum) []types.Datum { for _, offset := range offsets { c := expCols[offset] rowDt := row.GetDatum(offset, c.GetType())