Skip to content

Commit

Permalink
chore: properly implemented end key on protocol node
Browse files Browse the repository at this point in the history
  • Loading branch information
troykessler committed Jun 12, 2024
1 parent 2845915 commit 4858ef0
Show file tree
Hide file tree
Showing 11 changed files with 647 additions and 158 deletions.
2 changes: 0 additions & 2 deletions common/protocol/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import {
isStorageBalanceZero,
isValidVersion,
isDataAvailable,
isEndkeyReached,
voteBundleProposal,
waitForAuthorization,
waitForCacheContinuation,
Expand Down Expand Up @@ -123,7 +122,6 @@ export class Validator {
protected isStorageBalanceZero = isStorageBalanceZero;
protected isStorageBalanceLow = isStorageBalanceLow;
protected isDataAvailable = isDataAvailable;
protected isEndkeyReached = isEndkeyReached;

// timeouts
protected waitForAuthorization = waitForAuthorization;
Expand Down
30 changes: 0 additions & 30 deletions common/protocol/src/methods/checks/isEndkeyReached.ts

This file was deleted.

1 change: 0 additions & 1 deletion common/protocol/src/methods/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export * from "./checks/isValidVersion";
export * from "./checks/isStorageBalanceLow";
export * from "./checks/isStorageBalanceZero";
export * from "./checks/isDataAvailable";
export * from "./checks/isEndkeyReached";

// timeouts
export * from "./timeouts/waitForAuthorization";
Expand Down
14 changes: 9 additions & 5 deletions common/protocol/src/methods/main/runCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ export async function runCache(this: Validator): Promise<void> {
this.m.cache_index_tail.set(Math.max(0, currentIndex - 1));

for (let i = currentIndex; i < targetIndex; i++) {
// if the end key is not empty and we have reached the end key of the pool
// we do not sync past this key
if (this.pool.data?.end_key && this.pool.data.end_key === key) {
this.logger.info(
`Reached pool end key "${key}", the node will not continue collecting data past this key.`
);
break;
}

// check if data item was already collected. If it was
// already collected we don't need to retrieve it again
this.logger.debug(`this.cacheProvider.exists(${i.toString()})`);
Expand Down Expand Up @@ -223,11 +232,6 @@ export async function runCache(this: Validator): Promise<void> {
key = nextKey;
}

// indicate that current caching round is done
this.logger.debug(
`Finished caching from index ${currentIndex} to ${targetIndex}. Waiting for next round ...`
);

// wait until a new bundle proposal is available. We don't need
// to sync the pool here because the pool state already gets
// synced in the other main function "runNode" so we only listen
Expand Down
6 changes: 6 additions & 0 deletions common/protocol/src/methods/main/runNode.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { PoolStatus } from "@kyvejs/types/lcd/kyve/pool/v1beta1/pool";
import { Validator } from "../..";
import { IDLE_TIME, sleep } from "../../utils";

Expand Down Expand Up @@ -33,6 +34,11 @@ export async function runNode(this: Validator): Promise<void> {
process.exit(1);
}

if (this.pool.status === PoolStatus.POOL_STATUS_END_KEY_REACHED) {
this.logger.info(`Reached pool end key. Shutting down node ...`);
process.exit(0);
}

// log warnings if storage provider balance is low
await this.isStorageBalanceLow();

Expand Down
7 changes: 0 additions & 7 deletions common/protocol/src/methods/upload/createBundleProposal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,6 @@ export async function createBundleProposal(this: Validator): Promise<void> {
// as to_key
const toKey = bundleProposal.at(-1)?.key ?? "";

if (await this.isEndkeyReached(toKey)) {
this.logger.info(`End key reached. Skipping Uploader Role ...`);

await this.skipUploaderRole(fromIndex);
return;
}

// get the last value of the bundle proposal and format
// it so it can be included in the bundle proposal and
// saved on chain
Expand Down
13 changes: 13 additions & 0 deletions common/protocol/src/methods/validate/validateBundleProposal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,19 @@ export async function validateBundleProposal(
// validate each data item in bundle with custom runtime validation
for (let i = 0; i < proposedBundle.length; i++) {
if (valid) {
// if the pool has an end key and we find out that a data item
// has the end key and it is not the last data item in the bundle
// we consider the bundle invalid
if (this.pool.data?.end_key) {
if (
i < proposedBundle.length - 1 &&
proposedBundle[i].key === this.pool.data?.end_key
) {
valid = false;
break;
}
}

this.logger.debug(
`this.runtime.validateDataItem($THIS, $PROPOSED_DATA_ITEM, $VALIDATION_DATA_ITEM)`
);
Expand Down
188 changes: 168 additions & 20 deletions common/protocol/test/cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ TEST CASES - cache tests
* start caching from a pool where transformDataItem fails
* start caching from a pool where nextKey fails once
* start caching from a pool where cache methods fail
* TODO: test with pool config that has no source object
* TODO: test with pool config that has zero sources
* TODO: start caching from a pool where node has not cached anything yet
* start caching from a pool which has an endKey
*/

Expand Down Expand Up @@ -249,8 +247,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool which has a bundle proposal ongoing", async () => {
Expand Down Expand Up @@ -423,8 +419,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("continue caching from a pool which has a bundle proposal ongoing", async () => {
Expand Down Expand Up @@ -602,8 +596,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where last bundle proposal was dropped", async () => {
Expand Down Expand Up @@ -767,8 +759,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where getNextDataItem fails once", async () => {
Expand Down Expand Up @@ -931,8 +921,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where getNextDataItem fails multiple times", async () => {
Expand Down Expand Up @@ -1175,8 +1163,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test("start caching from a pool where transformDataItem fails", async () => {
Expand Down Expand Up @@ -1407,8 +1393,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);

// TODO: assert timeouts
});

test.skip("start caching from a pool where nextKey fails once", async () => {
Expand Down Expand Up @@ -1527,8 +1511,6 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(0);

// TODO: assert timeouts
});

test("start caching from a pool where cache methods fail", async () => {
Expand Down Expand Up @@ -1665,7 +1647,173 @@ describe("cache tests", () => {

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(0);
});

test("start caching from a pool which has an endKey", async () => {
// ARRANGE
// Pool sits at current_key "99" / current_index "100" with end_key "152".
// An ongoing bundle proposal covers keys "100"-"149" (50 items), so the
// cache round should index those 50 items plus keys "150"-"152" and then
// stop at the end key instead of collecting further data.
v.pool = {
...genesis_pool,
data: {
...genesis_pool.data,
current_key: "99",
current_index: "100",
end_key: "152",
},
bundle_proposal: {
...genesis_pool.bundle_proposal,
storage_id: "test_storage_id",
uploader: "test_staker",
next_uploader: "test_staker",
data_size: "123456789",
data_hash: "test_bundle_hash",
bundle_size: "50",
from_key: "100",
to_key: "149",
bundle_summary: "test_summary",
updated_at: "0",
voters_valid: ["test_staker"],
},
} as any;

// ACT
await runCache.call(v);

// ASSERT
const txs = v["client"][0].kyve.bundles.v1beta1;
const queries = v["lcd"][0].kyve.query.v1beta1;
const cacheProvider = v["cacheProvider"];
const runtime = v["runtime"];

// ========================
// ASSERT CLIENT INTERFACES
// ========================

// runCache only collects data; it must never touch chain tx interfaces
expect(txs.claimUploaderRole).toHaveBeenCalledTimes(0);

expect(txs.voteBundleProposal).toHaveBeenCalledTimes(0);

expect(txs.submitBundleProposal).toHaveBeenCalledTimes(0);

expect(txs.skipUploaderRole).toHaveBeenCalledTimes(0);

// =====================
// ASSERT LCD INTERFACES
// =====================

expect(queries.canVote).toHaveBeenCalledTimes(0);

expect(queries.canPropose).toHaveBeenCalledTimes(0);

// =========================
// ASSERT STORAGE INTERFACES
// =========================

// NOTE(review): storageProvider and compression are not destructured above
// like txs/queries/cacheProvider/runtime — presumably they come from the
// describe-level setup; verify against the surrounding test file.
expect(storageProvider.saveBundle).toHaveBeenCalledTimes(0);

expect(storageProvider.retrieveBundle).toHaveBeenCalledTimes(0);

// =======================
// ASSERT CACHE INTERFACES
// =======================

// we use 50 + 3 here because the current bundle is 50 items big
// and because of the end key we only index the next 3 items and stop
// afterwards
expect(cacheProvider.put).toHaveBeenCalledTimes(50 + 3);

for (let n = 0; n < 50 + 3; n++) {
// collected keys start right after the first max_bundle_size items,
// each stored as "<key>-value-transform" by the mocked runtime
const item = {
key: (n + parseInt(genesis_pool.data.max_bundle_size)).toString(),
value: `${
n + parseInt(genesis_pool.data.max_bundle_size)
}-value-transform`,
};
expect(cacheProvider.put).toHaveBeenNthCalledWith(
n + 1,
(n + parseInt(genesis_pool.data.max_bundle_size)).toString(),
item
);
}

expect(cacheProvider.get).toHaveBeenCalledTimes(0);

// existence is checked for every already-indexed item plus the 50 + 3
// newly collected ones
expect(cacheProvider.exists).toHaveBeenCalledTimes(
parseInt(genesis_pool.data.max_bundle_size) + 50 + 3
);

for (let n = 0; n < parseInt(genesis_pool.data.max_bundle_size); n++) {
expect(cacheProvider.exists).toHaveBeenNthCalledWith(n + 1, n.toString());
}

for (
let n = parseInt(genesis_pool.data.max_bundle_size);
n < parseInt(genesis_pool.data.max_bundle_size) + 50 + 3;
n++
) {
expect(cacheProvider.exists).toHaveBeenNthCalledWith(n + 1, n.toString());
}

expect(cacheProvider.del).toHaveBeenCalledTimes(0);

expect(cacheProvider.drop).toHaveBeenCalledTimes(0);

// =============================
// ASSERT COMPRESSION INTERFACES
// =============================

expect(compression.compress).toHaveBeenCalledTimes(0);

expect(compression.decompress).toHaveBeenCalledTimes(0);

// =========================
// ASSERT RUNTIME INTERFACES
// =========================

// one getDataItem per collected key (50 proposal items + 3 up to end key)
expect(runtime.getDataItem).toHaveBeenCalledTimes(50 + 3);

for (let n = 0; n < 50 + 3; n++) {
expect(runtime.getDataItem).toHaveBeenNthCalledWith(
n + 1,
v,
(n + parseInt(genesis_pool.data.max_bundle_size)).toString()
);
}

expect(runtime.transformDataItem).toHaveBeenCalledTimes(50 + 3);

for (let n = 0; n < 50 + 3; n++) {
const item = {
key: (n + parseInt(genesis_pool.data.max_bundle_size)).toString(),
value: `${n + parseInt(genesis_pool.data.max_bundle_size)}-value`,
};
expect(runtime.transformDataItem).toHaveBeenNthCalledWith(
n + 1,
expect.any(Validator),
item
);
}

expect(runtime.validateDataItem).toHaveBeenCalledTimes(0);

expect(runtime.nextKey).toHaveBeenCalledTimes(50 + 3);

// nextKey is called with the previous key (hence the "- 1" below),
// because iteration starts from the pool's current key
for (let n = 0; n < 50 + 3; n++) {
expect(runtime.nextKey).toHaveBeenNthCalledWith(
n + 1,
expect.any(Validator),
(n + parseInt(genesis_pool.data.max_bundle_size) - 1).toString()
);
}

expect(runtime.summarizeDataBundle).toHaveBeenCalledTimes(0);

// ========================
// ASSERT NODEJS INTERFACES
// ========================

// assert that only one round ran
expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1);
});
});
Loading

0 comments on commit 4858ef0

Please sign in to comment.