Skip to content

Commit

Permalink
fix(core): fix deadlock in runMutation and txn.Update() (#9085)
Browse files Browse the repository at this point in the history
We call txn.Update() once the transaction has finished. This moves all
the uncommitted posting lists to a delta map in the txn. This can cause a
deadlock with runMutation. We have mostly seen it when a transaction
times out: runMutation keeps on going while txn.Update() gets
triggered. This PR fixes it by not calling txn.Update() if the
transaction has failed. We will further look into cancelling runMutation
at that point too.

Fixes:
https://linear.app/hypermode/issue/DGR-477/dgraph-v24-alpha-2-hangs

---------

Co-authored-by: ShivajiKharse <[email protected]>
Co-authored-by: shivaji-dgraph <[email protected]>
  • Loading branch information
3 people authored May 19, 2024
1 parent c437860 commit d3d9c5b
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 10 deletions.
9 changes: 7 additions & 2 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (txn *Txn) addConflictKey(conflictKey uint64) {
}

// FillContext updates the given transaction context with data from this transaction.
func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) {
func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32, isErrored bool) {
txn.Lock()
ctx.StartTs = txn.StartTs

Expand All @@ -249,7 +249,12 @@ func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) {
ctx.Keys = x.Unique(ctx.Keys)

txn.Unlock()
txn.Update()
// If the transaction has errored out, we don't need to update it, as these values will never be read.
// Sometimes, the transaction might have failed due to a timeout. If we let this transaction update, there
// could be a deadlock with the running transaction.
if !isErrored {
txn.Update()
}
txn.cache.fillPreds(ctx, gid)
}

Expand Down
43 changes: 43 additions & 0 deletions query/vector/vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/dgraph-io/dgo/v230/protos/api"
"github.com/dgraph-io/dgraph/dgraphtest"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -432,6 +433,48 @@ func TestVectorsMutateFixedLengthWithDiffrentIndexes(t *testing.T) {
dropPredicate("vtest")
}

// TestVectorDeadlockwithTimeout repeatedly runs a large vector mutation under a
// one-second context deadline and checks that the mutation fails and that the
// transaction is afterwards reported as already committed or discarded. Each
// iteration logs in, drops the predicate and re-applies the indexed schema
// first, so a cluster that hung in an earlier iteration makes a later one fail
// instead of blocking the test silently.
func TestVectorDeadlockwithTimeout(t *testing.T) {
	pred := "vtest1"
	dc = dgraphtest.NewComposeCluster()
	client, cleanup, err := dc.Client()
	// Report a client setup problem as a regular test failure, not a panic.
	require.NoError(t, err)
	defer cleanup()

	// The loop body lives in its own function so that the deferred context
	// cancellations and the transaction discard run at the end of every
	// iteration instead of accumulating until the whole test returns.
	runIteration := func(i int) {
		t.Log("Testing iteration: ", i)

		loginCtx, cancelLogin := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancelLogin()
		require.NoError(t, client.LoginIntoNamespace(loginCtx, dgraphtest.DefaultUser,
			dgraphtest.DefaultPassword, x.GalaxyNamespace))

		// Best-effort drop: the original code ignored this error, so it is only
		// logged here. dropPredicate below drops the same predicate again.
		if alterErr := client.Alter(context.Background(), &api.Operation{
			DropAttr: pred,
		}); alterErr != nil {
			t.Logf("dropping predicate %q: %v", pred, alterErr)
		}
		dropPredicate(pred)
		setSchema(fmt.Sprintf(vectorSchemaWithIndex, pred, "4", "euclidian"))

		numVectors := 1000
		vectorSize := 10
		randomVectors, _ := generateRandomVectors(numVectors, vectorSize, pred)

		txn := client.NewTxn()
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		// Defers run last-in-first-out: registering cancel first means Discard
		// runs before the context is explicitly cancelled.
		defer cancel()
		defer func() { _ = txn.Discard(ctx) }()

		// The mutation is expected to fail under the one-second deadline.
		_, err := txn.Mutate(ctx, &api.Mutation{
			SetNquads: []byte(randomVectors),
			CommitNow: true,
		})
		require.Error(t, err)

		// Assert the error is non-nil before reading its message so that an
		// unexpected successful Commit fails the test instead of panicking.
		err = txn.Commit(ctx)
		require.Error(t, err)
		require.Contains(t, err.Error(), "Transaction has already been committed or discarded")
	}

	for i := 0; i < 5; i++ {
		runIteration(i)
	}
}

func TestVectorMutateDiffrentLengthWithDiffrentIndexes(t *testing.T) {
dropPredicate("vtest")

Expand Down
51 changes: 51 additions & 0 deletions systest/vector/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,54 @@ func TestVectorBackupRestoreDropIndex(t *testing.T) {
}
}
}

