Skip to content

Commit

Permalink
refactor(storagenode): remove synchronous Append
Browse files Browse the repository at this point in the history
This PR removes the synchronous Append method from Executor, which is unused
code. It also updates tests to use the asynchronous AppendAsync method and
ensures proper handling of AppendTask in tests.
  • Loading branch information
ijsong committed Feb 3, 2025
1 parent efdf19f commit 99f94ef
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 103 deletions.
81 changes: 0 additions & 81 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,60 +172,6 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append
return nil
}

// Append appends a batch of logs to the log stream.
func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.AppendResult, error) {
lse.inflight.Add(1)
lse.inflightAppend.Add(1)

defer func() {
lse.inflightAppend.Add(-1)
lse.inflight.Add(-1)
}()

switch lse.esm.load() {
case executorStateSealing, executorStateSealed, executorStateLearning:
return nil, verrors.ErrSealed
case executorStateClosed:
return nil, verrors.ErrClosed
}

if !lse.isPrimary() {
return nil, snerrors.ErrNotPrimary
}

startTime := time.Now()
var preparationDuration time.Duration
dataBatchLen := len(dataBatch)

apc := appendContext{
awgs: make([]*appendWaitGroup, 0, dataBatchLen),
}

defer func() {
if lse.lsm == nil {
return
}
lse.lsm.AppendLogs.Add(int64(dataBatchLen))
lse.lsm.AppendBytes.Add(apc.totalBytes)
lse.lsm.AppendDuration.Add(time.Since(startTime).Microseconds())
lse.lsm.AppendOperations.Add(1)
lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds())

// TODO: Set a correct error code.
lse.lsm.LogRPCServerBatchSize.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, apc.totalBytes)
lse.lsm.LogRPCServerLogEntriesPerBatch.Record(context.Background(), telemetry.RPCKindAppend, codes.OK, int64(dataBatchLen))
}()

lse.prepareAppendContext(dataBatch, &apc)
preparationDuration = time.Since(startTime)
lse.sendSequenceTask(ctx, apc.st)
res, err := lse.waitForCompletionOfAppends(ctx, dataBatchLen, apc.awgs)
if err == nil {
apc.wwg.release()
}
return res, err
}

func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext) {
numBackups := len(lse.primaryBackups) - 1

Expand Down Expand Up @@ -275,30 +221,3 @@ func (lse *Executor) sendSequenceTask(ctx context.Context, st *sequenceTask) {
st.release()
}
}

func (lse *Executor) waitForCompletionOfAppends(ctx context.Context, dataBatchLen int, awgs []*appendWaitGroup) ([]snpb.AppendResult, error) {
var err error
result := make([]snpb.AppendResult, dataBatchLen)
for i := range awgs {
cerr := awgs[i].wait(ctx)
if cerr != nil {
result[i].Error = cerr.Error()
if err == nil {
err = cerr
}
continue
}
if err != nil {
lse.logger.Panic("Results of batch requests of Append RPC must not be interleaved with success and failure", zap.Error(err))
}
result[i].Meta.TopicID = lse.tpid
result[i].Meta.LogStreamID = lse.lsid
result[i].Meta.GLSN = awgs[i].glsn
result[i].Meta.LLSN = awgs[i].llsn
awgs[i].release()
}
if result[0].Meta.GLSN.Invalid() {
return nil, err
}
return result, nil
}
125 changes: 103 additions & 22 deletions internal/storagenode/logstream/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func TestExecutor_Closed(t *testing.T) {
assert.NoError(t, lse.Close())
assert.Equal(t, executorStateClosed, lse.esm.load())

_, err := lse.Append(context.Background(), TestNewBatchData(t, 1, 0))
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), TestNewBatchData(t, 1, 0), appendTask)
assert.ErrorIs(t, err, verrors.ErrClosed)

