Skip to content

Commit

Permalink
fix: fixed issue with record count in HA
Browse files Browse the repository at this point in the history
Fixed issue #1182
  • Loading branch information
lvca committed Jul 23, 2023
1 parent b51ea65 commit 35c5659
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ public void kill() {
immutablePages.clear();
}

public Map<Integer, Integer> getBucketRecordDelta() {
final Map<Integer, Integer> map = new HashMap<>(bucketRecordDelta.size());
for (Map.Entry<Integer, AtomicInteger> entry : bucketRecordDelta.entrySet())
map.put(entry.getKey(), entry.getValue().get());
return map;
}

/**
* Returns the delta of records considering the pending changes in transaction.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void checkIntegrity() {

lastTxId = lowerTxId;

applyChanges(walPositions[lowerTx], true);
applyChanges(walPositions[lowerTx], Collections.emptyMap(), true);

walPositions[lowerTx] = activeWALFilePool[lowerTx].getTransaction(walPositions[lowerTx].endPositionInLog);
}
Expand Down Expand Up @@ -267,7 +267,7 @@ public Map<String, Object> getStats() {
return map;
}

public boolean applyChanges(final WALFile.WALTransaction tx, final boolean ignoreErrors) {
public boolean applyChanges(final WALFile.WALTransaction tx, final Map<Integer, Integer> bucketRecordDelta, final boolean ignoreErrors) {
boolean changed = false;
boolean involveDictionary = false;

Expand Down Expand Up @@ -362,6 +362,11 @@ public boolean applyChanges(final WALFile.WALTransaction tx, final boolean ignor
}
}

for (Map.Entry<Integer, Integer> entry : bucketRecordDelta.entrySet()) {
final Bucket bucket = database.getSchema().getBucketById(entry.getKey());
bucket.setCachedRecordCount(bucket.getCachedRecordCount() + entry.getValue());
}

if (involveDictionary) {
try {
database.getSchema().getDictionary().reload();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public ResultSet syncPull(final CommandContext context, final int nRecords) thro

r.setProperty("unique", index.isUnique());
r.setProperty("automatic", index.isAutomatic());
if (index instanceof IndexInternal)
r.setProperty("compacting", ((IndexInternal) index).isCompacting());
r.setProperty("compacting", ((IndexInternal) index).isCompacting());

if (fileId > -1) {
r.setProperty("fileId", fileId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ else if (type.getType() == Edge.RECORD_TYPE)
t = "edge";

r.setProperty("type", t);
r.setProperty("records", context.getDatabase().countType(typeName, false));
r.setProperty("buckets", type.getBuckets(false).stream().map((b) -> b.getName()).collect(Collectors.toList()));
r.setProperty("bucketSelectionStrategy", type.getBucketSelectionStrategy().getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ public void commit() {
replicateTx(tx, phase1, bufferChanges);
else {
// USE A BIGGER TIMEOUT CONSIDERING THE DOUBLE LATENCY
final TxForwardRequest command = new TxForwardRequest(ReplicatedDatabase.this, getTransactionIsolationLevel(), bufferChanges,
tx.getIndexChanges().toMap());
final TxForwardRequest command = new TxForwardRequest(ReplicatedDatabase.this, getTransactionIsolationLevel(), tx.getBucketRecordDelta(),
bufferChanges, tx.getIndexChanges().toMap());
server.getHA().forwardCommandToLeader(command, timeout * 2);
tx.reset();
}
Expand Down Expand Up @@ -171,7 +171,7 @@ public void replicateTx(final TransactionContext tx, final TransactionContext.Tr
throw new IllegalArgumentException("Quorum " + quorum + " not managed");
}

final TxRequest req = new TxRequest(getName(), bufferChanges, reqQuorum > 1);
final TxRequest req = new TxRequest(getName(), tx.getBucketRecordDelta(), bufferChanges, reqQuorum > 1);

final DatabaseChangeStructureRequest changeStructureRequest = getChangeStructure(-1);
if (changeStructureRequest != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ public class TxForwardRequest extends TxRequestAbstract {
public TxForwardRequest() {
}

public TxForwardRequest(final DatabaseInternal database, Database.TRANSACTION_ISOLATION_LEVEL transactionIsolationLevel, final Binary bufferChanges,
public TxForwardRequest(final DatabaseInternal database, Database.TRANSACTION_ISOLATION_LEVEL transactionIsolationLevel,
final Map<Integer, Integer> bucketRecordDelta, final Binary bufferChanges,
final Map<String, TreeMap<TransactionIndexContext.ComparableKey, Map<TransactionIndexContext.IndexKey, TransactionIndexContext.IndexKey>>> keysTx) {
super(database.getName(), bufferChanges);
super(database.getName(), bucketRecordDelta, bufferChanges);
this.isolationLevelIndex = transactionIsolationLevel.ordinal();
writeIndexKeysToBuffer(database, keysTx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.arcadedb.server.ha.ReplicationException;

import java.nio.channels.*;
import java.util.*;
import java.util.logging.*;

/**
Expand All @@ -41,8 +42,8 @@ public class TxRequest extends TxRequestAbstract {
public TxRequest() {
}

public TxRequest(final String dbName, final Binary bufferChanges, final boolean waitForResponse) {
super(dbName, bufferChanges);
public TxRequest(final String dbName, final Map<Integer, Integer> bucketRecordDelta, final Binary bufferChanges, final boolean waitForResponse) {
super(dbName, bucketRecordDelta, bufferChanges);
this.waitForResponse = waitForResponse;
}

Expand Down Expand Up @@ -92,7 +93,7 @@ public HACommand execute(final HAServer server, final String remoteServerName, f
try {
LogManager.instance().log(this, Level.FINE, "Applying tx %d from server %s (modifiedPages=%d)...", walTx.txId, remoteServerName, walTx.pages.length);

db.getTransactionManager().applyChanges(walTx, false);
db.getTransactionManager().applyChanges(walTx, bucketRecordDelta, false);

} catch (final WALException e) {
if (e.getCause() instanceof ClosedChannelException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,52 @@
import com.arcadedb.server.ArcadeDBServer;
import com.arcadedb.server.ha.ReplicationException;

import java.util.*;

public abstract class TxRequestAbstract extends HAAbstractCommand {
protected String databaseName;
protected int changesUncompressedLength;
protected Binary changesBuffer;
protected String databaseName;
protected int changesUncompressedLength;
protected Binary changesBuffer;
protected Map<Integer, Integer> bucketRecordDelta; // @SINCE 23.7.1

protected TxRequestAbstract() {
}

protected TxRequestAbstract(final String dbName, final Binary changesBuffer) {
protected TxRequestAbstract(final String dbName, final Map<Integer, Integer> bucketRecordDelta, final Binary changesBuffer) {
this.databaseName = dbName;

changesBuffer.rewind();
this.changesUncompressedLength = changesBuffer.size();
this.changesBuffer = CompressionFactory.getDefault().compress(changesBuffer);
this.bucketRecordDelta = bucketRecordDelta;
}

@Override
public void toStream(final Binary stream) {
stream.putString(databaseName);
stream.putInt(changesUncompressedLength);
stream.putBytes(changesBuffer.getContent(), changesBuffer.size());

// @SINCE 23.7.1
stream.putInt(bucketRecordDelta.size());
for (Map.Entry<Integer, Integer> entry : bucketRecordDelta.entrySet()) {
stream.putInt(entry.getKey());
stream.putInt(entry.getValue());
}
}

@Override
public void fromStream(final ArcadeDBServer server, final Binary stream) {
databaseName = stream.getString();
changesUncompressedLength = stream.getInt();
changesBuffer = CompressionFactory.getDefault().decompress(new Binary(stream.getBytes()), changesUncompressedLength);

// @SINCE 23.7.1
final int deltaSize = stream.getInt();
bucketRecordDelta = new HashMap<>(deltaSize);
for (int i = 0; i < deltaSize; i++) {
bucketRecordDelta.put(stream.getInt(), stream.getInt());
}
}

protected WALFile.WALTransaction readTxFromBuffer() {
Expand Down Expand Up @@ -120,5 +138,4 @@ protected WALFile.WALTransaction readTxFromBuffer() {

return tx;
}

}

0 comments on commit 35c5659

Please sign in to comment.