From 130d555dbd475120b3c30ecf126804312e96d138 Mon Sep 17 00:00:00 2001 From: lvca Date: Tue, 26 Sep 2023 10:32:37 -0400 Subject: [PATCH] fix: fixed bug with management of TX isolation from cached vertices The solution is in the force of reloading of the record when the page is not in transaction. This assure the record is the same as for the loaded page. Without this fix, the record can be outdated managed with the loading of an updated page, so the MVCC check passes causing a overwrite of a new record with an old one. --- .../com/arcadedb/database/BaseRecord.java | 2 +- .../arcadedb/database/ImmutableDocument.java | 34 +++++++++++++---- .../main/java/com/arcadedb/database/RID.java | 6 +++ .../arcadedb/database/TransactionContext.java | 38 ++++++++++++++----- .../com/arcadedb/graph/ImmutableEdge.java | 22 +++++++++-- .../com/arcadedb/graph/ImmutableVertex.java | 31 +++++++++++++-- .../arcadedb/event/RecordEncryptionTest.java | 21 ++++++---- 7 files changed, 122 insertions(+), 32 deletions(-) diff --git a/engine/src/main/java/com/arcadedb/database/BaseRecord.java b/engine/src/main/java/com/arcadedb/database/BaseRecord.java index 077fc6f5d..98ac45d54 100644 --- a/engine/src/main/java/com/arcadedb/database/BaseRecord.java +++ b/engine/src/main/java/com/arcadedb/database/BaseRecord.java @@ -61,7 +61,7 @@ public void reload() { buffer = null; } else if (loaded != this) { // CREATE A BUFFER FROM THE MODIFIED RECORD - buffer = database.getSerializer().serialize(database, loaded); + buffer = ((RecordInternal) loaded).getBuffer().copy(); } } catch (final RecordNotFoundException e) { diff --git a/engine/src/main/java/com/arcadedb/database/ImmutableDocument.java b/engine/src/main/java/com/arcadedb/database/ImmutableDocument.java index 021fa849f..e71e66b52 100644 --- a/engine/src/main/java/com/arcadedb/database/ImmutableDocument.java +++ b/engine/src/main/java/com/arcadedb/database/ImmutableDocument.java @@ -19,9 +19,11 @@ package com.arcadedb.database; import com.arcadedb.exception.DatabaseOperationException; +import com.arcadedb.graph.MutableEdge; import com.arcadedb.schema.DocumentType; import com.arcadedb.serializer.json.JSONObject; +import java.io.*; import java.lang.reflect.*; import java.util.*; @@ -53,14 +55,29 @@ public synchronized Object get(final String propertyName) { return null; checkForLazyLoading(); - return database.getSerializer().deserializeProperty(database, buffer, new EmbeddedModifierProperty(this, propertyName), propertyName, type); + return database.getSerializer() + .deserializeProperty(database, buffer, new EmbeddedModifierProperty(this, propertyName), propertyName, type); } @Override public synchronized MutableDocument modify() { final Record recordInCache = database.getTransaction().getRecordFromCache(rid); - if (recordInCache instanceof MutableDocument) - return (MutableDocument) recordInCache; + if (recordInCache != null) { + if (recordInCache instanceof MutableDocument) + return (MutableDocument) recordInCache; + else if (!database.getTransaction().hasPageForRecord(rid.getPageId())) { + // THE RECORD IS NOT IN TX, SO IT MUST HAVE BEEN LOADED WITHOUT A TX OR PASSED FROM ANOTHER TX + // IT MUST BE RELOADED TO GET THE LATEST CHANGES. FORCE RELOAD + try { + // RELOAD THE PAGE FIRST TO AVOID LOOP WITH TRIGGERS (ENCRYPTION) + database.getTransaction() + .getPageToModify(rid.getPageId(), database.getSchema().getBucketById(rid.getBucketId()).getPageSize(), false); + reload(); + } catch (final IOException e) { + throw new DatabaseOperationException("Error on reloading document " + rid, e); + } + } + } checkForLazyLoading(); buffer.rewind(); @@ -70,7 +87,8 @@ public synchronized MutableDocument modify() { @Override public synchronized JSONObject toJSON(final boolean includeMetadata) { checkForLazyLoading(); - final Map map = database.getSerializer().deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type); + final Map map = database.getSerializer() + .deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type); final JSONObject result = new JSONSerializer(database).map2json(map); if (includeMetadata) { result.put("@cat", "d"); @@ -97,7 +115,8 @@ public synchronized Map toMap() { @Override public synchronized Map toMap(final boolean includeMetadata) { checkForLazyLoading(); - final Map result = database.getSerializer().deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type); + final Map result = database.getSerializer() + .deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type); if (includeMetadata) { result.put("@cat", "d"); result.put("@type", type.getName()); @@ -119,7 +138,8 @@ public synchronized String toString() { final int currPosition = buffer.position(); buffer.position(propertiesStartingPosition); - final Map map = this.database.getSerializer().deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type); + final Map map = this.database.getSerializer() + .deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type); buffer.position(currPosition); @@ -164,7 +184,7 @@ protected boolean checkForLazyLoading() { buffer = null; return false; } else if (loaded != this) { - // CREATE A BUFFER FROM THE MODIFIED RECORD + // CREATE A BUFFER FROM THE MODIFIED RECORD. THIS IS NEEDED FOR ENCRYPTION THAT UPDATE THE RECORD WITH A MUTABLE buffer = database.getSerializer().serialize(database, loaded); } diff --git a/engine/src/main/java/com/arcadedb/database/RID.java b/engine/src/main/java/com/arcadedb/database/RID.java index ed7b675ff..e770534fb 100644 --- a/engine/src/main/java/com/arcadedb/database/RID.java +++ b/engine/src/main/java/com/arcadedb/database/RID.java @@ -18,6 +18,7 @@ */ package com.arcadedb.database; +import com.arcadedb.engine.PageId; import com.arcadedb.exception.RecordNotFoundException; import com.arcadedb.graph.Edge; import com.arcadedb.graph.Vertex; @@ -206,4 +207,9 @@ public BasicDatabase getDatabase() { public boolean isValid() { return bucketId > -1 && offset > -1; } + + public PageId getPageId() { + return new PageId(bucketId, + (int) (getPosition() / ((DatabaseInternal) database).getSchema().getBucketById(bucketId).getMaxRecordsInPage())); + } } diff --git a/engine/src/main/java/com/arcadedb/database/TransactionContext.java b/engine/src/main/java/com/arcadedb/database/TransactionContext.java index 2f9eb2ca8..8912c1a43 100644 --- a/engine/src/main/java/com/arcadedb/database/TransactionContext.java +++ b/engine/src/main/java/com/arcadedb/database/TransactionContext.java @@ -162,8 +162,8 @@ public void removeImmutableRecordsOfSamePage(final RID rid) { final long pageNum = pos / bucket.getMaxRecordsInPage(); // FOR IMMUTABLE RECORDS AVOID THAT THEY ARE POINTING TO THE OLD OFFSET IN A MODIFIED PAGE - immutableRecordsCache.values() - .removeIf(r -> r.getIdentity().getBucketId() == bucketId && r.getIdentity().getPosition() / bucket.getMaxRecordsInPage() == pageNum); + immutableRecordsCache.values().removeIf( + r -> r.getIdentity().getBucketId() == bucketId && r.getIdentity().getPosition() / bucket.getMaxRecordsInPage() == pageNum); } public void removeRecordFromCache(final RID rid) { @@ -197,7 +197,8 @@ public void setWALFlush(final WALFile.FLUSH_TYPE flush) { @Override public void rollback() { LogManager.instance() - .log(this, Level.FINE, "Rollback transaction newPages=%s modifiedPages=%s (threadId=%d)", newPages, modifiedPages, Thread.currentThread().getId()); + .log(this, Level.FINE, "Rollback transaction newPages=%s modifiedPages=%s (threadId=%d)", newPages, modifiedPages, + Thread.currentThread().getId()); if (database.isOpen() && database.getSchema().getDictionary() != null) { if (modifiedPages != null) { @@ -248,6 +249,21 @@ public void addUpdatedRecord(final Record record) throws IOException { removeImmutableRecordsOfSamePage(record.getIdentity()); } + /** + * Used to determine if a page has been already loaded. This is important for isolation. + */ + public boolean hasPageForRecord(final PageId pageId) { + if (modifiedPages != null) + if (modifiedPages.containsKey(pageId)) + return true; + + if (newPages != null) + if (newPages.containsKey(pageId)) + return true; + + return immutablePages.containsKey(pageId); + } + /** * Looks for the page in the TX context first, then delegates to the database. */ @@ -288,7 +304,7 @@ public BasePage getPage(final PageId pageId, final int size) throws IOException /** * If the page is not already in transaction tx, loads from the database and clone it locally. */ - public MutablePage getPageToModify(final PageId pageId, final int size, final boolean isNew) throws IOException { + public MutablePage getPageToModify(final PageId pageId, final int pageSize, final boolean isNew) throws IOException { if (!isActive()) throw new TransactionException("Transaction not active"); @@ -302,7 +318,7 @@ public MutablePage getPageToModify(final PageId pageId, final int size, final bo final ImmutablePage loadedPage = immutablePages.remove(pageId); if (loadedPage == null) // NOT FOUND, DELEGATES TO THE DATABASE - page = database.getPageManager().getMutablePage(pageId, size, isNew, true); + page = database.getPageManager().getMutablePage(pageId, pageSize, isNew, true); else page = loadedPage.modify(); @@ -522,7 +538,8 @@ public TransactionPhase1 commit1stPhase(final boolean isLeader) { database.updateRecordNoLock(rec, false); } catch (final RecordNotFoundException e) { // DELETED IN TRANSACTION, THIS IS FULLY MANAGED TO NEVER HAPPEN, BUT IF IT DOES DUE TO THE INTRODUCTION OF A BUG, JUST LOG SOMETHING AND MOVE ON - LogManager.instance().log(this, Level.WARNING, "Attempt to update the delete record %s in transaction", rec.getIdentity()); + LogManager.instance() + .log(this, Level.WARNING, "Attempt to update the delete record %s in transaction", rec.getIdentity()); } updatedRecords = null; } @@ -586,7 +603,8 @@ public TransactionPhase1 commit1stPhase(final boolean isLeader) { rollback(); throw e; } catch (final Exception e) { - LogManager.instance().log(this, Level.FINE, "Unknown exception during commit (threadId=%d)", e, Thread.currentThread().getId()); + LogManager.instance() + .log(this, Level.FINE, "Unknown exception during commit (threadId=%d)", e, Thread.currentThread().getId()); rollback(); throw new TransactionException("Transaction error on commit", e); } @@ -612,7 +630,8 @@ public void commit2ndPhase(final TransactionContext.TransactionPhase1 changes) { // AT THIS POINT, LOCK + VERSION CHECK, THERE IS NO NEED TO MANAGE ROLLBACK BECAUSE THERE CANNOT BE CONCURRENT TX THAT UPDATE THE SAME PAGE CONCURRENTLY // UPDATE PAGE COUNTER FIRST LogManager.instance() - .log(this, Level.FINE, "TX committing pages newPages=%s modifiedPages=%s (threadId=%d)", newPages, modifiedPages, Thread.currentThread().getId()); + .log(this, Level.FINE, "TX committing pages newPages=%s modifiedPages=%s (threadId=%d)", newPages, modifiedPages, + Thread.currentThread().getId()); database.getPageManager().updatePages(newPages, modifiedPages, asyncFlush); @@ -644,7 +663,8 @@ public void commit2ndPhase(final TransactionContext.TransactionPhase1 changes) { } catch (final ConcurrentModificationException e) { throw e; } catch (final Exception e) { - LogManager.instance().log(this, Level.FINE, "Unknown exception during commit (threadId=%d)", e, Thread.currentThread().getId()); + LogManager.instance() + .log(this, Level.FINE, "Unknown exception during commit (threadId=%d)", e, Thread.currentThread().getId()); throw new TransactionException("Transaction error on commit", e); } finally { reset(); diff --git a/engine/src/main/java/com/arcadedb/graph/ImmutableEdge.java b/engine/src/main/java/com/arcadedb/graph/ImmutableEdge.java index 84226fdaa..791c3a4ba 100644 --- a/engine/src/main/java/com/arcadedb/graph/ImmutableEdge.java +++ b/engine/src/main/java/com/arcadedb/graph/ImmutableEdge.java @@ -23,11 +23,13 @@ import com.arcadedb.database.ImmutableDocument; import com.arcadedb.database.RID; import com.arcadedb.database.Record; +import com.arcadedb.exception.DatabaseOperationException; import com.arcadedb.schema.DocumentType; import com.arcadedb.schema.EdgeType; import com.arcadedb.serializer.BinaryTypes; import com.arcadedb.serializer.json.JSONObject; +import java.io.*; import java.util.*; /** @@ -59,8 +61,22 @@ public ImmutableEdge(final Database graph, final DocumentType type, final RID ri public synchronized MutableEdge modify() { final Record recordInCache = database.getTransaction().getRecordFromCache(rid); - if (recordInCache instanceof MutableEdge) - return (MutableEdge) recordInCache; + if (recordInCache != null) { + if (recordInCache instanceof MutableEdge) + return (MutableEdge) recordInCache; + else if (!database.getTransaction().hasPageForRecord(rid.getPageId())) { + // THE RECORD IS NOT IN TX, SO IT MUST HAVE BEEN LOADED WITHOUT A TX OR PASSED FROM ANOTHER TX + // IT MUST BE RELOADED TO GET THE LATEST CHANGES. FORCE RELOAD + try { + // RELOAD THE PAGE FIRST TO AVOID LOOP WITH TRIGGERS (ENCRYPTION) + database.getTransaction() + .getPageToModify(rid.getPageId(), database.getSchema().getBucketById(rid.getBucketId()).getPageSize(), false); + reload(); + } catch (final IOException e) { + throw new DatabaseOperationException("Error on reloading edge " + rid, e); + } + } + } checkForLazyLoading(); if (buffer != null) { @@ -157,7 +173,7 @@ public synchronized String toString() { @Override protected boolean checkForLazyLoading() { - if (rid != null && super.checkForLazyLoading()) { + if (rid != null && (super.checkForLazyLoading() || (buffer != null && buffer.position() == 1))) { buffer.position(1); // SKIP RECORD TYPE out = (RID) database.getSerializer().deserializeValue(database, buffer, BinaryTypes.TYPE_COMPRESSED_RID, null); in = (RID) database.getSerializer().deserializeValue(database, buffer, BinaryTypes.TYPE_COMPRESSED_RID, null); diff --git a/engine/src/main/java/com/arcadedb/graph/ImmutableVertex.java b/engine/src/main/java/com/arcadedb/graph/ImmutableVertex.java index d82845f37..1258d074a 100644 --- a/engine/src/main/java/com/arcadedb/graph/ImmutableVertex.java +++ b/engine/src/main/java/com/arcadedb/graph/ImmutableVertex.java @@ -24,10 +24,12 @@ import com.arcadedb.database.ImmutableDocument; import com.arcadedb.database.RID; import com.arcadedb.database.Record; +import com.arcadedb.exception.DatabaseOperationException; import com.arcadedb.schema.DocumentType; import com.arcadedb.schema.VertexType; import com.arcadedb.serializer.json.JSONObject; +import java.io.*; import java.util.*; /** @@ -63,14 +65,34 @@ public byte getRecordType() { public synchronized MutableVertex modify() { final Record recordInCache = database.getTransaction().getRecordFromCache(rid); - if (recordInCache != null && recordInCache != this && recordInCache instanceof MutableVertex) - return (MutableVertex) recordInCache; + if (recordInCache != null) { + if (recordInCache instanceof MutableVertex) + return (MutableVertex) recordInCache; + } else if (!database.getTransaction().hasPageForRecord(rid.getPageId())) { + // THE RECORD IS NOT IN TX, SO IT MUST HAVE BEEN LOADED WITHOUT A TX OR PASSED FROM ANOTHER TX + // IT MUST BE RELOADED TO GET THE LATEST CHANGES. FORCE RELOAD + try { + // RELOAD THE PAGE FIRST TO AVOID LOOP WITH TRIGGERS (ENCRYPTION) + database.getTransaction() + .getPageToModify(rid.getPageId(), database.getSchema().getBucketById(rid.getBucketId()).getPageSize(), false); + reload(); + } catch (final IOException e) { + throw new DatabaseOperationException("Error on reloading vertex " + rid, e); + } + } checkForLazyLoading(); buffer.rewind(); return new MutableVertex(database, (VertexType) type, rid, buffer.copyOfContent()); } + @Override + public void reload() { + // FORCE THE RELOAD + buffer = null; + super.reload(); + } + @Override public synchronized RID getOutEdgesHeadChunk() { checkForLazyLoading(); @@ -93,7 +115,8 @@ public void setInEdgesHeadChunk(final RID inEdges) { throw new UnsupportedOperationException("setInEdgesHeadChunk"); } - public MutableEdge newEdge(final String edgeType, final Identifiable toVertex, final boolean bidirectional, final Object... properties) { + public MutableEdge newEdge(final String edgeType, final Identifiable toVertex, final boolean bidirectional, + final Object... properties) { return database.getGraphEngine().newEdge(getMostUpdatedVertex(this), edgeType, toVertex, bidirectional, properties); } @@ -169,7 +192,7 @@ public synchronized JSONObject toJSON(final boolean includeMetadata) { @Override protected boolean checkForLazyLoading() { - if (super.checkForLazyLoading()) { + if (super.checkForLazyLoading() || buffer != null && buffer.position() == 1) { buffer.position(1); // SKIP RECORD TYPE outEdges = new RID(database, buffer.getInt(), buffer.getLong()); if (outEdges.getBucketId() == -1) diff --git a/engine/src/test/java/com/arcadedb/event/RecordEncryptionTest.java b/engine/src/test/java/com/arcadedb/event/RecordEncryptionTest.java index 5a316cb96..32a4b91b4 100644 --- a/engine/src/test/java/com/arcadedb/event/RecordEncryptionTest.java +++ b/engine/src/test/java/com/arcadedb/event/RecordEncryptionTest.java @@ -19,6 +19,7 @@ package com.arcadedb.event; import com.arcadedb.TestHelper; +import com.arcadedb.database.Database; import com.arcadedb.database.Record; import com.arcadedb.graph.MutableVertex; import com.arcadedb.graph.Vertex; @@ -45,7 +46,8 @@ * * @author Luca Garulli (l.garulli@arcadedata.com) */ -public class RecordEncryptionTest extends TestHelper implements BeforeRecordCreateListener, AfterRecordReadListener, BeforeRecordUpdateListener { +public class RecordEncryptionTest extends TestHelper + implements BeforeRecordCreateListener, AfterRecordReadListener, BeforeRecordUpdateListener { private final static String password = "JustAPassword"; private final static String PASSWORD_ALGORITHM = "PBKDF2WithHmacSHA256"; private final static String ALGORITHM = "AES/CBC/PKCS5Padding"; @@ -73,13 +75,15 @@ public void beginTest() { } @Test - public void testEncryptionAtRest() { + public void testEncryption() { database.transaction(() -> { - final MutableVertex v1 = database.newVertex("BackAccount").set("secret", "Nobody must know Elon and Zuck are brothers").save(); + final MutableVertex v1 = database.newVertex("BackAccount").set("secret", "Nobody must know Elon and Zuck are brothers") + .save(); }); Assertions.assertEquals(1, creates.get()); + database.setTransactionIsolationLevel(Database.TRANSACTION_ISOLATION_LEVEL.REPEATABLE_READ); database.transaction(() -> { final Vertex v1 = database.iterateType("BackAccount", true).next().asVertex(); Assertions.assertEquals("Nobody must know Elon and Zuck are brothers", v1.getString("secret")); @@ -140,8 +144,8 @@ public boolean onBeforeUpdate(Record record) { } public static String encrypt(String algorithm, String input, SecretKey key, IvParameterSpec iv) - throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, BadPaddingException, - IllegalBlockSizeException { + throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, + BadPaddingException, IllegalBlockSizeException { final Cipher cipher = Cipher.getInstance(algorithm); cipher.init(Cipher.ENCRYPT_MODE, key, iv); final byte[] cipherText = cipher.doFinal(input.getBytes()); @@ -149,8 +153,8 @@ public static String encrypt(String algorithm, String input, SecretKey key, IvPa } public static String decrypt(String algorithm, String cipherText, SecretKey key, IvParameterSpec iv) - throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, BadPaddingException, - IllegalBlockSizeException { + throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, + BadPaddingException, IllegalBlockSizeException { final Cipher cipher = Cipher.getInstance(algorithm); cipher.init(Cipher.DECRYPT_MODE, key, iv); @@ -158,7 +162,8 @@ public static String decrypt(String algorithm, String cipherText, SecretKey key, return new String(plainText); } - public static SecretKey getKeyFromPassword(final String password, final String salt) throws NoSuchAlgorithmException, InvalidKeySpecException { + public static SecretKey getKeyFromPassword(final String password, final String salt) + throws NoSuchAlgorithmException, InvalidKeySpecException { final SecretKeyFactory factory = SecretKeyFactory.getInstance(PASSWORD_ALGORITHM); final KeySpec spec = new PBEKeySpec(password.toCharArray(), salt.getBytes(), SALT_ITERATIONS, KEY_LENGTH); return new SecretKeySpec(factory.generateSecret(spec).getEncoded(), "AES");