Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update onProposedFederationChange in order to register SVP spend tx in the Bridge #365

Merged
38 changes: 19 additions & 19 deletions src/main/java/co/rsk/federate/BtcToRskClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public synchronized void setup(
public void start(Federation federation) {
logger.info("[start] Starting for Federation {}", federation.getAddress());
this.federation = federation;

FederationMember federator = federatorSupport.getFederationMember();
boolean isMember = federation.isMember(federator);

if (!isMember) {
String message = String.format(
"Member %s is no part of the federation %s",
Expand All @@ -128,10 +128,9 @@ public void start(Federation federation) {
logger.info("[start] Watching federation {} since I belong to it",
federation.getAddress());
bitcoinWrapper.addFederationListener(federation, this);
Optional<Integer> federatorIndex = federation.getBtcPublicKeyIndex(
federatorSupport.getFederationMember().getBtcPublicKey()
);

Optional<Integer> federatorIndex = federation.getBtcPublicKeyIndex(
federatorSupport.getFederationMember().getBtcPublicKey());
if (!federatorIndex.isPresent()) {
String message = String.format(
"Federator %s is a member of the federation %s but could not find the btcPublicKeyIndex",
Expand All @@ -142,20 +141,19 @@ public void start(Federation federation) {
throw new IllegalStateException(message);
}

TurnScheduler scheduler = new TurnScheduler(
bridgeConstants.getUpdateBridgeExecutionPeriod(),
federation.getSize()
);
long now = Clock.systemUTC().instant().toEpochMilli();

if (isUpdateBridgeTimerEnabled) {
updateBridgeTimer = Executors.newSingleThreadScheduledExecutor();
long now = Clock.systemUTC().instant().toEpochMilli();
TurnScheduler scheduler = new TurnScheduler(
bridgeConstants.getUpdateBridgeExecutionPeriod(),
federation.getSize());

this.updateBridgeTimer = Executors.newSingleThreadScheduledExecutor();

updateBridgeTimer.scheduleAtFixedRate(
this::updateBridge,
scheduler.getDelay(now, federatorIndex.get()),
scheduler.getInterval(),
TimeUnit.MILLISECONDS
);
TimeUnit.MILLISECONDS);
} else {
logger.info("[start] updateBridgeTimer is disabled");
}
Expand All @@ -164,11 +162,11 @@ public void start(Federation federation) {
public void stop() {
logger.info("Stopping");

federation = null;
this.federation = null;

if (updateBridgeTimer != null) {
updateBridgeTimer.shutdown();
updateBridgeTimer = null;
this.updateBridgeTimer = null;
}
}

Expand All @@ -180,13 +178,15 @@ public void updateBridge() {
if (federation == null) {
logger.warn("[updateBridge] updateBridge skipped because no Federation is associated to this BtcToRskClient");
}

if (nodeBlockProcessor.hasBetterBlockToSync()) {
logger.warn("[updateBridge] updateBridge skipped because the node is syncing blocks");
return;
}

logger.debug("[updateBridge] Updating bridge");

if(shouldUpdateBridgeBtcBlockchain) {
if (shouldUpdateBridgeBtcBlockchain) {
// Call receiveHeaders
try {
int numberOfBlocksSent = updateBridgeBtcBlockchain();
Expand All @@ -197,7 +197,7 @@ public void updateBridge() {
}
}

if(shouldUpdateBridgeBtcCoinbaseTransactions) {
if (shouldUpdateBridgeBtcCoinbaseTransactions) {
// Call registerBtcCoinbaseTransaction
try {
logger.debug("[updateBridge] Updating transactions and sending update");
Expand All @@ -208,7 +208,7 @@ public void updateBridge() {
}
}

if(shouldUpdateBridgeBtcTransactions) {
if (shouldUpdateBridgeBtcTransactions) {
// Call registerBtcTransaction
try {
logger.debug("[updateBridge] Updating transactions and sending update");
Expand All @@ -219,7 +219,7 @@ public void updateBridge() {
}
}

if(shouldUpdateCollections) {
if (shouldUpdateCollections) {
// Call updateCollections
try {
logger.debug("[updateBridge] Sending updateCollections");
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/co/rsk/federate/FedNodeRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ private void startFederate() throws Exception {
FederationWatcherListener federationWatcherListener = new FederationWatcherListenerImpl(
btcToRskClientActive,
btcToRskClientRetiring,
btcReleaseClient);
btcReleaseClient,
bitcoinWrapper);

federationWatcher.start(federationProvider, federationWatcherListener);
}
Expand Down
81 changes: 44 additions & 37 deletions src/main/java/co/rsk/federate/bitcoin/BitcoinWrapperImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* @author Oscar Guindzberg
*/
public class BitcoinWrapperImpl implements BitcoinWrapper {

private class FederationListener {
private Federation federation;
private TransactionListener listener;
Expand Down Expand Up @@ -73,6 +74,9 @@ public boolean equals(Object o) {
}
}

private static final int MAX_SIZE_MAP_STORED_BLOCKS = 10_000;
private static final Logger LOGGER = LoggerFactory.getLogger(BitcoinWrapperImpl.class);

private Context btcContext;
private BridgeConstants bridgeConstants;
private boolean running = false;
Expand All @@ -86,9 +90,6 @@ public boolean equals(Object o) {
private final FederatorSupport federatorSupport;
private final Kit kit;

public static final int MAX_SIZE_MAP_STORED_BLOCKS = 10_000;
private static final Logger LOGGER = LoggerFactory.getLogger(BitcoinWrapperImpl.class);

public BitcoinWrapperImpl(
Context btcContext,
BridgeConstants bridgeConstants,
Expand Down Expand Up @@ -316,42 +317,48 @@ public void removeNewBestBlockListener(NewBestBlockListener newBestBlockListener
}

protected void coinsReceivedOrSent(Transaction tx) {
if (watchedFederations.size() > 0) {
LOGGER.debug("[coinsReceivedOrSent] Received filtered transaction {}", tx.getWTxId().toString());
Context.propagate(btcContext);
// Wrap tx in a co.rsk.bitcoinj.core.BtcTransaction
BtcTransaction btcTx = ThinConverter.toThinInstance(bridgeConstants.getBtcParams(), tx);
co.rsk.bitcoinj.core.Context btcContextThin = ThinConverter.toThinInstance(btcContext);
for (FederationListener watched : watchedFederations) {
Federation watchedFederation = watched.getFederation();
TransactionListener listener = watched.getListener();
Wallet watchedFederationWallet = new BridgeBtcWallet(btcContextThin, Collections.singletonList(watchedFederation));
if (PegUtilsLegacy.isValidPegInTx(btcTx, watchedFederation, watchedFederationWallet, bridgeConstants, federatorSupport.getConfigForBestBlock())) {

PeginInformation peginInformation = new PeginInformation(
btcLockSenderProvider,
peginInstructionsProvider,
federatorSupport.getConfigForBestBlock()
);
try {
peginInformation.parse(btcTx);
} catch (PeginInstructionsException e) {
// If tx sender could be retrieved then let the Bridge process the tx and refund the sender
if (peginInformation.getSenderBtcAddress() != null) {
LOGGER.debug("[coinsReceivedOrSent] [btctx:{}] is not a valid lock tx, funds will be refunded to sender", tx.getWTxId());
} else {
LOGGER.debug("[coinsReceivedOrSent] [btctx:{}] is not a valid lock tx and won't be processed!", tx.getWTxId());
continue;
}
}
if (watchedFederations.isEmpty()) {
return;
}

LOGGER.debug("[coinsReceivedOrSent] [btctx:{}] is a lock", tx.getWTxId());
listener.onTransaction(tx);
}
if (PegUtilsLegacy.isPegOutTx(btcTx, Collections.singletonList(watchedFederation), federatorSupport.getConfigForBestBlock())) {
LOGGER.debug("[coinsReceivedOrSent] [btctx with hash {} and witness hash {}] is a pegout", tx.getTxId(), tx.getWTxId());
listener.onTransaction(tx);
LOGGER.debug("[coinsReceivedOrSent] Received filtered transaction {}", tx.getWTxId().toString());
Context.propagate(btcContext);

// Wrap tx in a co.rsk.bitcoinj.core.BtcTransaction
BtcTransaction btcTx = ThinConverter.toThinInstance(bridgeConstants.getBtcParams(), tx);
co.rsk.bitcoinj.core.Context btcContextThin = ThinConverter.toThinInstance(btcContext);

for (FederationListener watched : watchedFederations) {
Federation watchedFederation = watched.getFederation();
TransactionListener listener = watched.getListener();
Wallet watchedFederationWallet = new BridgeBtcWallet(btcContextThin, Collections.singletonList(watchedFederation));

if (PegUtilsLegacy.isValidPegInTx(btcTx, watchedFederation, watchedFederationWallet, bridgeConstants, federatorSupport.getConfigForBestBlock())) {
PeginInformation peginInformation = new PeginInformation(
btcLockSenderProvider,
peginInstructionsProvider,
federatorSupport.getConfigForBestBlock()
);

try {
peginInformation.parse(btcTx);
} catch (PeginInstructionsException e) {
// If tx sender could be retrieved then let the Bridge process the tx and refund the sender
if (peginInformation.getSenderBtcAddress() != null) {
LOGGER.debug("[coinsReceivedOrSent] [btctx:{}] is not a valid lock tx, funds will be refunded to sender", tx.getWTxId());
} else {
LOGGER.debug("[coinsReceivedOrSent] [btctx:{}] is not a valid lock tx and won't be processed!", tx.getWTxId());
continue;
}
}

LOGGER.debug("[coinsReceivedOrSent] [btctx:{}] is a lock", tx.getWTxId());
listener.onTransaction(tx);
}

if (PegUtilsLegacy.isPegOutTx(btcTx, Collections.singletonList(watchedFederation), federatorSupport.getConfigForBestBlock())) {
LOGGER.debug("[coinsReceivedOrSent] [btctx with hash {} and witness hash {}] is a pegout", tx.getTxId(), tx.getWTxId());
listener.onTransaction(tx);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package co.rsk.federate.watcher;

import co.rsk.federate.BtcToRskClient;
import co.rsk.federate.bitcoin.BitcoinWrapper;
import co.rsk.federate.btcreleaseclient.BtcReleaseClient;
import co.rsk.peg.federation.Federation;
import org.slf4j.Logger;
Expand All @@ -14,14 +15,17 @@ public class FederationWatcherListenerImpl implements FederationWatcherListener
private final BtcToRskClient btcToRskClientActive;
private final BtcToRskClient btcToRskClientRetiring;
private final BtcReleaseClient btcReleaseClient;
private final BitcoinWrapper bitcoinWrapper;

public FederationWatcherListenerImpl(
BtcToRskClient btcToRskClientActive,
BtcToRskClient btcToRskClientRetiring,
BtcReleaseClient btcReleaseClient) {
BtcReleaseClient btcReleaseClient,
BitcoinWrapper bitcoinWrapper) {
this.btcToRskClientActive = btcToRskClientActive;
this.btcToRskClientRetiring = btcToRskClientRetiring;
this.btcReleaseClient = btcReleaseClient;
this.bitcoinWrapper = bitcoinWrapper;
}

@Override
Expand Down Expand Up @@ -51,13 +55,17 @@ public void onProposedFederationChange(Federation newProposedFederation) {
// start {@code BtcReleaseClient} with proposed federation
// so it can sign svp spend tx
btcReleaseClient.start(newProposedFederation);

// add proposed federation to active btc to rsk client so
// it can register svp spend tx in the bridge
bitcoinWrapper.addFederationListener(newProposedFederation, btcToRskClientActive);
Comment on lines +59 to +61
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main and only change of the PR, the rest are small refactors

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this done already in btcReleaseClient.start?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope


logger.info(
"[onProposedFederationChange] Client for proposed federation [{}] started with success",
"[onProposedFederationChange] Clients for proposed federation [{}] started with success",
newProposedFederation.getAddress());
} catch (Exception e) {
logger.error(
"[onProposedFederationChange] Client for proposed federation [{}] failed to start",
"[onProposedFederationChange] Clients for proposed federation [{}] failed to start",
newProposedFederation.getAddress(),
e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import co.rsk.bitcoinj.core.BtcECKey;
import co.rsk.bitcoinj.core.NetworkParameters;
import co.rsk.federate.BtcToRskClient;
import co.rsk.federate.bitcoin.BitcoinWrapper;
import co.rsk.federate.btcreleaseclient.BtcReleaseClient;
import co.rsk.peg.federation.Federation;
import co.rsk.peg.federation.FederationArgs;
Expand Down Expand Up @@ -38,15 +39,17 @@ class FederationWatcherListenerImplTest {
private BtcToRskClient btcToRskClientActive;
private BtcToRskClient btcToRskClientRetiring;
private BtcReleaseClient btcReleaseClient;
private BitcoinWrapper bitcoinWrapper;
private FederationWatcherListener federationWatcherListener;

@BeforeEach
void setUp() {
btcToRskClientActive = mock(BtcToRskClient.class);
btcToRskClientRetiring = mock(BtcToRskClient.class);
btcReleaseClient = mock(BtcReleaseClient.class);
bitcoinWrapper = mock(BitcoinWrapper.class);
federationWatcherListener = new FederationWatcherListenerImpl(
btcToRskClientActive, btcToRskClientRetiring, btcReleaseClient);
btcToRskClientActive, btcToRskClientRetiring, btcReleaseClient, bitcoinWrapper);
}

@Test
Expand Down Expand Up @@ -99,6 +102,7 @@ void onProposedFederationChange_whenNewProposedFederationIsNull_shouldNotStartCl

// Assert
verify(btcReleaseClient, never()).start(any(Federation.class));
verify(bitcoinWrapper, never()).addFederationListener(any(Federation.class), any(BtcToRskClient.class));
}

@Test
Expand All @@ -108,6 +112,7 @@ void onProposedFederationChange_whenNewProposedFederationIsValid_shouldStartClie

// Assert
verify(btcReleaseClient).start(FEDERATION);
verify(bitcoinWrapper).addFederationListener(FEDERATION, btcToRskClientActive);
}

@Test
Expand Down
Loading