-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable setting of transaction expiration handler in active transaction management #1395
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ | |
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.BiConsumer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -32,6 +34,17 @@ public abstract class ActiveTransactionManagedDistributedTransactionManager | |
|
||
private final ActiveExpiringMap<String, ActiveTransaction> activeTransactions; | ||
|
||
private final AtomicReference<BiConsumer<String, DistributedTransaction>> | ||
transactionExpirationHandler = | ||
new AtomicReference<>( | ||
(id, t) -> { | ||
try { | ||
t.rollback(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [minor] I think this private final AtomicReference<BiConsumer<String, DistributedTransaction>> transactionExpirationCallback = new AtomicReference<>((id, t) -> {});
private final BiConsumer<String, DistributedTransaction>
transactionExpirationHandler =
(id, t) -> {
try {
transactionExpirationCallback.get().accept(id, t);
}
catch (Throwable t) {
// Warning
}
try {
t.rollback();
} catch (Exception e) {
logger.warn("Rollback failed. transaction ID: {}", id, e);
}
}; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I agree that |
||
} catch (Exception e) { | ||
logger.warn("Rollback failed. transaction ID: {}", id, e); | ||
} | ||
}); | ||
|
||
public ActiveTransactionManagedDistributedTransactionManager(DatabaseConfig config) { | ||
super(config); | ||
activeTransactions = | ||
|
@@ -40,14 +53,14 @@ public ActiveTransactionManagedDistributedTransactionManager(DatabaseConfig conf | |
TRANSACTION_EXPIRATION_INTERVAL_MILLIS, | ||
(id, t) -> { | ||
logger.warn("The transaction is expired. transaction ID: {}", id); | ||
try { | ||
t.rollback(); | ||
} catch (Exception e) { | ||
logger.warn("Rollback failed. transaction ID: {}", id, e); | ||
} | ||
transactionExpirationHandler.get().accept(id, t); | ||
}); | ||
} | ||
|
||
public void setTransactionExpirationHandler(BiConsumer<String, DistributedTransaction> handler) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will be able to set a transaction expiration handler after this change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @brfrn169 Can you tell me how the custom handler would be like? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm going to use it for the ScalarDB Auth thing. Theoretically, we can do anything for expired transactions with this handler. For example, we can output detail logs for the expired transactions. |
||
transactionExpirationHandler.set(handler); | ||
} | ||
|
||
private void add(ActiveTransaction transaction) throws TransactionException { | ||
if (activeTransactions.putIfAbsent(transaction.getId(), transaction).isPresent()) { | ||
transaction.rollback(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,8 @@ | |
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.BiConsumer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -34,6 +36,17 @@ public abstract class ActiveTransactionManagedTwoPhaseCommitTransactionManager | |
|
||
private final ActiveExpiringMap<String, ActiveTransaction> activeTransactions; | ||
|
||
private final AtomicReference<BiConsumer<String, TwoPhaseCommitTransaction>> | ||
transactionExpirationHandler = | ||
new AtomicReference<>( | ||
(id, t) -> { | ||
try { | ||
t.rollback(); | ||
} catch (Exception e) { | ||
logger.warn("Rollback failed. transaction ID: {}", id, e); | ||
} | ||
}); | ||
|
||
public ActiveTransactionManagedTwoPhaseCommitTransactionManager(DatabaseConfig config) { | ||
super(config); | ||
activeTransactions = | ||
|
@@ -42,14 +55,15 @@ public ActiveTransactionManagedTwoPhaseCommitTransactionManager(DatabaseConfig c | |
TRANSACTION_EXPIRATION_INTERVAL_MILLIS, | ||
(id, t) -> { | ||
logger.warn("The transaction is expired. transaction ID: {}", id); | ||
try { | ||
t.rollback(); | ||
} catch (Exception e) { | ||
logger.warn("Rollback failed. transaction ID: {}", id, e); | ||
} | ||
transactionExpirationHandler.get().accept(id, t); | ||
}); | ||
} | ||
|
||
public void setTransactionExpirationHandler( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
BiConsumer<String, TwoPhaseCommitTransaction> handler) { | ||
transactionExpirationHandler.set(handler); | ||
} | ||
|
||
private void add(ActiveTransaction transaction) throws TransactionException { | ||
if (activeTransactions.putIfAbsent(transaction.getId(), transaction).isPresent()) { | ||
transaction.rollback(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a question.
I think we also have this expiration handler in ScalarDB Cluster, but I'm wondering if we need to have it in both sides: ScalarDB core and ScalarDB Cluster.
If the core implements it, the one in ScalarDB Cluster is not necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I can't remember the expiration handler in ScalarDB Cluster. Please give me some more information about that?