Skip to content

Commit

Permalink
fix: fixed bug with management of TX isolation from cached vertices
Browse files Browse the repository at this point in the history
The solution is in the force of reloading of the record when the page is not in transaction. This assure the record is the same as for the loaded page. Without this fix, the record can be outdated managed with the loading of an updated page, so the MVCC check passes causing a overwrite of a new record with an old one.
  • Loading branch information
lvca committed Sep 26, 2023
1 parent 805631d commit 130d555
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 32 deletions.
2 changes: 1 addition & 1 deletion engine/src/main/java/com/arcadedb/database/BaseRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void reload() {
buffer = null;
} else if (loaded != this) {
// CREATE A BUFFER FROM THE MODIFIED RECORD
buffer = database.getSerializer().serialize(database, loaded);
buffer = ((RecordInternal) loaded).getBuffer().copy();
}

} catch (final RecordNotFoundException e) {
Expand Down
34 changes: 27 additions & 7 deletions engine/src/main/java/com/arcadedb/database/ImmutableDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package com.arcadedb.database;

import com.arcadedb.exception.DatabaseOperationException;
import com.arcadedb.graph.MutableEdge;
import com.arcadedb.schema.DocumentType;
import com.arcadedb.serializer.json.JSONObject;

import java.io.*;
import java.lang.reflect.*;
import java.util.*;

Expand Down Expand Up @@ -53,14 +55,29 @@ public synchronized Object get(final String propertyName) {
return null;

checkForLazyLoading();
return database.getSerializer().deserializeProperty(database, buffer, new EmbeddedModifierProperty(this, propertyName), propertyName, type);
return database.getSerializer()
.deserializeProperty(database, buffer, new EmbeddedModifierProperty(this, propertyName), propertyName, type);
}

@Override
public synchronized MutableDocument modify() {
final Record recordInCache = database.getTransaction().getRecordFromCache(rid);
if (recordInCache instanceof MutableDocument)
return (MutableDocument) recordInCache;
if (recordInCache != null) {
if (recordInCache instanceof MutableDocument)
return (MutableDocument) recordInCache;
else if (!database.getTransaction().hasPageForRecord(rid.getPageId())) {
// THE RECORD IS NOT IN TX, SO IT MUST HAVE BEEN LOADED WITHOUT A TX OR PASSED FROM ANOTHER TX
// IT MUST BE RELOADED TO GET THE LATEST CHANGES. FORCE RELOAD
try {
// RELOAD THE PAGE FIRST TO AVOID LOOP WITH TRIGGERS (ENCRYPTION)
database.getTransaction()
.getPageToModify(rid.getPageId(), database.getSchema().getBucketById(rid.getBucketId()).getPageSize(), false);
reload();
} catch (final IOException e) {
throw new DatabaseOperationException("Error on reloading document " + rid, e);
}
}
}

checkForLazyLoading();
buffer.rewind();
Expand All @@ -70,7 +87,8 @@ public synchronized MutableDocument modify() {
@Override
public synchronized JSONObject toJSON(final boolean includeMetadata) {
checkForLazyLoading();
final Map<String, Object> map = database.getSerializer().deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type);
final Map<String, Object> map = database.getSerializer()
.deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type);
final JSONObject result = new JSONSerializer(database).map2json(map);
if (includeMetadata) {
result.put("@cat", "d");
Expand All @@ -97,7 +115,8 @@ public synchronized Map<String, Object> toMap() {
@Override
public synchronized Map<String, Object> toMap(final boolean includeMetadata) {
checkForLazyLoading();
final Map<String, Object> result = database.getSerializer().deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type);
final Map<String, Object> result = database.getSerializer()
.deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type);
if (includeMetadata) {
result.put("@cat", "d");
result.put("@type", type.getName());
Expand All @@ -119,7 +138,8 @@ public synchronized String toString() {
final int currPosition = buffer.position();

buffer.position(propertiesStartingPosition);
final Map<String, Object> map = this.database.getSerializer().deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type);
final Map<String, Object> map = this.database.getSerializer()
.deserializeProperties(database, buffer, new EmbeddedModifierObject(this), type);

buffer.position(currPosition);

Expand Down Expand Up @@ -164,7 +184,7 @@ protected boolean checkForLazyLoading() {
buffer = null;
return false;
} else if (loaded != this) {
// CREATE A BUFFER FROM THE MODIFIED RECORD
// CREATE A BUFFER FROM THE MODIFIED RECORD. THIS IS NEEDED FOR ENCRYPTION THAT UPDATE THE RECORD WITH A MUTABLE
buffer = database.getSerializer().serialize(database, loaded);
}

Expand Down
6 changes: 6 additions & 0 deletions engine/src/main/java/com/arcadedb/database/RID.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package com.arcadedb.database;

import com.arcadedb.engine.PageId;
import com.arcadedb.exception.RecordNotFoundException;
import com.arcadedb.graph.Edge;
import com.arcadedb.graph.Vertex;
Expand Down Expand Up @@ -206,4 +207,9 @@ public BasicDatabase getDatabase() {
public boolean isValid() {
return bucketId > -1 && offset > -1;
}

public PageId getPageId() {
return new PageId(bucketId,
(int) (getPosition() / ((DatabaseInternal) database).getSchema().getBucketById(bucketId).getMaxRecordsInPage()));
}
}
38 changes: 29 additions & 9 deletions engine/src/main/java/com/arcadedb/database/TransactionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ public void removeImmutableRecordsOfSamePage(final RID rid) {
final long pageNum = pos / bucket.getMaxRecordsInPage();

// FOR IMMUTABLE RECORDS AVOID THAT THEY ARE POINTING TO THE OLD OFFSET IN A MODIFIED PAGE
immutableRecordsCache.values()
.removeIf(r -> r.getIdentity().getBucketId() == bucketId && r.getIdentity().getPosition() / bucket.getMaxRecordsInPage() == pageNum);
immutableRecordsCache.values().removeIf(
r -> r.getIdentity().getBucketId() == bucketId && r.getIdentity().getPosition() / bucket.getMaxRecordsInPage() == pageNum);
}

public void removeRecordFromCache(final RID rid) {
Expand Down Expand Up @@ -197,7 +197,8 @@ public void setWALFlush(final WALFile.FLUSH_TYPE flush) {
@Override
public void rollback() {
LogManager.instance()
.log(this, Level.FINE, "Rollback transaction newPages=%s modifiedPages=%s (threadId=%d)", newPages, modifiedPages, Thread.currentThread().getId());
.log(this, Level.FINE, "Rollback transaction newPages=%s modifiedPages=%s (threadId=%d)", newPages, modifiedPages,
Thread.currentThread().getId());

if (database.isOpen() && database.getSchema().getDictionary() != null) {
if (modifiedPages != null) {
Expand Down Expand Up @@ -248,6 +249,21 @@ public void addUpdatedRecord(final Record record) throws IOException {
removeImmutableRecordsOfSamePage(record.getIdentity());
}

/**
* Used to determine if a page has been already loaded. This is important for isolation.
*/
public boolean hasPageForRecord(final PageId pageId) {
if (modifiedPages != null)
if (modifiedPages.containsKey(pageId))
return true;

if (newPages != null)
if (newPages.containsKey(pageId))
return true;

return immutablePages.containsKey(pageId);
}

/**
* Looks for the page in the TX context first, then delegates to the database.
*/
Expand Down Expand Up @@ -288,7 +304,7 @@ public BasePage getPage(final PageId pageId, final int size) throws IOException
/**
* If the page is not already in transaction tx, loads from the database and clone it locally.
*/
public MutablePage getPageToModify(final PageId pageId, final int size, final boolean isNew) throws IOException {
public MutablePage getPageToModify(final PageId pageId, final int pageSize, final boolean isNew) throws IOException {
if (!isActive())
throw new TransactionException("Transaction not active");

Expand All @@ -302,7 +318,7 @@ public MutablePage getPageToModify(final PageId pageId, final int size, final bo
final ImmutablePage loadedPage = immutablePages.remove(pageId);
if (loadedPage == null)
// NOT FOUND, DELEGATES TO THE DATABASE
page = database.getPageManager().getMutablePage(pageId, size, isNew, true);
page = database.getPageManager().getMutablePage(pageId, pageSize, isNew, true);
else
page = loadedPage.modify();

Expand Down Expand Up @@ -522,7 +538,8 @@ public TransactionPhase1 commit1stPhase(final boolean isLeader) {
database.updateRecordNoLock(rec, false);
} catch (final RecordNotFoundException e) {
// DELETED IN TRANSACTION, THIS IS FULLY MANAGED TO NEVER HAPPEN, BUT IF IT DOES DUE TO THE INTRODUCTION OF A BUG, JUST LOG SOMETHING AND MOVE ON
LogManager.instance().log(this, Level.WARNING, "Attempt to update the delete record %s in transaction", rec.getIdentity());
LogManager.instance()
.log(this, Level.WARNING, "Attempt to update the delete record %s in transaction", rec.getIdentity());
}
updatedRecords = null;
}
Expand Down Expand Up @@ -586,7 +603,8 @@ public TransactionPhase1 commit1stPhase(final boolean isLeader) {
rollback();
throw e;
} catch (final Exception e) {
LogManager.instance().log(this, Level.FINE, "Unknown exception during commit (threadId=%d)", e, Thread.currentThread().getId());
LogManager.instance()
.log(this, Level.FINE, "Unknown exception during commit (threadId=%d)", e, Thread.currentThread().getId());
rollback();
throw new TransactionException("Transaction error on commit", e);
}
Expand All @@ -612,7 +630,8 @@ public void commit2ndPhase(final TransactionContext.TransactionPhase1 changes) {
// AT THIS POINT, LOCK + VERSION CHECK, THERE IS NO NEED TO MANAGE ROLLBACK BECAUSE THERE CANNOT BE CONCURRENT TX THAT UPDATE THE SAME PAGE CONCURRENTLY
// UPDATE PAGE COUNTER FIRST
LogManager.instance()
.log(this, Level.FINE, "TX committing pages newPages=%s modifiedPages=%s (threadId=%d)", newPages, modifiedPages, Thread.currentThread().getId());
.log(this, Level.FINE, "TX committing pages newPages=%s modifiedPages=%s (threadId=%d)", newPages, modifiedPages,
Thread.currentThread().getId());

database.getPageManager().updatePages(newPages, modifiedPages, asyncFlush);

Expand Down Expand Up @@ -644,7 +663,8 @@ public void commit2ndPhase(final TransactionContext.TransactionPhase1 changes) {
} catch (final ConcurrentModificationException e) {
throw e;
} catch (final Exception e) {
LogManager.instance().log(this, Level.FINE, "Unknown exception during commit (threadId=%d)", e, Thread.currentThread().getId());
LogManager.instance()
.log(this, Level.FINE, "Unknown exception during commit (threadId=%d)", e, Thread.currentThread().getId());
throw new TransactionException("Transaction error on commit", e);
} finally {
reset();
Expand Down
22 changes: 19 additions & 3 deletions engine/src/main/java/com/arcadedb/graph/ImmutableEdge.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import com.arcadedb.database.ImmutableDocument;
import com.arcadedb.database.RID;
import com.arcadedb.database.Record;
import com.arcadedb.exception.DatabaseOperationException;
import com.arcadedb.schema.DocumentType;
import com.arcadedb.schema.EdgeType;
import com.arcadedb.serializer.BinaryTypes;
import com.arcadedb.serializer.json.JSONObject;

import java.io.*;
import java.util.*;

/**
Expand Down Expand Up @@ -59,8 +61,22 @@ public ImmutableEdge(final Database graph, final DocumentType type, final RID ri

public synchronized MutableEdge modify() {
final Record recordInCache = database.getTransaction().getRecordFromCache(rid);
if (recordInCache instanceof MutableEdge)
return (MutableEdge) recordInCache;
if (recordInCache != null) {
if (recordInCache instanceof MutableEdge)
return (MutableEdge) recordInCache;
else if (!database.getTransaction().hasPageForRecord(rid.getPageId())) {
// THE RECORD IS NOT IN TX, SO IT MUST HAVE BEEN LOADED WITHOUT A TX OR PASSED FROM ANOTHER TX
// IT MUST BE RELOADED TO GET THE LATEST CHANGES. FORCE RELOAD
try {
// RELOAD THE PAGE FIRST TO AVOID LOOP WITH TRIGGERS (ENCRYPTION)
database.getTransaction()
.getPageToModify(rid.getPageId(), database.getSchema().getBucketById(rid.getBucketId()).getPageSize(), false);
reload();
} catch (final IOException e) {
throw new DatabaseOperationException("Error on reloading edge " + rid, e);
}
}
}

checkForLazyLoading();
if (buffer != null) {
Expand Down Expand Up @@ -157,7 +173,7 @@ public synchronized String toString() {

@Override
protected boolean checkForLazyLoading() {
if (rid != null && super.checkForLazyLoading()) {
if (rid != null && (super.checkForLazyLoading() || (buffer != null && buffer.position() == 1))) {
buffer.position(1); // SKIP RECORD TYPE
out = (RID) database.getSerializer().deserializeValue(database, buffer, BinaryTypes.TYPE_COMPRESSED_RID, null);
in = (RID) database.getSerializer().deserializeValue(database, buffer, BinaryTypes.TYPE_COMPRESSED_RID, null);
Expand Down
31 changes: 27 additions & 4 deletions engine/src/main/java/com/arcadedb/graph/ImmutableVertex.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import com.arcadedb.database.ImmutableDocument;
import com.arcadedb.database.RID;
import com.arcadedb.database.Record;
import com.arcadedb.exception.DatabaseOperationException;
import com.arcadedb.schema.DocumentType;
import com.arcadedb.schema.VertexType;
import com.arcadedb.serializer.json.JSONObject;

import java.io.*;
import java.util.*;

/**
Expand Down Expand Up @@ -63,14 +65,34 @@ public byte getRecordType() {

public synchronized MutableVertex modify() {
final Record recordInCache = database.getTransaction().getRecordFromCache(rid);
if (recordInCache != null && recordInCache != this && recordInCache instanceof MutableVertex)
return (MutableVertex) recordInCache;
if (recordInCache != null) {
if (recordInCache instanceof MutableVertex)
return (MutableVertex) recordInCache;
} else if (!database.getTransaction().hasPageForRecord(rid.getPageId())) {
// THE RECORD IS NOT IN TX, SO IT MUST HAVE BEEN LOADED WITHOUT A TX OR PASSED FROM ANOTHER TX
// IT MUST BE RELOADED TO GET THE LATEST CHANGES. FORCE RELOAD
try {
// RELOAD THE PAGE FIRST TO AVOID LOOP WITH TRIGGERS (ENCRYPTION)
database.getTransaction()
.getPageToModify(rid.getPageId(), database.getSchema().getBucketById(rid.getBucketId()).getPageSize(), false);
reload();
} catch (final IOException e) {
throw new DatabaseOperationException("Error on reloading vertex " + rid, e);
}
}

checkForLazyLoading();
buffer.rewind();
return new MutableVertex(database, (VertexType) type, rid, buffer.copyOfContent());
}

@Override
public void reload() {
// FORCE THE RELOAD
buffer = null;
super.reload();
}

@Override
public synchronized RID getOutEdgesHeadChunk() {
checkForLazyLoading();
Expand All @@ -93,7 +115,8 @@ public void setInEdgesHeadChunk(final RID inEdges) {
throw new UnsupportedOperationException("setInEdgesHeadChunk");
}

public MutableEdge newEdge(final String edgeType, final Identifiable toVertex, final boolean bidirectional, final Object... properties) {
public MutableEdge newEdge(final String edgeType, final Identifiable toVertex, final boolean bidirectional,
final Object... properties) {
return database.getGraphEngine().newEdge(getMostUpdatedVertex(this), edgeType, toVertex, bidirectional, properties);
}

Expand Down Expand Up @@ -169,7 +192,7 @@ public synchronized JSONObject toJSON(final boolean includeMetadata) {

@Override
protected boolean checkForLazyLoading() {
if (super.checkForLazyLoading()) {
if (super.checkForLazyLoading() || buffer != null && buffer.position() == 1) {
buffer.position(1); // SKIP RECORD TYPE
outEdges = new RID(database, buffer.getInt(), buffer.getLong());
if (outEdges.getBucketId() == -1)
Expand Down
21 changes: 13 additions & 8 deletions engine/src/test/java/com/arcadedb/event/RecordEncryptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.arcadedb.event;

import com.arcadedb.TestHelper;
import com.arcadedb.database.Database;
import com.arcadedb.database.Record;
import com.arcadedb.graph.MutableVertex;
import com.arcadedb.graph.Vertex;
Expand All @@ -45,7 +46,8 @@
*
* @author Luca Garulli ([email protected])
*/
public class RecordEncryptionTest extends TestHelper implements BeforeRecordCreateListener, AfterRecordReadListener, BeforeRecordUpdateListener {
public class RecordEncryptionTest extends TestHelper
implements BeforeRecordCreateListener, AfterRecordReadListener, BeforeRecordUpdateListener {
private final static String password = "JustAPassword";
private final static String PASSWORD_ALGORITHM = "PBKDF2WithHmacSHA256";
private final static String ALGORITHM = "AES/CBC/PKCS5Padding";
Expand Down Expand Up @@ -73,13 +75,15 @@ public void beginTest() {
}

@Test
public void testEncryptionAtRest() {
public void testEncryption() {
database.transaction(() -> {
final MutableVertex v1 = database.newVertex("BackAccount").set("secret", "Nobody must know Elon and Zuck are brothers").save();
final MutableVertex v1 = database.newVertex("BackAccount").set("secret", "Nobody must know Elon and Zuck are brothers")
.save();
});

Assertions.assertEquals(1, creates.get());

database.setTransactionIsolationLevel(Database.TRANSACTION_ISOLATION_LEVEL.REPEATABLE_READ);
database.transaction(() -> {
final Vertex v1 = database.iterateType("BackAccount", true).next().asVertex();
Assertions.assertEquals("Nobody must know Elon and Zuck are brothers", v1.getString("secret"));
Expand Down Expand Up @@ -140,25 +144,26 @@ public boolean onBeforeUpdate(Record record) {
}

public static String encrypt(String algorithm, String input, SecretKey key, IvParameterSpec iv)
throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, BadPaddingException,
IllegalBlockSizeException {
throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException,
BadPaddingException, IllegalBlockSizeException {
final Cipher cipher = Cipher.getInstance(algorithm);
cipher.init(Cipher.ENCRYPT_MODE, key, iv);
final byte[] cipherText = cipher.doFinal(input.getBytes());
return Base64.getEncoder().encodeToString(cipherText);
}

public static String decrypt(String algorithm, String cipherText, SecretKey key, IvParameterSpec iv)
throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, BadPaddingException,
IllegalBlockSizeException {
throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException,
BadPaddingException, IllegalBlockSizeException {

final Cipher cipher = Cipher.getInstance(algorithm);
cipher.init(Cipher.DECRYPT_MODE, key, iv);
final byte[] plainText = cipher.doFinal(Base64.getDecoder().decode(cipherText));
return new String(plainText);
}

public static SecretKey getKeyFromPassword(final String password, final String salt) throws NoSuchAlgorithmException, InvalidKeySpecException {
public static SecretKey getKeyFromPassword(final String password, final String salt)
throws NoSuchAlgorithmException, InvalidKeySpecException {
final SecretKeyFactory factory = SecretKeyFactory.getInstance(PASSWORD_ALGORITHM);
final KeySpec spec = new PBEKeySpec(password.toCharArray(), salt.getBytes(), SALT_ITERATIONS, KEY_LENGTH);
return new SecretKeySpec(factory.generateSecret(spec).getEncoded(), "AES");
Expand Down

0 comments on commit 130d555

Please sign in to comment.