// TestVectorBackupRestoreReIndexing starts a single-alpha, single-zero local
// cluster with ACL enabled, loads a first batch of random vectors, takes a
// backup, loads a second batch, and then restores the backup. After the
// restore it toggles the vector index off and on five times before running
// the shared vector query check.
func TestVectorBackupRestoreReIndexing(t *testing.T) {
	clusterConf := dgraphtest.NewClusterConfig().
		WithNumAlphas(1).
		WithNumZeros(1).
		WithReplicas(1).
		WithACL(time.Hour)
	cluster, err := dgraphtest.NewLocalCluster(clusterConf)
	require.NoError(t, err)
	defer func() { cluster.Cleanup(t.Failed()) }()
	require.NoError(t, cluster.Start())

	// gRPC client, logged in to the galaxy namespace.
	grpcClient, cleanup, err := cluster.Client()
	require.NoError(t, err)
	defer cleanup()
	err = grpcClient.LoginIntoNamespace(context.Background(),
		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace)
	require.NoError(t, err)

	// HTTP client, used for the backup and restore calls below.
	httpClient, err := cluster.HTTPClient()
	require.NoError(t, err)
	err = httpClient.LoginIntoNamespace(dgraphtest.DefaultUser,
		dgraphtest.DefaultPassword, x.GalaxyNamespace)
	require.NoError(t, err)

	require.NoError(t, grpcClient.SetupSchema(testSchema))

	numVectors := 1000
	pred := "project_discription_v"

	// First batch: written before the backup is taken.
	rdfs, vectors := dgraphtest.GenerateRandomVectors(0, numVectors, 10, pred)
	_, err = grpcClient.Mutate(&api.Mutation{SetNquads: []byte(rdfs), CommitNow: true})
	require.NoError(t, err)

	t.Log("taking backup \n")
	require.NoError(t, httpClient.Backup(cluster, false, dgraphtest.DefaultBackupDir))

	// Second batch: written after the backup and before the restore.
	rdfs2, vectors2 := dgraphtest.GenerateRandomVectors(numVectors, numVectors+300, 10, pred)
	_, err = grpcClient.Mutate(&api.Mutation{SetNquads: []byte(rdfs2), CommitNow: true})
	require.NoError(t, err)

	t.Log("restoring backup \n")
	require.NoError(t, httpClient.Restore(cluster, dgraphtest.DefaultBackupDir, "", 2, 1))
	require.NoError(t, dgraphtest.WaitForRestore(cluster))

	// Drop and re-create the index several times to exercise re-indexing.
	for round := 0; round < 5; round++ {
		require.NoError(t, grpcClient.SetupSchema(testSchemaWithoutIndex))
		require.NoError(t, grpcClient.SetupSchema(testSchema))
	}

	vectors = append(vectors, vectors2...)
	rdfs += rdfs2
	testVectorQuery(t, grpcClient, vectors, rdfs, pred, numVectors)
}
5 changes: 3 additions & 2 deletions tok/hnsw/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,10 @@ func (ph *persistentHNSW[T]) createEntryAndStartNodes(
err := ph.getVecFromUid(entry, c, vec)
if err != nil || len(*vec) == 0 {
// The entry vector has been deleted. We have to create a new entry vector.
entry, err := ph.PickStartNode(ctx, c, vec)
entry, err := ph.calculateNewEntryVec(ctx, c, vec)
if err != nil {
return 0, []*index.KeyValue{}, err
// No other node exists, go with the new node that has come
return create_edges(inUuid)
}
return create_edges(entry)
}
Expand Down
16 changes: 13 additions & 3 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,19 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
errCh <- process(m.Edges[start:end])
}(start, end)
}
// Earlier we were returning as soon as even one thread had an error. We should wait for
// all the transactions to finish. We call txn.Update() when this function exits. This could cause
// a deadlock with runMutation.
var errs error
for i := 0; i < numGo; i++ {
if err := <-errCh; err != nil {
return err
if errs == nil {
errs = errors.New("Got error while running mutation")
}
errs = errors.Wrapf(err, errs.Error())
}
}
return nil
return errs
}

func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
Expand Down Expand Up @@ -839,7 +846,10 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
if txn == nil {
return
}
txn.Update()
// If the transaction has failed, we don't need to update it.
if commit != 0 {
txn.Update()
}
// We start with 20 ms, so that we end up waiting 5 mins by the end.
// If there is any transient issue, it should get fixed within that timeframe.
err := x.ExponentialRetry(int(x.Config.MaxRetries),
Expand Down
7 changes: 4 additions & 3 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,9 @@ func Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
return c.Timestamps(ctx, num)
}

func fillTxnContext(tctx *api.TxnContext, startTs uint64) {
func fillTxnContext(tctx *api.TxnContext, startTs uint64, isErrored bool) {
if txn := posting.Oracle().GetTxn(startTs); txn != nil {
txn.FillContext(tctx, groups().groupId())
txn.FillContext(tctx, groups().groupId(), isErrored)
}
// We do not need to fill linread mechanism anymore, because transaction
// start ts is sufficient to wait for, to achieve lin reads.
Expand Down Expand Up @@ -950,7 +950,8 @@ func (w *grpcWorker) proposeAndWait(ctx context.Context, txnCtx *api.TxnContext,

node := groups().Node
err := node.proposeAndWait(ctx, &pb.Proposal{Mutations: m})
fillTxnContext(txnCtx, m.StartTs)
// When we are filling txn context, we don't need to update latest delta if the transaction has failed.
fillTxnContext(txnCtx, m.StartTs, err != nil)
return err
}

Expand Down

0 comments on commit d3d9c5b

Please sign in to comment.