err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0))
Expand Down Expand Up @@ -168,7 +170,9 @@ func TestExecutor_Sealing(t *testing.T) {
assert.Equal(t, varlogpb.LogStreamStatusSealing, st)
assert.Equal(t, executorStateSealing, lse.esm.load())

_, err = lse.Append(context.Background(), TestNewBatchData(t, 1, 0))
appendTask := NewAppendTask()
defer appendTask.Release()
err = lse.AppendAsync(context.Background(), TestNewBatchData(t, 1, 0), appendTask)
assert.ErrorIs(t, err, verrors.ErrSealed)
},
},
Expand Down Expand Up @@ -282,7 +286,9 @@ func TestExecutor_Sealed(t *testing.T) {
assert.Equal(t, varlogpb.LogStreamStatusSealed, st)
assert.Equal(t, executorStateSealed, lse.esm.load())

_, err = lse.Append(context.Background(), TestNewBatchData(t, 1, 0))
appendTask := NewAppendTask()
defer appendTask.Release()
err = lse.AppendAsync(context.Background(), TestNewBatchData(t, 1, 0), appendTask)
assert.ErrorIs(t, err, verrors.ErrSealed)

err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0))
Expand All @@ -294,7 +300,9 @@ func testUnsealInitialExecutor(t *testing.T, lse *Executor, replicas []varlogpb.
assert.NoError(t, err)
assert.Equal(t, varlogpb.LogStreamStatusSealing, lsmd.Status)

_, err = lse.Append(context.Background(), [][]byte{[]byte("hello")})
appendTask := NewAppendTask()
defer appendTask.Release()
err = lse.AppendAsync(context.Background(), [][]byte{[]byte("hello")}, appendTask)
assert.Error(t, err)

st, localHWM, err := lse.Seal(context.Background(), lastGLSN)
Expand Down Expand Up @@ -481,8 +489,10 @@ func TestExecutor_Append(t *testing.T) {

// backup
if tc.isErr {
_, err := lse.Append(context.Background(), [][]byte{nil})
appendTask := NewAppendTask()
err := lse.AppendAsync(context.Background(), [][]byte{nil}, appendTask)
assert.Error(t, err)
appendTask.Release()
return
}

Expand All @@ -508,7 +518,11 @@ func TestExecutor_Append(t *testing.T) {
go func() {
defer wg.Done()
batch := TestNewBatchData(t, batchLen, 0)
_, err := lse.Append(context.Background(), batch)
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), batch, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
}
Expand Down Expand Up @@ -733,7 +747,13 @@ func TestExecutor_AppendSeal(t *testing.T) {
go func() {
defer wg.Done()
for {
_, err := lse.Append(context.Background(), [][]byte{[]byte("hello")})
appendTask := NewAppendTask()
if err := lse.AppendAsync(context.Background(), [][]byte{[]byte("hello")}, appendTask); err != nil {
appendTask.Release()
break
}
_, err := appendTask.WaitForCompletion(context.Background())
appendTask.Release()
if err != nil {
break
}
Expand Down Expand Up @@ -805,10 +825,17 @@ func TestExecutor_AppendSeal(t *testing.T) {
go func() {
defer wg.Done()
for {
_, err := lse.Append(context.Background(), [][]byte{[]byte("hello")})
appendTask := NewAppendTask()
if err := lse.AppendAsync(context.Background(), [][]byte{[]byte("hello")}, appendTask); err != nil {
appendTask.Release()
break
}
_, err := appendTask.WaitForCompletion(context.Background())
appendTask.Release()
if err != nil {
break
}

}
}()
}
Expand Down Expand Up @@ -1305,7 +1332,11 @@ func TestExecutor_SubscribeWithGLSN(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := lse.Append(context.Background(), [][]byte{msg})
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), [][]byte{msg}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
}
Expand Down Expand Up @@ -1353,8 +1384,12 @@ func TestExecutor_Subscribe(t *testing.T) {
defer wg.Done()
for i := 0; i < numLogs; i++ {
data := []byte(strconv.Itoa(int(expectedGLSN)))
res, err := lse.Append(context.Background(), [][]byte{data})
appendTask := NewAppendTask()
err := lse.AppendAsync(context.Background(), [][]byte{data}, appendTask)
assert.NoError(t, err)
res, err := appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
appendTask.Release()
assert.Equal(t, []snpb.AppendResult{{
Meta: varlogpb.LogEntryMeta{
TopicID: lse.tpid,
Expand Down Expand Up @@ -1492,7 +1527,11 @@ func TestExecutor_Subscribe(t *testing.T) {
go func() {
defer appendWg.Done()
data := []byte(strconv.Itoa(int(expectedGLSN)))
res, err := lse.Append(context.Background(), [][]byte{data})
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), [][]byte{data}, appendTask)
assert.NoError(t, err)
res, err := appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
assert.Equal(t, []snpb.AppendResult{{
Meta: varlogpb.LogEntryMeta{
Expand Down Expand Up @@ -1653,10 +1692,16 @@ func TestExecutor_Recover(t *testing.T) {
wg.Done()
}()
for {
_, err := lse.Append(context.Background(), [][]byte{[]byte("hello")})
appendTask := NewAppendTask()
err := lse.AppendAsync(context.Background(), [][]byte{[]byte("hello")}, appendTask)
if err == nil {
continue
_, err = appendTask.WaitForCompletion(context.Background())
if err == nil {
appendTask.Release()
continue
}
}
appendTask.Release()
if assert.ErrorIs(t, err, verrors.ErrClosed) {
return
}
Expand Down Expand Up @@ -2015,12 +2060,20 @@ func TestExecutor_SealAfterRestart(t *testing.T) {
wg.Add(2)
go func() {
defer wg.Done()
_, err := lse.Append(context.Background(), [][]byte{[]byte("foo")})
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), [][]byte{[]byte("foo")}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
go func() {
defer wg.Done()
_, err := lse.Append(context.Background(), [][]byte{[]byte("foo")})
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), [][]byte{[]byte("foo")}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -2095,12 +2148,20 @@ func TestExecutor_SealAfterRestart(t *testing.T) {
wg.Add(2)
go func() {
defer wg.Done()
_, err := lse.Append(context.Background(), [][]byte{[]byte("foo")})
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), [][]byte{[]byte("foo")}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
go func() {
defer wg.Done()
_, err := lse.Append(context.Background(), [][]byte{[]byte("foo")})
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), [][]byte{[]byte("foo")}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -2165,12 +2226,20 @@ func TestExecutor_SealAfterRestart(t *testing.T) {
wg.Add(2)
go func() {
defer wg.Done()
_, err := lse.Append(context.Background(), [][]byte{[]byte("foo")})
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), [][]byte{[]byte("foo")}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
go func() {
defer wg.Done()
_, err := lse.Append(context.Background(), [][]byte{[]byte("foo")})
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), [][]byte{[]byte("foo")}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -3075,7 +3144,11 @@ func TestExecutorSyncInit(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := dst.Append(context.Background(), [][]byte{[]byte("foo")})
appendTask := NewAppendTask()
defer appendTask.Release()
err := dst.AppendAsync(context.Background(), [][]byte{[]byte("foo")}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
}
Expand Down Expand Up @@ -3483,7 +3556,11 @@ func TestExecutorSyncReplicate(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := dst.Append(context.Background(), [][]byte{[]byte("foo")})
appendTask := NewAppendTask()
defer appendTask.Release()
err := dst.AppendAsync(context.Background(), [][]byte{[]byte("foo")}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
}
Expand Down Expand Up @@ -3566,7 +3643,11 @@ func TestExecutor_Trim(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := lse.Append(context.Background(), [][]byte{[]byte("hello")})
appendTask := NewAppendTask()
defer appendTask.Release()
err := lse.AppendAsync(context.Background(), [][]byte{[]byte("hello")}, appendTask)
assert.NoError(t, err)
_, err = appendTask.WaitForCompletion(context.Background())
assert.NoError(t, err)
}()
}
Expand Down

0 comments on commit 99f94ef

Please sign in to comment.