Skip to content

Commit

Permalink
sink(ticdc): add event key output for debezium (#11649)
Browse files Browse the repository at this point in the history
close #11652
  • Loading branch information
wk989898 authored Oct 21, 2024
1 parent 62d07b5 commit 76ebb37
Show file tree
Hide file tree
Showing 13 changed files with 657 additions and 188 deletions.
25 changes: 25 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,31 @@ func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
return pkeyCols, pkeyColInfos
}

// HandleKeyColDataXInfos returns the columnDataX(s) and colInfo(s) corresponding to the handle key(s)
func (r *RowChangedEvent) HandleKeyColDataXInfos() ([]ColumnDataX, []rowcodec.ColInfo) {
pkeyColDataXs := make([]ColumnDataX, 0)
pkeyColInfos := make([]rowcodec.ColInfo, 0)

var cols []*ColumnData
if r.IsDelete() {
cols = r.PreColumns
} else {
cols = r.Columns
}

tableInfo := r.TableInfo
colInfos := tableInfo.GetColInfosForRowChangedEvent()
for i, col := range cols {
if col != nil && tableInfo.ForceGetColumnFlagType(col.ColumnID).IsHandleKey() {
pkeyColDataXs = append(pkeyColDataXs, GetColumnDataX(col, tableInfo))
pkeyColInfos = append(pkeyColInfos, colInfos[i])
}
}

// It is okay not to have handle keys, so the empty array is an acceptable result
return pkeyColDataXs, pkeyColInfos
}

