Skip to content

Commit

Permalink
sink(cdc): clear memquota when restart a table sink (#9091) (#9094)
Browse files Browse the repository at this point in the history
close #9092
  • Loading branch information
ti-chi-bot authored May 29, 2023
1 parent a331ede commit 6330ad2
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 9 deletions.
22 changes: 18 additions & 4 deletions cdc/processor/memquota/mem_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,27 @@ func (m *MemQuota) Release(span tablepb.Span, resolved model.ResolvedTs) {
}
}

// Clean all records of the table.
// RemoveTable clears all records of the table and remove the table.
// Return the cleaned memory quota.
func (m *MemQuota) Clean(span tablepb.Span) uint64 {
func (m *MemQuota) RemoveTable(span tablepb.Span) uint64 {
m.mu.Lock()
defer m.mu.Unlock()
cleaned := m.clear(span)
m.tableMemory.Delete(span)
m.mu.Unlock()
return cleaned
}

// ClearTable is like RemoveTable but only clear the memory usage records but doesn't
// remove the table.
func (m *MemQuota) ClearTable(span tablepb.Span) uint64 {
m.mu.Lock()
cleaned := m.clear(span)
m.tableMemory.ReplaceOrInsert(span, make([]*MemConsumeRecord, 0, 2))
m.mu.Unlock()
return cleaned
}

func (m *MemQuota) clear(span tablepb.Span) uint64 {
if _, ok := m.tableMemory.Get(span); !ok {
// This can happen when the table has no data and never been recorded.
log.Warn("Table consumed memory records not found",
Expand All @@ -241,7 +256,6 @@ func (m *MemQuota) Clean(span tablepb.Span) uint64 {
for _, record := range records {
cleaned += record.Size
}
m.tableMemory.Delete(span)

if m.usedBytes.Add(^(cleaned - 1)) < m.totalBytes {
m.blockAcquireCond.Broadcast()
Expand Down
5 changes: 4 additions & 1 deletion cdc/processor/memquota/mem_quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,10 @@ func TestMemQuotaRecordAndClean(t *testing.T) {
require.False(t, m.hasAvailable(1))

// clean the all memory.
cleanedBytes := m.Clean(span)
cleanedBytes := m.ClearTable(span)
require.Equal(t, uint64(300), cleanedBytes)
require.True(t, m.hasAvailable(100))

cleanedBytes = m.RemoveTable(span)
require.Equal(t, uint64(0), cleanedBytes)
}
8 changes: 4 additions & 4 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,8 +833,8 @@ func (m *SinkManager) AsyncStopTable(span tablepb.Span) bool {
zap.Stringer("span", &span))
}
if tableSink.(*tableSinkWrapper).asyncClose() {
cleanedBytes := m.sinkMemQuota.Clean(span)
cleanedBytes += m.redoMemQuota.Clean(span)
cleanedBytes := m.sinkMemQuota.RemoveTable(span)
cleanedBytes += m.redoMemQuota.RemoveTable(span)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
Expand Down Expand Up @@ -904,8 +904,8 @@ func (m *SinkManager) GetTableState(span tablepb.Span) (tablepb.TableState, bool
// if necessary. It's better to remove the dirty logic in the future.
tableSink := wrapper.(*tableSinkWrapper)
if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() {
cleanedBytes := m.sinkMemQuota.Clean(span)
cleanedBytes += m.redoMemQuota.Clean(span)
cleanedBytes := m.sinkMemQuota.RemoveTable(span)
cleanedBytes += m.redoMemQuota.RemoveTable(span)
log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
Expand Down
4 changes: 4 additions & 0 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// at the checkpoint position.
case tablesink.SinkInternalError:
task.tableSink.clearTableSink()
// After the table sink is cleared all pending events are sent out or dropped.
// So we can re-add the table into sinkMemQuota.
w.sinkMemQuota.ClearTable(task.tableSink.span)

// Restart the table sink based on the checkpoint position.
if finalErr = task.tableSink.restart(ctx); finalErr == nil {
ckpt := task.tableSink.getCheckpointTs().ResolvedMark()
Expand Down

0 comments on commit 6330ad2

Please sign in to comment.