diff --git a/engine/src/main/java/com/arcadedb/database/TransactionContext.java b/engine/src/main/java/com/arcadedb/database/TransactionContext.java index eb3b6f098..b041d08c7 100644 --- a/engine/src/main/java/com/arcadedb/database/TransactionContext.java +++ b/engine/src/main/java/com/arcadedb/database/TransactionContext.java @@ -409,6 +409,13 @@ public void kill() { immutablePages.clear(); } + public Map getBucketRecordDelta() { + final Map map = new HashMap<>(bucketRecordDelta.size()); + for (Map.Entry entry : bucketRecordDelta.entrySet()) + map.put(entry.getKey(), entry.getValue().get()); + return map; + } + /** * Returns the delta of records considering the pending changes in transaction. */ diff --git a/engine/src/main/java/com/arcadedb/engine/TransactionManager.java b/engine/src/main/java/com/arcadedb/engine/TransactionManager.java index 803ac4c96..2b60d8563 100644 --- a/engine/src/main/java/com/arcadedb/engine/TransactionManager.java +++ b/engine/src/main/java/com/arcadedb/engine/TransactionManager.java @@ -225,7 +225,7 @@ public void checkIntegrity() { lastTxId = lowerTxId; - applyChanges(walPositions[lowerTx], true); + applyChanges(walPositions[lowerTx], Collections.emptyMap(), true); walPositions[lowerTx] = activeWALFilePool[lowerTx].getTransaction(walPositions[lowerTx].endPositionInLog); } @@ -267,7 +267,7 @@ public Map getStats() { return map; } - public boolean applyChanges(final WALFile.WALTransaction tx, final boolean ignoreErrors) { + public boolean applyChanges(final WALFile.WALTransaction tx, final Map bucketRecordDelta, final boolean ignoreErrors) { boolean changed = false; boolean involveDictionary = false; @@ -362,6 +362,11 @@ public boolean applyChanges(final WALFile.WALTransaction tx, final boolean ignor } } + for (Map.Entry entry : bucketRecordDelta.entrySet()) { + final Bucket bucket = database.getSchema().getBucketById(entry.getKey()); + bucket.setCachedRecordCount(bucket.getCachedRecordCount() + entry.getValue()); + } + if (involveDictionary) { try { database.getSchema().getDictionary().reload(); diff --git a/engine/src/main/java/com/arcadedb/query/sql/executor/FetchFromSchemaIndexesStep.java b/engine/src/main/java/com/arcadedb/query/sql/executor/FetchFromSchemaIndexesStep.java index 8d80e86cd..5c702aa04 100644 --- a/engine/src/main/java/com/arcadedb/query/sql/executor/FetchFromSchemaIndexesStep.java +++ b/engine/src/main/java/com/arcadedb/query/sql/executor/FetchFromSchemaIndexesStep.java @@ -72,8 +72,7 @@ public ResultSet syncPull(final CommandContext context, final int nRecords) thro r.setProperty("unique", index.isUnique()); r.setProperty("automatic", index.isAutomatic()); - if (index instanceof IndexInternal) - r.setProperty("compacting", ((IndexInternal) index).isCompacting()); + r.setProperty("compacting", ((IndexInternal) index).isCompacting()); if (fileId > -1) { r.setProperty("fileId", fileId); diff --git a/engine/src/main/java/com/arcadedb/query/sql/executor/FetchFromSchemaTypesStep.java b/engine/src/main/java/com/arcadedb/query/sql/executor/FetchFromSchemaTypesStep.java index 6dfee4b63..98edf8f3d 100644 --- a/engine/src/main/java/com/arcadedb/query/sql/executor/FetchFromSchemaTypesStep.java +++ b/engine/src/main/java/com/arcadedb/query/sql/executor/FetchFromSchemaTypesStep.java @@ -72,6 +72,7 @@ else if (type.getType() == Edge.RECORD_TYPE) t = "edge"; r.setProperty("type", t); + r.setProperty("records", context.getDatabase().countType(typeName, false)); r.setProperty("buckets", type.getBuckets(false).stream().map((b) -> b.getName()).collect(Collectors.toList())); r.setProperty("bucketSelectionStrategy", type.getBucketSelectionStrategy().getName()); diff --git a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java index 3a1ea9ec9..f83bc01b7 100644 --- a/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java +++ b/server/src/main/java/com/arcadedb/server/ha/ReplicatedDatabase.java @@ -117,8 +117,8 @@ public void commit() { replicateTx(tx, phase1, bufferChanges); else { // USE A BIGGER TIMEOUT CONSIDERING THE DOUBLE LATENCY - final TxForwardRequest command = new TxForwardRequest(ReplicatedDatabase.this, getTransactionIsolationLevel(), bufferChanges, - tx.getIndexChanges().toMap()); + final TxForwardRequest command = new TxForwardRequest(ReplicatedDatabase.this, getTransactionIsolationLevel(), tx.getBucketRecordDelta(), + bufferChanges, tx.getIndexChanges().toMap()); server.getHA().forwardCommandToLeader(command, timeout * 2); tx.reset(); } @@ -171,7 +171,7 @@ public void replicateTx(final TransactionContext tx, final TransactionContext.Tr throw new IllegalArgumentException("Quorum " + quorum + " not managed"); } - final TxRequest req = new TxRequest(getName(), bufferChanges, reqQuorum > 1); + final TxRequest req = new TxRequest(getName(), tx.getBucketRecordDelta(), bufferChanges, reqQuorum > 1); final DatabaseChangeStructureRequest changeStructureRequest = getChangeStructure(-1); if (changeStructureRequest != null) { diff --git a/server/src/main/java/com/arcadedb/server/ha/message/TxForwardRequest.java b/server/src/main/java/com/arcadedb/server/ha/message/TxForwardRequest.java index 7b592a2f6..b216ce8cd 100755 --- a/server/src/main/java/com/arcadedb/server/ha/message/TxForwardRequest.java +++ b/server/src/main/java/com/arcadedb/server/ha/message/TxForwardRequest.java @@ -50,9 +50,10 @@ public class TxForwardRequest extends TxRequestAbstract { public TxForwardRequest() { } - public TxForwardRequest(final DatabaseInternal database, Database.TRANSACTION_ISOLATION_LEVEL transactionIsolationLevel, final Binary bufferChanges, + public TxForwardRequest(final DatabaseInternal database, Database.TRANSACTION_ISOLATION_LEVEL transactionIsolationLevel, + final Map bucketRecordDelta, final Binary bufferChanges, final Map>> keysTx) { - super(database.getName(), bufferChanges); + super(database.getName(), bucketRecordDelta, bufferChanges); this.isolationLevelIndex = transactionIsolationLevel.ordinal(); writeIndexKeysToBuffer(database, keysTx); } diff --git a/server/src/main/java/com/arcadedb/server/ha/message/TxRequest.java b/server/src/main/java/com/arcadedb/server/ha/message/TxRequest.java index 699adae8a..9bdc8ac78 100755 --- a/server/src/main/java/com/arcadedb/server/ha/message/TxRequest.java +++ b/server/src/main/java/com/arcadedb/server/ha/message/TxRequest.java @@ -29,6 +29,7 @@ import com.arcadedb.server.ha.ReplicationException; import java.nio.channels.*; +import java.util.*; import java.util.logging.*; /** @@ -41,8 +42,8 @@ public class TxRequest extends TxRequestAbstract { public TxRequest() { } - public TxRequest(final String dbName, final Binary bufferChanges, final boolean waitForResponse) { - super(dbName, bufferChanges); + public TxRequest(final String dbName, final Map bucketRecordDelta, final Binary bufferChanges, final boolean waitForResponse) { + super(dbName, bucketRecordDelta, bufferChanges); this.waitForResponse = waitForResponse; } @@ -92,7 +93,7 @@ public HACommand execute(final HAServer server, final String remoteServerName, f try { LogManager.instance().log(this, Level.FINE, "Applying tx %d from server %s (modifiedPages=%d)...", walTx.txId, remoteServerName, walTx.pages.length); - db.getTransactionManager().applyChanges(walTx, false); + db.getTransactionManager().applyChanges(walTx, bucketRecordDelta, false); } catch (final WALException e) { if (e.getCause() instanceof ClosedChannelException) { diff --git a/server/src/main/java/com/arcadedb/server/ha/message/TxRequestAbstract.java b/server/src/main/java/com/arcadedb/server/ha/message/TxRequestAbstract.java index 252f937b0..c78942835 100755 --- a/server/src/main/java/com/arcadedb/server/ha/message/TxRequestAbstract.java +++ b/server/src/main/java/com/arcadedb/server/ha/message/TxRequestAbstract.java @@ -24,20 +24,24 @@ import com.arcadedb.server.ArcadeDBServer; import com.arcadedb.server.ha.ReplicationException; +import java.util.*; + public abstract class TxRequestAbstract extends HAAbstractCommand { - protected String databaseName; - protected int changesUncompressedLength; - protected Binary changesBuffer; + protected String databaseName; + protected int changesUncompressedLength; + protected Binary changesBuffer; + protected Map bucketRecordDelta; // @SINCE 23.7.1 protected TxRequestAbstract() { } - protected TxRequestAbstract(final String dbName, final Binary changesBuffer) { + protected TxRequestAbstract(final String dbName, final Map bucketRecordDelta, final Binary changesBuffer) { this.databaseName = dbName; changesBuffer.rewind(); this.changesUncompressedLength = changesBuffer.size(); this.changesBuffer = CompressionFactory.getDefault().compress(changesBuffer); + this.bucketRecordDelta = bucketRecordDelta; } @Override @@ -45,6 +49,13 @@ public void toStream(final Binary stream) { stream.putString(databaseName); stream.putInt(changesUncompressedLength); stream.putBytes(changesBuffer.getContent(), changesBuffer.size()); + + // @SINCE 23.7.1 + stream.putInt(bucketRecordDelta.size()); + for (Map.Entry entry : bucketRecordDelta.entrySet()) { + stream.putInt(entry.getKey()); + stream.putInt(entry.getValue()); + } } @Override @@ -52,6 +63,13 @@ public void fromStream(final ArcadeDBServer server, final Binary stream) { databaseName = stream.getString(); changesUncompressedLength = stream.getInt(); changesBuffer = CompressionFactory.getDefault().decompress(new Binary(stream.getBytes()), changesUncompressedLength); + + // @SINCE 23.7.1 + final int deltaSize = stream.getInt(); + bucketRecordDelta = new HashMap<>(deltaSize); + for (int i = 0; i < deltaSize; i++) { + bucketRecordDelta.put(stream.getInt(), stream.getInt()); + } } protected WALFile.WALTransaction readTxFromBuffer() { @@ -120,5 +138,4 @@ protected WALFile.WALTransaction readTxFromBuffer() { return tx; } - }