// ApproximateBytes returns approximate bytes in memory consumed by the event.
func (r *RowChangedEvent) ApproximateBytes() int {
const sizeOfRowEvent = int(unsafe.Sizeof(*r))
Expand Down
68 changes: 13 additions & 55 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,61 +448,19 @@ func mysqlTypeFromTiDBType(tidbType string) byte {
return result
}

const (
replacementChar = "_"
numberPrefix = "_"
)

// sanitizeName escapes not permitted chars for avro
// debezium-core/src/main/java/io/debezium/schema/FieldNameSelector.java
// https://avro.apache.org/docs/current/spec.html#names
func sanitizeName(name string) string {
changed := false
var sb strings.Builder
for i, c := range name {
if i == 0 && (c >= '0' && c <= '9') {
sb.WriteString(numberPrefix)
sb.WriteRune(c)
changed = true
} else if !(c == '_' ||
('a' <= c && c <= 'z') ||
('A' <= c && c <= 'Z') ||
('0' <= c && c <= '9')) {
sb.WriteString(replacementChar)
changed = true
} else {
sb.WriteRune(c)
}
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Name is potentially not safe for serialization, replace it",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName
}

// sanitizeTopic escapes ".", it may have special meanings for sink connectors
func sanitizeTopic(name string) string {
return strings.ReplaceAll(name, ".", replacementChar)
}

// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
func escapeEnumAndSetOptions(option string) string {
option = strings.ReplaceAll(option, ",", "\\,")
option = strings.ReplaceAll(option, "\\'", "'")
option = strings.ReplaceAll(option, "''", "'")
return option
return strings.ReplaceAll(name, ".", "_")
}

// <empty> | <name>[(<dot><name>)*]
func getAvroNamespace(namespace string, schema string) string {
return sanitizeName(namespace) + "." + sanitizeName(schema)
ns := common.SanitizeName(namespace)
s := common.SanitizeName(schema)
if s != "" {
return ns + "." + s
}
return ns
}

type avroSchema struct {
Expand Down Expand Up @@ -567,7 +525,7 @@ func (a *BatchEncoder) columns2AvroSchema(
) (*avroSchemaTop, error) {
top := &avroSchemaTop{
Tp: "record",
Name: sanitizeName(tableName.Table),
Name: common.SanitizeName(tableName.Table),
Namespace: getAvroNamespace(a.namespace, tableName.Schema),
Fields: nil,
}
Expand All @@ -580,7 +538,7 @@ func (a *BatchEncoder) columns2AvroSchema(
return nil, err
}
field := make(map[string]interface{})
field["name"] = sanitizeName(col.Name)
field["name"] = common.SanitizeName(col.Name)

copied := *col
copied.Value = copied.Default
Expand Down Expand Up @@ -679,9 +637,9 @@ func (a *BatchEncoder) columns2AvroData(

// https: //pkg.go.dev/github.com/linkedin/goavro/v2#Union
if col.Flag.IsNullable() {
ret[sanitizeName(col.Name)] = goavro.Union(str, data)
ret[common.SanitizeName(col.Name)] = goavro.Union(str, data)
} else {
ret[sanitizeName(col.Name)] = data
ret[common.SanitizeName(col.Name)] = data
}
}

Expand Down Expand Up @@ -790,7 +748,7 @@ func (a *BatchEncoder) columnToAvroSchema(
case mysql.TypeEnum, mysql.TypeSet:
es := make([]string, 0, len(ft.GetElems()))
for _, e := range ft.GetElems() {
e = escapeEnumAndSetOptions(e)
e = common.EscapeEnumAndSetOptions(e)
es = append(es, e)
}
return avroSchema{
Expand Down
14 changes: 10 additions & 4 deletions pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,14 @@ func TestAvroEnvelope(t *testing.T) {
func TestSanitizeName(t *testing.T) {
t.Parallel()

require.Equal(t, "normalColumnName123", sanitizeName("normalColumnName123"))
require.Equal(t, "normalColumnName123", common.SanitizeName("normalColumnName123"))
require.Equal(
t,
"_1ColumnNameStartWithNumber",
sanitizeName("1ColumnNameStartWithNumber"),
common.SanitizeName("1ColumnNameStartWithNumber"),
)
require.Equal(t, "A_B", sanitizeName("A.B"))
require.Equal(t, "columnNameWith__", sanitizeName("columnNameWith中文"))
require.Equal(t, "A_B", common.SanitizeName("A.B"))
require.Equal(t, "columnNameWith______", common.SanitizeName("columnNameWith中文"))
}

func TestGetAvroNamespace(t *testing.T) {
Expand All @@ -335,6 +335,12 @@ func TestGetAvroNamespace(t *testing.T) {
"N_amespace.S_chema",
getAvroNamespace("N-amespace", "S.chema"),
)

require.Equal(
t,
"normalNamespace",
getAvroNamespace("normalNamespace", ""),
)
}

func TestArvoAppendRowChangedEventWithCallback(t *testing.T) {
Expand Down
92 changes: 92 additions & 0 deletions pkg/sink/codec/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"fmt"
"math"
"strings"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/log"
Expand Down Expand Up @@ -292,3 +293,94 @@ func trimLeadingZeroBytes(bytes []byte) []byte {
}
return bytes[pos:]
}

const (
replacementChar = "_"
numberPrefix = 'x'
)

// EscapeEnumAndSetOptions escapes ",", "\" and "”"
// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
func EscapeEnumAndSetOptions(option string) string {
option = strings.ReplaceAll(option, ",", "\\,")
option = strings.ReplaceAll(option, "\\'", "'")
option = strings.ReplaceAll(option, "''", "'")
return option
}

func isValidFirstCharacter(c rune) bool {
return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_'
}

func isValidNonFirstCharacter(c rune) bool {
return isValidFirstCharacter(c) || (c >= '0' && c <= '9')
}

func isValidNonFirstCharacterForTopicName(c rune) bool {
return isValidNonFirstCharacter(c) || c == '.'
}

// SanitizeName escapes not permitted chars
// https://avro.apache.org/docs/1.12.0/specification/#names
// see https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/schema/SchemaNameAdjuster.java
func SanitizeName(name string) string {
changed := false
var sb strings.Builder
for i, c := range name {
if i == 0 && !isValidFirstCharacter(c) {
sb.WriteString(replacementChar)
if c >= '0' && c <= '9' {
sb.WriteRune(c)
}
changed = true
} else if !isValidNonFirstCharacter(c) {
b := []byte(string(c))
for k := 0; k < len(b); k++ {
sb.WriteString(replacementChar)
}
changed = true
} else {
sb.WriteRune(c)
}
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Name is potentially not safe for serialization, replace it",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName
}

// SanitizeTopicName escapes not permitted chars for topic name
// https://github.com/debezium/debezium/blob/main/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java
func SanitizeTopicName(name string) string {
changed := false
var sb strings.Builder
for _, c := range name {
if !isValidNonFirstCharacterForTopicName(c) {
b := []byte(string(c))
for k := 0; k < len(b); k++ {
sb.WriteString(replacementChar)
}
changed = true
} else {
sb.WriteRune(c)
}
}

sanitizedName := sb.String()
if changed {
log.Warn(
"Table name sanitize",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
)
}
return sanitizedName
}
Loading

0 comments on commit 76ebb37

Please sign in to comment.