Skip to content

Commit

Permalink
sink: fix Canal sink bugs (#966)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored Sep 24, 2020
1 parent d5a0aeb commit d1149b7
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
26 changes: 19 additions & 7 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated
func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowData, error) {
var columns []*canal.Column
for _, column := range e.Columns {
if e == nil {
if column == nil {
continue
}
c, err := b.buildColumn(column, column.Name, !e.IsDelete())
Expand All @@ -227,7 +227,7 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowDa
}
var preColumns []*canal.Column
for _, column := range e.PreColumns {
if e == nil {
if column == nil {
continue
}
c, err := b.buildColumn(column, column.Name, !e.IsDelete())
Expand Down Expand Up @@ -374,16 +374,22 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage,

// Build implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if len(d.messages.Messages) == 0 {
return nil
}

err := d.refreshPacketBody()
if err != nil {
log.Fatal("Error when generating Canal packet", zap.Error(err))
}

value, err := proto.Marshal(d.packet)
if err != nil {
log.Fatal("Error when serializing Canal packet", zap.Error(err))
}
ret := NewMQMessage(nil, value, 0)
d.messages.Reset()
d.resetPacket()
return []*MQMessage{ret}
}

Expand Down Expand Up @@ -414,24 +420,30 @@ func (d *CanalEventBatchEncoder) refreshPacketBody() error {
if newSize > oldSize {
// resize packet body slice
d.packet.Body = append(d.packet.Body, make([]byte, newSize-oldSize)...)
} else {
d.packet.Body = d.packet.Body[:newSize]
}
_, err := d.messages.MarshalToSizedBuffer(d.packet.Body[:newSize])

_, err := d.messages.MarshalToSizedBuffer(d.packet.Body)
return err
}

// NewCanalEventBatchEncoder creates a new CanalEventBatchEncoder.
func NewCanalEventBatchEncoder() EventBatchEncoder {
p := &canal.Packet{
func (d *CanalEventBatchEncoder) resetPacket() {
d.packet = &canal.Packet{
VersionPresent: &canal.Packet_Version{
Version: CanalPacketVersion,
},
Type: canal.PacketType_MESSAGES,
}
}

// NewCanalEventBatchEncoder creates a new CanalEventBatchEncoder.
func NewCanalEventBatchEncoder() EventBatchEncoder {
encoder := &CanalEventBatchEncoder{
messages: &canal.Messages{},
packet: p,
entryBuilder: NewCanalEntryBuilder(),
}

encoder.resetPacket()
return encoder
}
6 changes: 6 additions & 0 deletions cdc/sink/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) {
}
size := encoder.Size()
res := encoder.Build()

if len(cs) == 0 {
c.Assert(res, check.IsNil)
continue
}

c.Assert(res, check.HasLen, 1)
c.Assert(res[0].Key, check.IsNil)
c.Assert(len(res[0].Value), check.Equals, size)
Expand Down

0 comments on commit d1149b7

Please sign in to comment.