From 51877478a89112f26b1db60b07949a7493767934 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Thu, 28 Feb 2019 17:32:46 +0100 Subject: [PATCH 1/4] Alternative support for Mongo Transactions in v4 Needs some testing around multiple version handling, and proper error messages if you try to use transactions with Mongo versions that don't support them. We'll also need more testing around multi threading, etc. This was mostly a proof of concept that it could be done without a separate 'Transaction' object that users need to worry about. It also supports having Query be transaction aware. This does make sure that Start/Commit/AbortTransaction all do essentially what you want, and it doesn't involve any client-side changes. If you call Session.StartTransaction then all Update/Insert/Remove/Find requests will be done in that context, and won't be visible to other sessions. There is also still a fair bit more that can be done wrt things like WriteConcern. I mostly wanted to get feedback if people like how this is structured and where it is going. As far as I can tell, while there is a txnNumber associated with a given transaction, Mongo doesn't actually support having >1 open transaction for a given session, so it isn't worth tracking transaction objects for users. --- session.go | 159 ++++++++++++++++++++++++++++++++-- session_test.go | 225 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 368 insertions(+), 16 deletions(-) diff --git a/session.go b/session.go index f2d957fac..be65f66a4 100644 --- a/session.go +++ b/session.go @@ -110,6 +110,25 @@ type Session struct { slaveOk bool dialInfo *DialInfo + sessionId bson.Binary + nextTransactionNumber int64 + transaction *transaction +} + +type sessionId struct { + Id bson.Binary `bson:"id"` +} + +type sessionInfo struct { + Id sessionId `bson:"id"` + TimeoutMinutes int `bson:"timeoutMinutes"` +} + +type transaction struct { + number int64 + sessionId bson.Binary + started bool + finished bool } // Database holds collections of documents @@ -3840,6 +3859,7 @@ Error: func (q *Query) One(result interface{}) (err error) { q.m.Lock() session := q.session + txn := session.transaction op := q.op // Copy. q.m.Unlock() @@ -3853,7 +3873,7 @@ func (q *Query) One(result interface{}) (err error) { session.prepareQuery(&op) - expectFindReply := prepareFindOp(socket, &op, 1) + expectFindReply := prepareFindOp(socket, &op, 1, txn) data, err := socket.SimpleQuery(&op) if err != nil { @@ -3897,7 +3917,7 @@ func (q *Query) One(result interface{}) (err error) { // a new-style find command if that's supported by the MongoDB server (3.2+). // It returns whether to expect a find command result or not. Note op may be // translated into an explain command, in which case the function returns false. -func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool { +func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32, txn *transaction) bool { if socket.ServerInfo().MaxWireVersion < 4 || op.collection == "admin.$cmd" { return false } @@ -3936,6 +3956,20 @@ func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool { find.BatchSize = op.limit } + if txn != nil { + if txn.finished { + // ??? + } + if !txn.started { + txn.started = true + find.StartTransaction = true + } + find.TXNNumber = txn.number + find.LSID = bson.D{{Name: "id", Value: txn.sessionId}} + autocommit := false + find.Autocommit = &autocommit + } + explain := op.options.Explain op.collection = op.collection[:nameDot] + ".$cmd" @@ -3990,6 +4024,10 @@ type findCmd struct { NoCursorTimeout bool `bson:"noCursorTimeout,omitempty"` AllowPartialResults bool `bson:"allowPartialResults,omitempty"` Collation *Collation `bson:"collation,omitempty"` + LSID bson.D `bson:"lsid,omitempty"` + TXNNumber int64 `bson:"txnNumber,omitempty"` + Autocommit *bool `bson:"autocommit,omitempty"` + StartTransaction bool `bson:"startTransaction,omitempty"` } // readLevel provides the nested "level: majority" serialisation needed for the @@ -4200,6 +4238,7 @@ func (s *Session) DatabaseNames() (names []string, err error) { func (q *Query) Iter() *Iter { q.m.Lock() session := q.session + txn := session.transaction op := q.op prefetch := q.prefetch limit := q.limit @@ -4227,7 +4266,7 @@ func (q *Query) Iter() *Iter { session.prepareQuery(&op) op.replyFunc = iter.op.replyFunc - if prepareFindOp(socket, &op, limit) { + if prepareFindOp(socket, &op, limit, txn) { iter.isFindCmd = true } @@ -5654,13 +5693,13 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int } var cmd bson.D + txn := c.Database.Session.transaction switch op := op.(type) { case *insertOp: // http://docs.mongodb.org/manual/reference/command/insert cmd = bson.D{ {Name: "insert", Value: c.Name}, {Name: "documents", Value: op.documents}, - {Name: "writeConcern", Value: writeConcern}, {Name: "ordered", Value: op.flags&1 == 0}, } case *updateOp: @@ -5668,7 +5707,6 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int cmd = bson.D{ {Name: "update", Value: c.Name}, {Name: "updates", Value: []interface{}{op}}, - {Name: "writeConcern", Value: writeConcern}, {Name: "ordered", Value: ordered}, } case bulkUpdateOp: @@ -5676,7 +5714,6 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int cmd = bson.D{ {Name: "update", Value: c.Name}, {Name: "updates", Value: op}, - {Name: "writeConcern", Value: writeConcern}, {Name: "ordered", Value: ordered}, } case *deleteOp: @@ -5684,7 +5721,6 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int cmd = bson.D{ {Name: "delete", Value: c.Name}, {Name: "deletes", Value: []interface{}{op}}, - {Name: "writeConcern", Value: writeConcern}, {Name: "ordered", Value: ordered}, } case bulkDeleteOp: @@ -5692,13 +5728,26 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int cmd = bson.D{ {Name: "delete", Value: c.Name}, {Name: "deletes", Value: op}, - {Name: "writeConcern", Value: writeConcern}, {Name: "ordered", Value: ordered}, } } if bypassValidation { cmd = append(cmd, bson.DocElem{Name: "bypassDocumentValidation", Value: true}) } + if txn != nil { + if txn.finished { + //TODO ERROR + } + if !txn.started { + cmd = append(cmd, bson.DocElem{Name: "startTransaction", Value: true}) + txn.started = true + } + cmd = append(cmd, bson.DocElem{Name: "autocommit", Value: false}) + cmd = append(cmd, bson.DocElem{Name: "txnNumber", Value: txn.number}) + cmd = append(cmd, bson.DocElem{Name: "lsid", Value: bson.M{"id": txn.sessionId}}) + } else { + cmd = append(cmd, bson.DocElem{Name: "writeConcern", Value: writeConcern}) + } var result writeCmdResult err = c.Database.run(socket, cmd, &result) @@ -5811,3 +5860,97 @@ func rdnOIDToShortName(oid asn1.ObjectIdentifier) string { return "" } + +func (s *Session) ensureSessionId() error { + if len(s.sessionId.Data) != 0 { + return nil + } + var info sessionInfo + // TODO(jam): 2019-02-27 the startSession call can take a few optional parameters. + // We could put them as Session attributes that we pass along. It seems to be + // things like 'casual consistency' and 'write preference', which we seem to be + // setting elsewhere. + + err := s.Run("startSession", &info) + if err != nil { + return err + } + s.sessionId = info.Id.Id + return nil +} + +func (s *Session) StartTransaction() error { + if err := s.ensureSessionId(); err != nil { + return err + } + if s.transaction != nil { + return errors.New("transaction already started") + } + s.nextTransactionNumber++ + s.transaction = &transaction{ + number: s.nextTransactionNumber, + sessionId: s.sessionId, + started: false, + } + // TODO: readConcern, writeConcern, readPreference can all be set separately for a given transaction + return nil +} + +func (s *Session) finishTransaction(command string) error { + cmd := bson.D{ + {Name: command, Value: 1}, + {Name: "txnNumber", Value: s.transaction.number}, + {Name: "autocommit", Value: false}, + {Name: "lsid", Value: bson.M{"id": s.sessionId}}, + } + return s.Run(cmd, nil) +} + +func (s *Session) CommitTransaction() error { + if len(s.sessionId.Data) == 0 { + // XXX: error?, shouldn't commit if we never called s.ensureSessionId + return nil + } + if s.transaction == nil || !s.transaction.started { + // XXX: error?, no active transaction + return nil + } + if s.transaction.finished { + // XXX: logic error, we shouldn't be able to get here + return nil + } + // XXX: Python has a retry tracking 'retryable' errors around finishTransaction + err := s.finishTransaction("commitTransaction") + // TODO(jam): 2019-02-28 do we need a mutex around this since Insert/Update/etc are potentially different goroutines? + s.transaction.finished = false + s.transaction = nil + if err != nil { + return err + } + return nil +} + +func (s *Session) AbortTransaction() error { + if len(s.sessionId.Data) == 0 { + // XXX: error?, shouldn't commit if we never called s.ensureSessionId + return nil + } + if s.transaction == nil { + // XXX: error?, no active transaction + return nil + } + if !s.transaction.started { + // nothing to do + return nil + } + // XXX: do we want to track a transaction STATE, like they do in Python with states of: + // NONE, STARTING, IN_PROGRESS, ABORTED, COMMITTED, COMMITTED_EMPTY + err := s.finishTransaction("abortTransaction") + // TODO(jam): 2019-02-28 do we need a mutex around this since Insert/Update/etc are potentially different goroutines? + s.transaction.finished = false + s.transaction = nil + if err != nil { + return err + } + return nil +} diff --git a/session_test.go b/session_test.go index 864d6593a..54b742985 100644 --- a/session_test.go +++ b/session_test.go @@ -2028,14 +2028,12 @@ func (s *S) TestQueryComment(c *C) { commentField := "query.$comment" nField := "query.$query.n" - if s.versionAtLeast(3, 2) { - if s.versionAtLeast(3, 6) { - commentField = "command.comment" - nField = "command.filter.n" - } else { - commentField = "query.comment" - nField = "query.filter.n" - } + if s.versionAtLeast(3, 6) { + commentField = "command.comment" + nField = "command.filter.n" + } else if s.versionAtLeast(3, 2) { + commentField = "query.comment" + nField = "query.filter.n" } n, err := session.DB("mydb").C("system.profile").Find(bson.M{nField: 41, commentField: "some comment"}).Count() c.Assert(err, IsNil) @@ -5205,6 +5203,217 @@ func (s *S) TestCollationQueries(c *C) { } } +func (s *S) setup2Sessions(c *C) (*mgo.Session, *mgo.Collection, *mgo.Session, *mgo.Collection) { + // get the test infrastructure ready for doing transactions. + if !s.versionAtLeast(4, 0) { + c.Skip("transactions not supported before 4.0") + } + session1, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + // Collections must be created outside of a transaction + coll1 := session1.DB("mydb").C("mycoll") + err = coll1.Create(&mgo.CollectionInfo{}) + session2 := session1.Copy() + if err != nil { + session1.Close() + c.Assert(err, IsNil) + } + coll2 := session2.DB("mydb").C("mycoll") + return session1, coll1, session2, coll2 +} +func (s *S) TestTransactionInsertCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + var res bson.M + // Should be visible in the session that has the transaction + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + // Since the change was made in a transaction, session 2 should not see the document + err := coll2.Find(bson.M{"a": "a"}).One(&res) + c.Check(err, Equals, mgo.ErrNotFound) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, session2 should see it + err = coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res) + c.Check(err, IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) +} + +func (s *S) TestTransactionInsertAborted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + var res bson.M + // Should be visible in the session that has the transaction + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + // Since the change was made in a transaction, session 2 should not see the document + err := coll2.Find(bson.M{"a": "a"}).One(&res) + c.Check(err, Equals, mgo.ErrNotFound) + c.Assert(session1.AbortTransaction(), IsNil) + // Since it is Aborted, nobody should see the object + err = coll2.Find(bson.M{"a": "a"}).One(&res) + c.Check(err, Equals, mgo.ErrNotFound) + err = coll1.Find(bson.M{"a": "a"}).One(&res) + c.Check(err, Equals, mgo.ErrNotFound) + +} + +func (s *S) TestTransactionUpdateCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + c.Assert(coll1.Update(bson.M{"a": "a"}, bson.M{"$set": bson.M{"b": "c"}}), IsNil) + // Should be visible in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + // Since the change was made in a transaction, session 2 should not see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, session2 should see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) +} + +func (s *S) TestTransactionUpdateAllCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(coll1.Insert(bson.M{"a": "2", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + changeInfo, err := coll1.UpdateAll(nil, bson.M{"$set": bson.M{"b": "c"}}) + c.Assert(err, IsNil) + c.Check(changeInfo.Matched, Equals, 2) + c.Check(changeInfo.Updated, Equals, 2) + // Should be visible in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + c.Assert(coll1.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) + // Since the change was made in a transaction, session 2 should not see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "b"}) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, session2 should see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) +} + +func (s *S) TestTransactionUpsertCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + // One Upsert updates, the other Upsert creates + changeInfo, err := coll1.Upsert(bson.M{"a": "a"}, bson.M{"$set": bson.M{"b": "c"}}) + c.Assert(err, IsNil) + c.Check(changeInfo.Matched, Equals, 1) + c.Check(changeInfo.Updated, Equals, 1) + changeInfo, err = coll1.Upsert(bson.M{"a": "2"}, bson.M{"$set": bson.M{"b": "c"}}) + c.Assert(err, IsNil) + c.Check(changeInfo.Matched, Equals, 0) + c.Check(changeInfo.UpsertedId, NotNil) + // Should be visible in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + c.Assert(coll1.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) + // Since the change was made in a transaction, session 2 should not see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, session2 should see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) +} + +func (s *S) TestTransactionRemoveCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + c.Assert(coll1.Remove(bson.M{"a": "a"}), IsNil) + // Should be gone in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + // Since the change was made in a transaction, session 2 should still see the document + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, it should be gone + c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll2.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) +} + +func (s *S) TestTransactionRemoveAllCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(coll1.Insert(bson.M{"a": "2", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + changeInfo, err := coll1.RemoveAll(bson.M{"a": bson.M{"$exists": true}}) + c.Assert(err, IsNil) + c.Check(changeInfo.Matched, Equals, 2) + c.Check(changeInfo.Removed, Equals, 2) + // Should be gone in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll1.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) + // Since the change was made in a transaction, session 2 should still see the document + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "b"}) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, it should be gone + c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll1.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll2.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll2.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) +} + // -------------------------------------------------------------------------- // Some benchmarks that require a running database. From 3987ed467de2ea000cf54be08f561eb868aa3dc0 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Thu, 14 Mar 2019 15:45:26 +0400 Subject: [PATCH 2/4] Rework all the paths to be threadsafe. They don't give guaranteed ordering, but they do avoid object race conditions. --- session.go | 162 +++++++------ session_test.go | 225 +----------------- transaction_test.go | 567 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 669 insertions(+), 285 deletions(-) create mode 100644 transaction_test.go diff --git a/session.go b/session.go index be65f66a4..9aa8f4ffc 100644 --- a/session.go +++ b/session.go @@ -109,7 +109,7 @@ type Session struct { bypassValidation bool slaveOk bool - dialInfo *DialInfo + dialInfo *DialInfo sessionId bson.Binary nextTransactionNumber int64 transaction *transaction @@ -128,7 +128,6 @@ type transaction struct { number int64 sessionId bson.Binary started bool - finished bool } // Database holds collections of documents @@ -2054,12 +2053,23 @@ func (s *Session) Clone() *Session { // Close terminates the session. It's a runtime error to use a session // after it has been closed. func (s *Session) Close() { + s.m.RLock() + txnActive := s.transaction != nil && s.transaction.started + s.m.RUnlock() + if txnActive { + // No chance to give the user an error + err := s.AbortTransaction() + if err != nil { + logf("abort during Session.Close failed: %v", err) + } + } s.m.Lock() if s.mgoCluster != nil { debugf("Closing session %p", s) s.unsetSocket() s.mgoCluster.Release() s.mgoCluster = nil + s.transaction = nil } s.m.Unlock() } @@ -3859,9 +3869,12 @@ Error: func (q *Query) One(result interface{}) (err error) { q.m.Lock() session := q.session - txn := session.transaction op := q.op // Copy. q.m.Unlock() + session.m.RLock() + txn := session.transaction + startTxn := txn != nil && !txn.started + session.m.RUnlock() socket, err := session.acquireSocket(true) if err != nil { @@ -3873,12 +3886,17 @@ func (q *Query) One(result interface{}) (err error) { session.prepareQuery(&op) - expectFindReply := prepareFindOp(socket, &op, 1, txn) + expectFindReply := prepareFindOp(socket, &op, 1, txn, startTxn) data, err := socket.SimpleQuery(&op) if err != nil { return err } + if startTxn { + session.m.Lock() + txn.started = true + session.m.Unlock() + } if data == nil { return ErrNotFound } @@ -3917,7 +3935,7 @@ func (q *Query) One(result interface{}) (err error) { // a new-style find command if that's supported by the MongoDB server (3.2+). // It returns whether to expect a find command result or not. Note op may be // translated into an explain command, in which case the function returns false. -func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32, txn *transaction) bool { +func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32, txn *transaction, startTxn bool) bool { if socket.ServerInfo().MaxWireVersion < 4 || op.collection == "admin.$cmd" { return false } @@ -3957,11 +3975,7 @@ func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32, txn *transacti } if txn != nil { - if txn.finished { - // ??? - } - if !txn.started { - txn.started = true + if startTxn { find.StartTransaction = true } find.TXNNumber = txn.number @@ -4238,11 +4252,14 @@ func (s *Session) DatabaseNames() (names []string, err error) { func (q *Query) Iter() *Iter { q.m.Lock() session := q.session - txn := session.transaction op := q.op prefetch := q.prefetch limit := q.limit q.m.Unlock() + session.m.RLock() + txn := session.transaction + startTxn := txn != nil && !txn.started + session.m.RUnlock() iter := &Iter{ session: session, @@ -4266,7 +4283,7 @@ func (q *Query) Iter() *Iter { session.prepareQuery(&op) op.replyFunc = iter.op.replyFunc - if prepareFindOp(socket, &op, limit, txn) { + if prepareFindOp(socket, &op, limit, txn, startTxn) { iter.isFindCmd = true } @@ -4278,6 +4295,11 @@ func (q *Query) Iter() *Iter { iter.err = err iter.m.Unlock() } + if startTxn { + session.m.Lock() + txn.started = true + session.m.Unlock() + } return iter } @@ -5514,6 +5536,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err s.m.RLock() safeOp := s.safeOp bypassValidation := s.bypassValidation + txn := s.transaction s.m.RUnlock() if socket.ServerInfo().MaxWireVersion >= 2 { @@ -5529,7 +5552,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err l = len(all) } op.documents = all[i:l] - oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation) + oplerr, err := c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation, txn) lerr.N += oplerr.N lerr.modified += oplerr.modified if err != nil { @@ -5557,7 +5580,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err l = len(updateOp) } - oplerr, err := c.writeOpCommand(socket, safeOp, updateOp[i:l], ordered, bypassValidation) + oplerr, err := c.writeOpCommand(socket, safeOp, updateOp[i:l], ordered, bypassValidation, txn) lerr.N += oplerr.N lerr.modified += oplerr.modified @@ -5583,7 +5606,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err l = len(deleteOps) } - oplerr, err := c.writeOpCommand(socket, safeOp, deleteOps[i:l], ordered, bypassValidation) + oplerr, err := c.writeOpCommand(socket, safeOp, deleteOps[i:l], ordered, bypassValidation, txn) lerr.N += oplerr.N lerr.modified += oplerr.modified @@ -5599,7 +5622,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err } return &lerr, nil } - return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation) + return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation, txn) } else if updateOps, ok := op.(bulkUpdateOp); ok { var lerr LastError for i, updateOp := range updateOps { @@ -5684,7 +5707,7 @@ func (c *Collection) writeOpQuery(socket *mongoSocket, safeOp *queryOp, op inter return result, nil } -func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool) (lerr *LastError, err error) { +func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered, bypassValidation bool, txn *transaction) (lerr *LastError, err error) { var writeConcern interface{} if safeOp == nil { writeConcern = bson.D{{Name: "w", Value: 0}} @@ -5693,7 +5716,6 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int } var cmd bson.D - txn := c.Database.Session.transaction switch op := op.(type) { case *insertOp: // http://docs.mongodb.org/manual/reference/command/insert @@ -5734,19 +5756,25 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int if bypassValidation { cmd = append(cmd, bson.DocElem{Name: "bypassDocumentValidation", Value: true}) } + started := false if txn != nil { - if txn.finished { - //TODO ERROR - } + c.Database.Session.m.RLock() if !txn.started { cmd = append(cmd, bson.DocElem{Name: "startTransaction", Value: true}) - txn.started = true + started = true } + c.Database.Session.m.RUnlock() cmd = append(cmd, bson.DocElem{Name: "autocommit", Value: false}) cmd = append(cmd, bson.DocElem{Name: "txnNumber", Value: txn.number}) cmd = append(cmd, bson.DocElem{Name: "lsid", Value: bson.M{"id": txn.sessionId}}) } else { - cmd = append(cmd, bson.DocElem{Name: "writeConcern", Value: writeConcern}) + cmd = append(cmd, bson.DocElem{"writeConcern", writeConcern}) + } + + if started { + c.Database.Session.m.Lock() + txn.started = started + c.Database.Session.m.Unlock() } var result writeCmdResult @@ -5862,9 +5890,12 @@ func rdnOIDToShortName(oid asn1.ObjectIdentifier) string { } func (s *Session) ensureSessionId() error { + s.m.RLock() if len(s.sessionId.Data) != 0 { + s.m.RUnlock() return nil } + s.m.RUnlock() var info sessionInfo // TODO(jam): 2019-02-27 the startSession call can take a few optional parameters. // We could put them as Session attributes that we pass along. It seems to be @@ -5875,7 +5906,9 @@ func (s *Session) ensureSessionId() error { if err != nil { return err } + s.m.Lock() s.sessionId = info.Id.Id + s.m.Unlock() return nil } @@ -5883,7 +5916,9 @@ func (s *Session) StartTransaction() error { if err := s.ensureSessionId(); err != nil { return err } + s.m.Lock() if s.transaction != nil { + s.m.Unlock() return errors.New("transaction already started") } s.nextTransactionNumber++ @@ -5892,65 +5927,56 @@ func (s *Session) StartTransaction() error { sessionId: s.sessionId, started: false, } + s.m.Unlock() // TODO: readConcern, writeConcern, readPreference can all be set separately for a given transaction return nil } func (s *Session) finishTransaction(command string) error { - cmd := bson.D{ - {Name: command, Value: 1}, - {Name: "txnNumber", Value: s.transaction.number}, - {Name: "autocommit", Value: false}, - {Name: "lsid", Value: bson.M{"id": s.sessionId}}, - } - return s.Run(cmd, nil) -} - -func (s *Session) CommitTransaction() error { + s.m.RLock() if len(s.sessionId.Data) == 0 { - // XXX: error?, shouldn't commit if we never called s.ensureSessionId - return nil + s.m.RUnlock() + return errors.New("no transaction in progress") } - if s.transaction == nil || !s.transaction.started { - // XXX: error?, no active transaction - return nil + if s.transaction == nil { + s.m.RUnlock() + return errors.New("no transaction in progress") } - if s.transaction.finished { - // XXX: logic error, we shouldn't be able to get here - return nil + txn := s.transaction + sessionId := s.sessionId + txnNumber := txn.number + started := txn.started + s.m.RUnlock() + var err error + if started { + // XXX: Python has a retry tracking 'retryable' errors around finishTransaction + cmd := bson.D{ + {Name: command, Value: 1}, + {Name: "txnNumber", Value: txnNumber}, + {Name: "autocommit", Value: false}, + {Name: "lsid", Value: bson.M{"id": sessionId}}, + } + err = s.Run(cmd, nil) } - // XXX: Python has a retry tracking 'retryable' errors around finishTransaction - err := s.finishTransaction("commitTransaction") - // TODO(jam): 2019-02-28 do we need a mutex around this since Insert/Update/etc are potentially different goroutines? - s.transaction.finished = false - s.transaction = nil + s.m.Lock() + if s.transaction == txn { + s.transaction = nil + } else { + // TODO: How to exercise this code? + err = errors.New(fmt.Sprintf("transaction changed during %s, %v != %v", + command, txn, s.transaction)) + } + s.m.Unlock() if err != nil { return err } return nil } +func (s *Session) CommitTransaction() error { + return s.finishTransaction("commitTransaction") +} + func (s *Session) AbortTransaction() error { - if len(s.sessionId.Data) == 0 { - // XXX: error?, shouldn't commit if we never called s.ensureSessionId - return nil - } - if s.transaction == nil { - // XXX: error?, no active transaction - return nil - } - if !s.transaction.started { - // nothing to do - return nil - } - // XXX: do we want to track a transaction STATE, like they do in Python with states of: - // NONE, STARTING, IN_PROGRESS, ABORTED, COMMITTED, COMMITTED_EMPTY - err := s.finishTransaction("abortTransaction") - // TODO(jam): 2019-02-28 do we need a mutex around this since Insert/Update/etc are potentially different goroutines? - s.transaction.finished = false - s.transaction = nil - if err != nil { - return err - } - return nil + return s.finishTransaction("abortTransaction") } diff --git a/session_test.go b/session_test.go index 54b742985..864d6593a 100644 --- a/session_test.go +++ b/session_test.go @@ -2028,12 +2028,14 @@ func (s *S) TestQueryComment(c *C) { commentField := "query.$comment" nField := "query.$query.n" - if s.versionAtLeast(3, 6) { - commentField = "command.comment" - nField = "command.filter.n" - } else if s.versionAtLeast(3, 2) { - commentField = "query.comment" - nField = "query.filter.n" + if s.versionAtLeast(3, 2) { + if s.versionAtLeast(3, 6) { + commentField = "command.comment" + nField = "command.filter.n" + } else { + commentField = "query.comment" + nField = "query.filter.n" + } } n, err := session.DB("mydb").C("system.profile").Find(bson.M{nField: 41, commentField: "some comment"}).Count() c.Assert(err, IsNil) @@ -5203,217 +5205,6 @@ func (s *S) TestCollationQueries(c *C) { } } -func (s *S) setup2Sessions(c *C) (*mgo.Session, *mgo.Collection, *mgo.Session, *mgo.Collection) { - // get the test infrastructure ready for doing transactions. - if !s.versionAtLeast(4, 0) { - c.Skip("transactions not supported before 4.0") - } - session1, err := mgo.Dial("localhost:40011") - c.Assert(err, IsNil) - // Collections must be created outside of a transaction - coll1 := session1.DB("mydb").C("mycoll") - err = coll1.Create(&mgo.CollectionInfo{}) - session2 := session1.Copy() - if err != nil { - session1.Close() - c.Assert(err, IsNil) - } - coll2 := session2.DB("mydb").C("mycoll") - return session1, coll1, session2, coll2 -} -func (s *S) TestTransactionInsertCommitted(c *C) { - session1, coll1, session2, coll2 := s.setup2Sessions(c) - defer session1.Close() - defer session2.Close() - c.Assert(session1.StartTransaction(), IsNil) - // call Abort in case there is a problem, but ignore an error if it was committed, - // otherwise the server will block in DropCollection because the transaction is active. - defer session1.AbortTransaction() - c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) - var res bson.M - // Should be visible in the session that has the transaction - c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) - // Since the change was made in a transaction, session 2 should not see the document - err := coll2.Find(bson.M{"a": "a"}).One(&res) - c.Check(err, Equals, mgo.ErrNotFound) - c.Assert(session1.CommitTransaction(), IsNil) - // Now that it is committed, session2 should see it - err = coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res) - c.Check(err, IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) -} - -func (s *S) TestTransactionInsertAborted(c *C) { - session1, coll1, session2, coll2 := s.setup2Sessions(c) - defer session1.Close() - defer session2.Close() - c.Assert(session1.StartTransaction(), IsNil) - // call Abort in case there is a problem, but ignore an error if it was committed, - // otherwise the server will block in DropCollection because the transaction is active. - defer session1.AbortTransaction() - c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) - var res bson.M - // Should be visible in the session that has the transaction - c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) - // Since the change was made in a transaction, session 2 should not see the document - err := coll2.Find(bson.M{"a": "a"}).One(&res) - c.Check(err, Equals, mgo.ErrNotFound) - c.Assert(session1.AbortTransaction(), IsNil) - // Since it is Aborted, nobody should see the object - err = coll2.Find(bson.M{"a": "a"}).One(&res) - c.Check(err, Equals, mgo.ErrNotFound) - err = coll1.Find(bson.M{"a": "a"}).One(&res) - c.Check(err, Equals, mgo.ErrNotFound) - -} - -func (s *S) TestTransactionUpdateCommitted(c *C) { - session1, coll1, session2, coll2 := s.setup2Sessions(c) - defer session1.Close() - defer session2.Close() - c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) - c.Assert(session1.StartTransaction(), IsNil) - // call Abort in case there is a problem, but ignore an error if it was committed, - // otherwise the server will block in DropCollection because the transaction is active. - defer session1.AbortTransaction() - c.Assert(coll1.Update(bson.M{"a": "a"}, bson.M{"$set": bson.M{"b": "c"}}), IsNil) - // Should be visible in the session that has the transaction - var res bson.M - c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) - // Since the change was made in a transaction, session 2 should not see it - c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) - c.Assert(session1.CommitTransaction(), IsNil) - // Now that it is committed, session2 should see it - c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) -} - -func (s *S) TestTransactionUpdateAllCommitted(c *C) { - session1, coll1, session2, coll2 := s.setup2Sessions(c) - defer session1.Close() - defer session2.Close() - c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) - c.Assert(coll1.Insert(bson.M{"a": "2", "b": "b"}), IsNil) - c.Assert(session1.StartTransaction(), IsNil) - // call Abort in case there is a problem, but ignore an error if it was committed, - // otherwise the server will block in DropCollection because the transaction is active. - defer session1.AbortTransaction() - changeInfo, err := coll1.UpdateAll(nil, bson.M{"$set": bson.M{"b": "c"}}) - c.Assert(err, IsNil) - c.Check(changeInfo.Matched, Equals, 2) - c.Check(changeInfo.Updated, Equals, 2) - // Should be visible in the session that has the transaction - var res bson.M - c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) - c.Assert(coll1.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) - // Since the change was made in a transaction, session 2 should not see it - c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) - c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "2", "b": "b"}) - c.Assert(session1.CommitTransaction(), IsNil) - // Now that it is committed, session2 should see it - c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) - c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) -} - -func (s *S) TestTransactionUpsertCommitted(c *C) { - session1, coll1, session2, coll2 := s.setup2Sessions(c) - defer session1.Close() - defer session2.Close() - c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) - c.Assert(session1.StartTransaction(), IsNil) - // call Abort in case there is a problem, but ignore an error if it was committed, - // otherwise the server will block in DropCollection because the transaction is active. - defer session1.AbortTransaction() - // One Upsert updates, the other Upsert creates - changeInfo, err := coll1.Upsert(bson.M{"a": "a"}, bson.M{"$set": bson.M{"b": "c"}}) - c.Assert(err, IsNil) - c.Check(changeInfo.Matched, Equals, 1) - c.Check(changeInfo.Updated, Equals, 1) - changeInfo, err = coll1.Upsert(bson.M{"a": "2"}, bson.M{"$set": bson.M{"b": "c"}}) - c.Assert(err, IsNil) - c.Check(changeInfo.Matched, Equals, 0) - c.Check(changeInfo.UpsertedId, NotNil) - // Should be visible in the session that has the transaction - var res bson.M - c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) - c.Assert(coll1.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) - // Since the change was made in a transaction, session 2 should not see it - c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) - c.Assert(coll2.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) - c.Assert(session1.CommitTransaction(), IsNil) - // Now that it is committed, session2 should see it - c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) - c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) -} - -func (s *S) TestTransactionRemoveCommitted(c *C) { - session1, coll1, session2, coll2 := s.setup2Sessions(c) - defer session1.Close() - defer session2.Close() - c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) - c.Assert(session1.StartTransaction(), IsNil) - // call Abort in case there is a problem, but ignore an error if it was committed, - // otherwise the server will block in DropCollection because the transaction is active. - defer session1.AbortTransaction() - c.Assert(coll1.Remove(bson.M{"a": "a"}), IsNil) - // Should be gone in the session that has the transaction - var res bson.M - c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) - // Since the change was made in a transaction, session 2 should still see the document - c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) - c.Assert(session1.CommitTransaction(), IsNil) - // Now that it is committed, it should be gone - c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) - c.Assert(coll2.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) -} - -func (s *S) TestTransactionRemoveAllCommitted(c *C) { - session1, coll1, session2, coll2 := s.setup2Sessions(c) - defer session1.Close() - defer session2.Close() - c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) - c.Assert(coll1.Insert(bson.M{"a": "2", "b": "b"}), IsNil) - c.Assert(session1.StartTransaction(), IsNil) - // call Abort in case there is a problem, but ignore an error if it was committed, - // otherwise the server will block in DropCollection because the transaction is active. - defer session1.AbortTransaction() - changeInfo, err := coll1.RemoveAll(bson.M{"a": bson.M{"$exists": true}}) - c.Assert(err, IsNil) - c.Check(changeInfo.Matched, Equals, 2) - c.Check(changeInfo.Removed, Equals, 2) - // Should be gone in the session that has the transaction - var res bson.M - c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) - c.Assert(coll1.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) - // Since the change was made in a transaction, session 2 should still see the document - c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) - c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) - c.Check(res, DeepEquals, bson.M{"a": "2", "b": "b"}) - c.Assert(session1.CommitTransaction(), IsNil) - // Now that it is committed, it should be gone - c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) - c.Assert(coll1.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) - c.Assert(coll2.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) - c.Assert(coll2.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) -} - // -------------------------------------------------------------------------- // Some benchmarks that require a running database. diff --git a/transaction_test.go b/transaction_test.go new file mode 100644 index 000000000..33ff16170 --- /dev/null +++ b/transaction_test.go @@ -0,0 +1,567 @@ +// mgo - MongoDB driver for Go +// +// Copyright (c) 2019 - Canonical Ltd +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package mgo_test + +import ( + "math/rand" + "regexp" + "sync" + "time" + + . "gopkg.in/check.v1" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" +) + +func (s *S) setupTxnSession(c *C) *mgo.Session { + // get the test infrastructure ready for doing transactions. + if !s.versionAtLeast(4, 0) { + c.Skip("transactions not supported before 4.0") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + return session +} + +func (s *S) setupTxnSessionAndCollection(c *C) (*mgo.Session, *mgo.Collection) { + session := s.setupTxnSession(c) + // Collections must be created outside of a transaction + coll := session.DB("mydb").C("mycoll") + err := coll.Create(&mgo.CollectionInfo{}) + if err != nil { + session.Close() + c.Assert(err, IsNil) + } + return session, coll +} + +func (s *S) setup2Sessions(c *C) (*mgo.Session, *mgo.Collection, *mgo.Session, *mgo.Collection) { + session1, coll1 := s.setupTxnSessionAndCollection(c) + session2 := session1.Copy() + coll2 := session2.DB("mydb").C("mycoll") + return session1, coll1, session2, coll2 +} + +func (s *S) TestTransactionInsertCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + var res bson.M + // Should be visible in the session that has the transaction + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + // Since the change was made in a transaction, session 2 should not see the document + err := coll2.Find(bson.M{"a": "a"}).One(&res) + c.Check(err, Equals, mgo.ErrNotFound) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, session2 should see it + err = coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res) + c.Check(err, IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) +} + +func (s *S) TestTransactionInsertAborted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + var res bson.M + // Should be visible in the session that has the transaction + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + // Since the change was made in a transaction, session 2 should not see the document + err := coll2.Find(bson.M{"a": "a"}).One(&res) + c.Check(err, Equals, mgo.ErrNotFound) + c.Assert(session1.AbortTransaction(), IsNil) + // Since it is Aborted, nobody should see the object + err = coll2.Find(bson.M{"a": "a"}).One(&res) + c.Check(err, Equals, mgo.ErrNotFound) + err = coll1.Find(bson.M{"a": "a"}).One(&res) + c.Check(err, Equals, mgo.ErrNotFound) + +} + +func (s *S) TestTransactionUpdateCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + c.Assert(coll1.Update(bson.M{"a": "a"}, bson.M{"$set": bson.M{"b": "c"}}), IsNil) + // Should be visible in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + // Since the change was made in a transaction, session 2 should not see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, session2 should see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) +} + +func (s *S) TestTransactionUpdateAllCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(coll1.Insert(bson.M{"a": "2", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + changeInfo, err := coll1.UpdateAll(nil, bson.M{"$set": bson.M{"b": "c"}}) + c.Assert(err, IsNil) + c.Check(changeInfo.Matched, Equals, 2) + c.Check(changeInfo.Updated, Equals, 2) + // Should be visible in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + c.Assert(coll1.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) + // Since the change was made in a transaction, session 2 should not see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "b"}) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, session2 should see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) +} + +func (s *S) TestTransactionUpsertCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + // One Upsert updates, the other Upsert creates + changeInfo, err := coll1.Upsert(bson.M{"a": "a"}, bson.M{"$set": bson.M{"b": "c"}}) + c.Assert(err, IsNil) + c.Check(changeInfo.Matched, Equals, 1) + c.Check(changeInfo.Updated, Equals, 1) + changeInfo, err = coll1.Upsert(bson.M{"a": "2"}, bson.M{"$set": bson.M{"b": "c"}}) + c.Assert(err, IsNil) + c.Check(changeInfo.Matched, Equals, 0) + c.Check(changeInfo.UpsertedId, NotNil) + // Should be visible in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + c.Assert(coll1.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) + // Since the change was made in a transaction, session 2 should not see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, session2 should see it + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "c"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "c"}) +} + +func (s *S) TestTransactionRemoveCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + c.Assert(coll1.Remove(bson.M{"a": "a"}), IsNil) + // Should be gone in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + // Since the change was made in a transaction, session 2 should still see the document + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, it should be gone + c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll2.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) +} + +func (s *S) TestTransactionRemoveAllCommitted(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(coll1.Insert(bson.M{"a": "2", "b": "b"}), IsNil) + c.Assert(session1.StartTransaction(), IsNil) + // call Abort in case there is a problem, but ignore an error if it was committed, + // otherwise the server will block in DropCollection because the transaction is active. + defer session1.AbortTransaction() + changeInfo, err := coll1.RemoveAll(bson.M{"a": bson.M{"$exists": true}}) + c.Assert(err, IsNil) + c.Check(changeInfo.Matched, Equals, 2) + c.Check(changeInfo.Removed, Equals, 2) + // Should be gone in the session that has the transaction + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll1.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) + // Since the change was made in a transaction, session 2 should still see the document + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(coll2.Find(bson.M{"a": "2"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "2", "b": "b"}) + c.Assert(session1.CommitTransaction(), IsNil) + // Now that it is committed, it should be gone + c.Assert(coll1.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll1.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll2.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(coll2.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) +} + +func (s *S) TestStartAbortTransactionMultithreaded(c *C) { + // While calling StartTransaction doesn't actually make sense, it shouldn't corrupt + // memory to do so. This should trigger go's '-race' detector if we don't + // have the code structured correctly. + session := s.setupTxnSession(c) + defer session.Close() + // Collections must be created outside of a transaction + coll1 := session.DB("mydb").C("mycoll") + err := coll1.Create(&mgo.CollectionInfo{}) + c.Assert(err, IsNil) + var wg sync.WaitGroup + startFunc := func() { + err := session.StartTransaction() + if err != nil { + // Don't use Assert as we are being called in a goroutine + c.Check(err, ErrorMatches, "transaction already started") + } else { + c.Check(session.AbortTransaction(), IsNil) + } + wg.Done() + } + wg.Add(10) + for i := 0; i < 10; i++ { + go startFunc() + } + wg.Wait() +} + +func (s *S) TestStartCommitTransactionMultithreaded(c *C) { + // While calling StartTransaction doesn't actually make sense, it shouldn't corrupt + // memory to do so. This should trigger go's '-race' detector if we don't + // have the code structured correctly. + session := s.setupTxnSession(c) + defer session.Close() + // Collections must be created outside of a transaction + coll1 := session.DB("mydb").C("mycoll") + err := coll1.Create(&mgo.CollectionInfo{}) + c.Assert(err, IsNil) + var wg sync.WaitGroup + startFunc := func() { + err := session.StartTransaction() + if err != nil { + // Don't use Assert as we are being called in a goroutine + c.Check(err, ErrorMatches, "transaction already started") + } else { + c.Check(session.CommitTransaction(), IsNil) + } + wg.Done() + } + wg.Add(10) + for i := 0; i < 10; i++ { + go startFunc() + } + wg.Wait() +} + +func (s *S) TestAbortTransactionNotStarted(c *C) { + session := s.setupTxnSession(c) + defer session.Close() + err := session.AbortTransaction() + c.Assert(err, ErrorMatches, "no transaction in progress") +} + +func (s *S) TestCommitTransactionNotStarted(c *C) { + session := s.setupTxnSession(c) + defer session.Close() + err := session.CommitTransaction() + c.Assert(err, ErrorMatches, "no transaction in progress") +} + +func (s *S) TestAbortTransactionNoChanges(c *C) { + session := s.setupTxnSession(c) + defer session.Close() + c.Assert(session.StartTransaction(), IsNil) + c.Assert(session.AbortTransaction(), IsNil) +} + +func (s *S) TestCommitTransactionNoChanges(c *C) { + session := s.setupTxnSession(c) + defer session.Close() + c.Assert(session.StartTransaction(), IsNil) + c.Assert(session.CommitTransaction(), IsNil) +} + +func (s *S) TestAbortTransactionTwice(c *C) { + session, coll := s.setupTxnSessionAndCollection(c) + defer session.Close() + c.Assert(session.StartTransaction(), IsNil) + // The DB transaction isn't started until we make a change + c.Assert(coll.Insert(bson.M{"a": "a"}), IsNil) + c.Assert(session.AbortTransaction(), IsNil) + err := session.AbortTransaction() + c.Assert(err, ErrorMatches, "no transaction in progress") +} + +func (s *S) TestCommitTransactionTwice(c *C) { + session, coll := s.setupTxnSessionAndCollection(c) + defer session.Close() + c.Assert(session.StartTransaction(), IsNil) + // The DB transaction isn't started until we make a change + c.Assert(coll.Insert(bson.M{"a": "a"}), IsNil) + c.Assert(session.CommitTransaction(), IsNil) + err := session.CommitTransaction() + c.Assert(err, ErrorMatches, "no transaction in progress") +} + +func (s *S) TestStartTransactionTwice(c *C) { + session, coll := s.setupTxnSessionAndCollection(c) + defer session.Close() + c.Assert(session.StartTransaction(), IsNil) + c.Assert(coll.Insert(bson.M{"a": "a"}), IsNil) + c.Assert(session.StartTransaction(), ErrorMatches, "transaction already started") + c.Assert(coll.Insert(bson.M{"b": "b"}), IsNil) + c.Assert(session.CommitTransaction(), IsNil) + // Calling StartTransaction a second time doesn't currently abort + // the txn in progress. Maybe we should as it is a sign that the driver + // is being used incorrectly? + var res bson.M + c.Assert(coll.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a"}) + c.Assert(coll.Find(bson.M{"b": "b"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"b": "b"}) +} + +func (s *S) TestStartCommitAbortStartCommitTransaction(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(session1.StartTransaction(), IsNil) + c.Assert(session1.CommitTransaction(), IsNil) + err := session1.AbortTransaction() + c.Assert(err, ErrorMatches, "no transaction in progress") + // We should be able to recover + c.Assert(session1.StartTransaction(), IsNil) + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + c.Assert(session1.CommitTransaction(), IsNil) + var res bson.M + // Should be visible in the session that has the transaction + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) +} + +func (s *S) TestCloseWithOpenTransaction(c *C) { + session1, coll1, session2, coll2 := s.setup2Sessions(c) + defer session1.Close() + defer session2.Close() + c.Assert(session1.StartTransaction(), IsNil) + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + var res bson.M + c.Assert(coll1.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + // Close should abort the current session, which aborts the active transaction + session1.Close() + c.Assert(coll2.Find(bson.M{"a": "2"}).One(&res), Equals, mgo.ErrNotFound) +} + +func (s *S) TestRefreshDuringTransaction(c *C) { + session, coll := s.setupTxnSessionAndCollection(c) + defer session.Close() + c.Assert(session.StartTransaction(), IsNil) + c.Assert(coll.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + // Refresh closes the active connection, but as long as we preserve the + // SessionId and txnNumber, we should still be able to see the in-progress + // changes + session.Refresh() + var res bson.M + c.Assert(coll.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Assert(session.CommitTransaction(), IsNil) + c.Assert(coll.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) +} + +func (s *S) TestCloneDifferentSessionTransaction(c *C) { + session1, coll1 := s.setupTxnSessionAndCollection(c) + defer session1.Close() + c.Assert(session1.StartTransaction(), IsNil) + c.Assert(coll1.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + session2 := session1.Clone() + defer session2.Close() + coll2 := session2.DB("mydb").C("mycoll") + var res bson.M + c.Check(coll2.Find(bson.M{"a": "a"}).One(&res), Equals, mgo.ErrNotFound) + c.Assert(session2.StartTransaction(), IsNil) + c.Assert(coll2.Insert(bson.M{"a": "second", "b": "c"}), IsNil) + c.Check(coll1.Find(bson.M{"a": "second"}).One(&res), Equals, mgo.ErrNotFound) + session1.CommitTransaction() + session2.CommitTransaction() + c.Check(coll2.Find(bson.M{"a": "a"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "a", "b": "b"}) + c.Check(coll1.Find(bson.M{"a": "second"}).Select(bson.M{"a": 1, "b": 1, "_id": 0}).One(&res), IsNil) + c.Check(res, DeepEquals, bson.M{"a": "second", "b": "c"}) +} + +func (s *S) TestMultithreadedTransactionStartAbortAllActions(c *C) { + session, coll := s.setupTxnSessionAndCollection(c) + defer session.Close() + // It isn't particularly sane to have one thread starting and aborting transactions + // while other goroutines make modifications, but we want to be -race safe. + // This doesn't assert that things are sequenced correctly, just that the driver + // won't break if it is abused. + c.Assert(coll.Insert(bson.M{"a": "a", "b": "b"}), IsNil) + var wg sync.WaitGroup + stop := make(chan struct{}) + wg.Add(1) + // Start and Abort the transaction every millisecond + go func() { + defer wg.Done() + started := false + for { + select { + case <-stop: + if started { + session.AbortTransaction() + started = false + } + return + case <-time.After(1 * time.Millisecond): + if started { + session.AbortTransaction() + started = false + } else { + session.StartTransaction() + started = true + } + } + } + }() + possibleErrors := []string{ + "^Transaction .* has been aborted\\.$", + "^Given transaction number .* does not match any in-progress transactions\\.$", + "^Cannot specify 'startTransaction' on transaction .* since it is already in progress\\.$", + "^Cannot start transaction .* on session .* because a newer transaction .* has already started\\.", + } + timeoutRegex := "i/o timeout" + checkError := func(err error) { + if err == nil { + return + } + if err == mgo.ErrNotFound { + return + } + errStr := err.Error() + for _, errRegex := range possibleErrors { + matched, _ := regexp.MatchString(errRegex, errStr) + if matched { + return + } + } + if matched, _ := regexp.MatchString(timeoutRegex, errStr); matched { + // When we get i/o timeout, that means we have to reset the Session + session.Refresh() + return + } + c.Errorf("error did not match a known response: %v", err.Error()) + } + doInsert := func() { + err := coll.Insert(bson.M{"a": "alt", "b": "something"}) + checkError(err) + } + doUpsert := func() { + _, err := coll.Upsert(bson.M{"a": "upserted"}, bson.M{"$inc": bson.M{"b": 1}}) + checkError(err) + } + doRemove := func() { + err := coll.Remove(bson.M{"a": "a"}) + checkError(err) + } + doFind := func() { + var res bson.M + err := coll.Find(bson.M{"a": "a"}).One(&res) + if err == nil { + c.Check(res["b"], Equals, "b") + } else { + checkError(err) + } + } + funcs := []func(){ + doInsert, + doUpsert, + doRemove, + doFind, + } + // do Insert/Update/Remove/Find concurrently with starting & aborting the transaction + for _, tFunc := range funcs { + wg.Add(1) + go func(f func()) { + for { + nextSleep := time.Duration(rand.Int63n(int64(time.Millisecond))) + select { + case <-stop: + wg.Done() + return + case <-time.After(nextSleep): + f() + } + } + }(tFunc) + } + // Let those run for a bit + time.Sleep(50 * time.Millisecond) + close(stop) + wg.Wait() +} From 222dfb60700a5f97e12180b6489aa343eefbcec1 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Thu, 14 Mar 2019 16:04:56 +0400 Subject: [PATCH 3/4] Use the right package :). globalsign now is thread unsafe if you use .Dial because it calls SetSocketTimeout which mutates dialInfo in an unsafe way. But we can just use DialWithTimeout instead. --- transaction_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/transaction_test.go b/transaction_test.go index 33ff16170..75cf7c165 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -33,8 +33,8 @@ import ( "time" . "gopkg.in/check.v1" - "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" ) func (s *S) setupTxnSession(c *C) *mgo.Session { @@ -42,7 +42,7 @@ func (s *S) setupTxnSession(c *C) *mgo.Session { if !s.versionAtLeast(4, 0) { c.Skip("transactions not supported before 4.0") } - session, err := mgo.Dial("localhost:40011") + session, err := mgo.DialWithTimeout("localhost:40011", 10*time.Second) c.Assert(err, IsNil) return session } From ab102d5e008260fdeb1bfab204e4d59d10b20fff Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Thu, 14 Mar 2019 16:47:25 +0400 Subject: [PATCH 4/4] go fmt and go vet cleanups --- bson/bson_test.go | 6 +++--- session.go | 2 +- transaction_test.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/bson/bson_test.go b/bson/bson_test.go index 8eae0d079..c7f71c5c7 100644 --- a/bson/bson_test.go +++ b/bson/bson_test.go @@ -292,7 +292,7 @@ func (s *S) TestPtrInline(c *C) { // deeper struct with inline pointer { - in := InlineG2{G2: 15, InlineG1: &InlineG1{G1:16, Final: &Final{G0: 23}}} + in := InlineG2{G2: 15, InlineG1: &InlineG1{G1: 16, Final: &Final{G0: 23}}} c.Assert(in.InlineG1, NotNil) c.Assert(in.Final, NotNil) @@ -1272,11 +1272,11 @@ type Final struct { G0 int `bson:"g0,omitempty"` } type InlineG1 struct { - G1 int `bson:"g1,omitempty"` + G1 int `bson:"g1,omitempty"` *Final `bson:",inline"` } type InlineG2 struct { - G2 int `bson:"g2,omitempty"` + G2 int `bson:"g2,omitempty"` *InlineG1 `bson:",inline"` } diff --git a/session.go b/session.go index 9aa8f4ffc..d71d55e2c 100644 --- a/session.go +++ b/session.go @@ -5768,7 +5768,7 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int cmd = append(cmd, bson.DocElem{Name: "txnNumber", Value: txn.number}) cmd = append(cmd, bson.DocElem{Name: "lsid", Value: bson.M{"id": txn.sessionId}}) } else { - cmd = append(cmd, bson.DocElem{"writeConcern", writeConcern}) + cmd = append(cmd, bson.DocElem{Name: "writeConcern", Value: writeConcern}) } if started { diff --git a/transaction_test.go b/transaction_test.go index 75cf7c165..6e99e571e 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -32,9 +32,9 @@ import ( "sync" "time" - . "gopkg.in/check.v1" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" + . "gopkg.in/check.v1" ) func (s *S) setupTxnSession(c *C) *mgo.Session {