Skip to content
This repository has been archived by the owner on Oct 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #201 from ar-io/PE-5546-update-prescribed-after-di…
Browse files Browse the repository at this point in the history
…stro

fix(PE-5546) update prescribed after distro
  • Loading branch information
dtfiedler authored Feb 7, 2024
2 parents 603954b + baa51f7 commit 3bcc253
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 130 deletions.
26 changes: 3 additions & 23 deletions src/actions/read/observers.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
import {
EPOCH_BLOCK_LENGTH,
EPOCH_DISTRIBUTION_DELAY,
GATEWAY_REGISTRY_SETTINGS,
TENURE_WEIGHT_PERIOD,
} from '../../constants';
import { EPOCH_BLOCK_LENGTH, EPOCH_DISTRIBUTION_DELAY } from '../../constants';
import {
getBaselineState,
stubbedGatewayData,
Expand All @@ -27,7 +22,7 @@ describe('getPrescribedObservers', () => {
});
});

it('should return the current array of prescribed observer if not set in state yet', async () => {
it('should return the an empty array of prescribed observer if not set in state yet', async () => {
const state = {
...getBaselineState(),
gateways: {
Expand All @@ -41,22 +36,7 @@ it('should return the current array of prescribed observer if not set in state y
// no distributions
};
const { result } = await getPrescribedObservers(state);
expect(result).toEqual([
{
gatewayAddress: 'a-test-gateway',
observerAddress: stubbedGatewayData.observerWallet,
gatewayRewardRatioWeight: 1,
observerRewardRatioWeight: 1,
stake: stubbedGatewayData.operatorStake,
start: 0,
stakeWeight:
stubbedGatewayData.operatorStake /
GATEWAY_REGISTRY_SETTINGS.minOperatorStake,
tenureWeight: 1 / TENURE_WEIGHT_PERIOD, // the gateway started at the same time as the epoch
compositeWeight: 1 / TENURE_WEIGHT_PERIOD,
normalizedCompositeWeight: 1,
},
]);
expect(result).toEqual([]);
});

describe('getEpoch', () => {
Expand Down
18 changes: 4 additions & 14 deletions src/actions/read/observers.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { EPOCH_BLOCK_LENGTH, GATEWAY_REGISTRY_SETTINGS } from '../../constants';
import {
getEpochDataForHeight,
getPrescribedObserversForEpoch,
} from '../../observers';
import { EPOCH_BLOCK_LENGTH } from '../../constants';
import { getEpochDataForHeight } from '../../observers';
import {
BlockHeight,
ContractReadResult,
Expand All @@ -14,21 +11,14 @@ export const getPrescribedObservers = async (
state: IOState,
): Promise<ContractReadResult> => {
const { prescribedObservers, distributions } = state;
const { epochStartHeight, epochEndHeight } = getEpochDataForHeight({
const { epochStartHeight } = getEpochDataForHeight({
currentBlockHeight: new BlockHeight(+SmartWeave.block.height),
epochZeroStartHeight: new BlockHeight(distributions.epochZeroStartHeight),
epochBlockLength: new BlockHeight(EPOCH_BLOCK_LENGTH),
});

const existingOrComputedObservers =
prescribedObservers[epochStartHeight.valueOf()] ||
(await getPrescribedObserversForEpoch({
gateways: state.gateways,
distributions: state.distributions,
epochStartHeight,
epochEndHeight,
minOperatorStake: GATEWAY_REGISTRY_SETTINGS.minOperatorStake,
}));
prescribedObservers[epochStartHeight.valueOf()] || [];

return { result: existingOrComputedObservers };
};
Expand Down
12 changes: 3 additions & 9 deletions src/actions/write/tick.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ describe('tick', () => {
});
});

it('should not distribute rewards, increment gateway performance stats or update epoch values if the current block height is not greater than the current epoch end height', async () => {
it('should not distribute rewards, increment gateway performance stats or update epoch values if the current block height is not the epoch distribution height', async () => {
const initialState: IOState = {
...getBaselineState(),
balances: {
Expand Down Expand Up @@ -864,7 +864,7 @@ describe('tick', () => {
index: index + 1,
})),
)(
'should not distribute rewards or increment gateway stats, but epoch values if the current block height is equal to the last epoch end height + %s blocks',
'should not distribute rewards or increment gateway stats, update prescribed observers or modify state the current block height is equal to the last epoch end height + %s blocks',
async ({ index: blockHeightDiff }) => {
const initialState: IOState = {
...getBaselineState(),
Expand All @@ -885,13 +885,7 @@ describe('tick', () => {
prescribedObservers: initialState.prescribedObservers,
});
expect(balances).toEqual(initialState.balances);
expect(distributions).toEqual({
...initialState.distributions,
epochPeriod: initialState.distributions.epochPeriod + 1,
epochStartHeight: initialState.distributions.epochEndHeight + 1,
epochEndHeight:
initialState.distributions.epochEndHeight + EPOCH_BLOCK_LENGTH,
});
expect(distributions).toEqual(initialState.distributions);
expect(gateways).toEqual(initialState.gateways);
},
);
Expand Down
34 changes: 3 additions & 31 deletions src/actions/write/tick.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,43 +442,15 @@ export async function tickRewardDistribution({
distributions.nextDistributionHeight,
);

// distribution should only happen ONCE on block that is EPOCH_DISTRIBUTION_DELAY after the last completed epoch
// distribution should only happen ONCE on block that is EPOCH_DISTRIBUTION_DELAY after the last completed epoch, do nothing if we are not there yet
if (
currentBlockHeight.valueOf() !== distributionHeightForLastEpoch.valueOf()
) {
const {
epochStartHeight: nextEpochStartHeight,
epochEndHeight: nextEpochEndHeight,
epochPeriod: newEpochPeriod,
} = getEpochDataForHeight({
currentBlockHeight,
epochZeroStartHeight: new BlockHeight(distributions.epochZeroStartHeight),
epochBlockLength: new BlockHeight(EPOCH_BLOCK_LENGTH),
});

const updatedPrescribedObservers = await getPrescribedObserversForEpoch({
gateways,
epochStartHeight: nextEpochStartHeight,
epochEndHeight: nextEpochEndHeight,
distributions,
minOperatorStake: GATEWAY_REGISTRY_SETTINGS.minOperatorStake,
});
// increment the epoch variables if we've moved to the next epoch, but DO NOT update the nextDistributionHeight as that will happen below after distributions are complete
const updatedEpochData: EpochDistributionData = {
epochStartHeight: nextEpochStartHeight.valueOf(),
epochEndHeight: nextEpochEndHeight.valueOf(),
epochZeroStartHeight: distributions.epochZeroStartHeight,
nextDistributionHeight: distributionHeightForLastEpoch.valueOf(), // DON'T UPDATE THIS UNTIL THE DISTRIBUTION OCCURS
epochPeriod: newEpochPeriod.valueOf(),
};

return {
distributions: updatedEpochData,
distributions,
balances,
gateways,
prescribedObservers: {
[nextEpochStartHeight.valueOf()]: updatedPrescribedObservers,
},
prescribedObservers: prescribedObservers as PrescribedObservers,
};
}

Expand Down
84 changes: 31 additions & 53 deletions tests/observation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ describe('Observation', () => {
const gatewayWalletAddresses: string[] = [];
let contract: Contract<IOState>;
let srcContractId: string;
let prevState: IOState;
const wallets: {
addr: string;
jwk: JWKInterface;
Expand Down Expand Up @@ -57,6 +58,7 @@ describe('Observation', () => {
];
});
beforeEach(async () => {
prevState = (await contract.readState()).cachedValue.state;
const { result }: { result: WeightedObserver[] } = await contract.viewState(
{
function: 'prescribedObservers',
Expand Down Expand Up @@ -94,17 +96,17 @@ describe('Observation', () => {
});

describe('write interactions', () => {
it('should save observations in epoch if prescribed observer and the current block height is past the epoch start height + delay period', async () => {
const { cachedValue: prevCachedValue } = await contract.readState();
const previousState = prevCachedValue.state as IOState;
beforeAll(async () => {
// ensure that we are past the delay period
const minimumObservationHeight =
previousState.distributions.epochStartHeight +
EPOCH_DISTRIBUTION_DELAY;
prevState.distributions.epochEndHeight + EPOCH_DISTRIBUTION_DELAY + 1;
const diffInBlocks =
minimumObservationHeight - (await getCurrentBlock(arweave)).valueOf();
if (diffInBlocks > 0) {
await mineBlocks(arweave, diffInBlocks);
}
});
it('should save observations in epoch if prescribed observer and the current block height is past the epoch start height + delay period', async () => {
const writeInteractions = await Promise.all(
prescribedObserverWallets.map((wallet) => {
contract = warp
Expand Down Expand Up @@ -146,26 +148,14 @@ describe('Observation', () => {
});

it('should allow an observer to update their observation with new failures/report if selected as observer and the current block height is past the epoch start height + delay period', async () => {
const { cachedValue: prevCachedValue } = await contract.readState();
const previousState = prevCachedValue.state as IOState;
const previousSummary =
previousState.observations[currentEpochStartHeight.valueOf()]
prevState.observations[currentEpochStartHeight.valueOf()]
?.failureSummaries;
const previousReports =
previousState.observations[currentEpochStartHeight.valueOf()]
?.reports;
prevState.observations[currentEpochStartHeight.valueOf()]?.reports;
contract = warp
.contract<IOState>(srcContractId)
.connect(prescribedObserverWallets[0].jwk);
// ensure that we are past the delay period
const minimumObservationHeight =
previousState.distributions.epochStartHeight +
EPOCH_DISTRIBUTION_DELAY;
const diffInBlocks =
minimumObservationHeight - (await getCurrentBlock(arweave)).valueOf();
if (diffInBlocks > 0) {
await mineBlocks(arweave, diffInBlocks);
}
const writeInteraction = await contract.writeInteraction({
function: 'saveObservations',
observerReportTxId: EXAMPLE_OBSERVER_REPORT_TX_IDS[1],
Expand Down Expand Up @@ -239,8 +229,6 @@ describe('Observation', () => {
});

it('should update gateways observerReportTxId tx id if gateway is a prescribed observer saves observation again within the same epoch', async () => {
const previousObservation = await contract.readState();
const prevState = previousObservation.cachedValue.state as IOState;
const previousReportsAndSummary =
prevState.observations[currentEpochStartHeight.valueOf()];
const writeInteractions = await Promise.all(
Expand Down Expand Up @@ -282,7 +270,6 @@ describe('Observation', () => {
describe('non-prescribed observer', () => {
describe('write interactions', () => {
it('should not save observation report if not prescribed observer', async () => {
const { cachedValue: prevCachedValue } = await contract.readState();
const nonPrescribedObserver = wallets[8].jwk; // not allowed to observe
contract = warp
.contract<IOState>(srcContractId)
Expand All @@ -297,12 +284,11 @@ describe('Observation', () => {
expect(
newCachedValue.errorMessages[writeInteraction?.originalTxId],
).toEqual(INVALID_OBSERVATION_CALLER_MESSAGE);
expect(newCachedValue.state).toEqual(prevCachedValue.state);
expect(newCachedValue.state).toEqual(prevState);
});

it('should not save observation report if the caller is not a registered observer', async () => {
const notJoinedGateway = await createLocalWallet(arweave);
const { cachedValue: prevCachedValue } = await contract.readState();
contract = warp
.contract<IOState>(srcContractId)
.connect(notJoinedGateway.wallet);
Expand All @@ -316,14 +302,13 @@ describe('Observation', () => {
expect(
newCachedValue.errorMessages[writeInteraction?.originalTxId],
).toEqual(INVALID_OBSERVATION_CALLER_MESSAGE);
expect(newCachedValue.state).toEqual(prevCachedValue.state);
expect(newCachedValue.state).toEqual(prevState);
});
});
});
describe('fast forwarding to the next epoch', () => {
it('should update the prescribed observers, distributed balances, and increment gateway stats when distribution happens', async () => {
await mineBlocks(arweave, EPOCH_BLOCK_LENGTH);
const { cachedValue: prevCachedValue } = await contract.readState();
const writeInteraction = await contract
.connect(wallets[0].jwk)
.writeInteraction({
Expand All @@ -337,39 +322,34 @@ describe('Observation', () => {
const newState = newCachedValue.state as IOState;
// updated correctly
expect(newState.distributions).toEqual({
epochZeroStartHeight:
prevCachedValue.state.distributions.epochZeroStartHeight,
epochPeriod: prevCachedValue.state.distributions.epochPeriod + 1,
epochStartHeight:
prevCachedValue.state.distributions.epochEndHeight + 1,
epochZeroStartHeight: prevState.distributions.epochZeroStartHeight,
epochPeriod: prevState.distributions.epochPeriod + 1,
epochStartHeight: prevState.distributions.epochEndHeight + 1,
epochEndHeight:
prevCachedValue.state.distributions.epochEndHeight +
EPOCH_BLOCK_LENGTH,
prevState.distributions.epochEndHeight + EPOCH_BLOCK_LENGTH,
nextDistributionHeight:
prevCachedValue.state.distributions.epochEndHeight +
prevState.distributions.epochEndHeight +
EPOCH_BLOCK_LENGTH +
EPOCH_DISTRIBUTION_DELAY,
});
const gatewaysAroundDuringEpoch = Object.keys(
prevCachedValue.state.gateways,
).filter(
const gatewaysAroundDuringEpoch = Object.keys(prevState.gateways).filter(
(gatewayAddress) =>
prevCachedValue.state.gateways[gatewayAddress].start <=
prevCachedValue.state.distributions.epochStartHeight &&
(prevCachedValue.state.gateways[gatewayAddress].end === 0 ||
prevCachedValue.state.gateways[gatewayAddress].end >
prevCachedValue.state.distributions.epochEndHeight),
prevState.gateways[gatewayAddress].start <=
prevState.distributions.epochStartHeight &&
(prevState.gateways[gatewayAddress].end === 0 ||
prevState.gateways[gatewayAddress].end >
prevState.distributions.epochEndHeight),
);
const gatewaysExistedButNotStarted = Object.keys(
prevCachedValue.state.gateways,
prevState.gateways,
).reduce((gateways: Gateways, gatewayAddress) => {
if (
prevCachedValue.state.gateways[gatewayAddress].start >
prevCachedValue.state.distributions.epochStartHeight
prevState.gateways[gatewayAddress].start >
prevState.distributions.epochStartHeight
) {
return {
...gateways,
[gatewayAddress]: prevCachedValue.state.gateways[gatewayAddress],
[gatewayAddress]: prevState.gateways[gatewayAddress],
};
}
return gateways;
Expand All @@ -378,20 +358,18 @@ describe('Observation', () => {
...gatewaysExistedButNotStarted,
...gatewaysAroundDuringEpoch.reduce(
(gateways: Gateways, gatewayAddress) => {
const gateway = prevCachedValue.state.gateways[gatewayAddress];
const gateway = prevState.gateways[gatewayAddress];
const didFail =
prevCachedValue.state.observations[
prevCachedValue.state.distributions.epochStartHeight
]?.failureSummaries[gatewayAddress] ||
prevState.observations[prevState.distributions.epochStartHeight]
?.failureSummaries[gatewayAddress] ||
[].length >
prescribedObservers.length * OBSERVATION_FAILURE_THRESHOLD;
const wasPrescribed = prescribedObservers.some(
(observer) => observer.observerAddress === gateway.observerWallet,
);
const didObserve =
prevCachedValue.state.observations[
prevCachedValue.state.distributions.epochStartHeight
].reports[gateway.observerWallet] !== undefined;
prevState.observations[prevState.distributions.epochStartHeight]
.reports[gateway.observerWallet] !== undefined;
return {
...gateways,
[gatewayAddress]: {
Expand Down

0 comments on commit 3bcc253

Please sign in to comment.