From 4858ef03f578cff0dbff577895d0df0787b996e9 Mon Sep 17 00:00:00 2001 From: Troy Kessler Date: Wed, 12 Jun 2024 12:18:51 +0200 Subject: [PATCH] chore: properly implemented end key on protocol node --- common/protocol/src/index.ts | 2 - .../src/methods/checks/isEndkeyReached.ts | 30 -- common/protocol/src/methods/index.ts | 1 - common/protocol/src/methods/main/runCache.ts | 14 +- common/protocol/src/methods/main/runNode.ts | 6 + .../methods/upload/createBundleProposal.ts | 7 - .../validate/validateBundleProposal.ts | 13 + common/protocol/test/cache.test.ts | 188 +++++++++-- .../test/checks/is_endkey_reached.test.ts | 93 ------ common/protocol/test/vote_invalid.test.ts | 141 ++++++++ common/protocol/test/vote_valid.test.ts | 310 ++++++++++++++++++ 11 files changed, 647 insertions(+), 158 deletions(-) delete mode 100644 common/protocol/src/methods/checks/isEndkeyReached.ts delete mode 100644 common/protocol/test/checks/is_endkey_reached.test.ts diff --git a/common/protocol/src/index.ts b/common/protocol/src/index.ts index c544563b..4ede8f6e 100644 --- a/common/protocol/src/index.ts +++ b/common/protocol/src/index.ts @@ -39,7 +39,6 @@ import { isStorageBalanceZero, isValidVersion, isDataAvailable, - isEndkeyReached, voteBundleProposal, waitForAuthorization, waitForCacheContinuation, @@ -123,7 +122,6 @@ export class Validator { protected isStorageBalanceZero = isStorageBalanceZero; protected isStorageBalanceLow = isStorageBalanceLow; protected isDataAvailable = isDataAvailable; - protected isEndkeyReached = isEndkeyReached; // timeouts protected waitForAuthorization = waitForAuthorization; diff --git a/common/protocol/src/methods/checks/isEndkeyReached.ts b/common/protocol/src/methods/checks/isEndkeyReached.ts deleted file mode 100644 index 70b281d0..00000000 --- a/common/protocol/src/methods/checks/isEndkeyReached.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { standardizeError, Validator } from "../.."; - -/** - isEndkeyReached checks if the end key is reached. - */ -export async function isEndkeyReached( - this: Validator, - toKey: string -): Promise { - try { - // No further checks are needed if the current_key is empty - if (!this.pool.data!.current_key) { - return false; - } - - // Check if the current_key or to_key equals the end_key - if ( - this.pool.data!.current_key === this.pool.data!.end_key || - toKey === this.pool.data!.end_key - ) { - this.logger.info(`End key reached.`); - return true; - } - return false; - } catch (err) { - this.logger.debug(standardizeError(err)); - - return false; - } -} diff --git a/common/protocol/src/methods/index.ts b/common/protocol/src/methods/index.ts index 41a8fbd5..f051c352 100644 --- a/common/protocol/src/methods/index.ts +++ b/common/protocol/src/methods/index.ts @@ -13,7 +13,6 @@ export * from "./checks/isValidVersion"; export * from "./checks/isStorageBalanceLow"; export * from "./checks/isStorageBalanceZero"; export * from "./checks/isDataAvailable"; -export * from "./checks/isEndkeyReached"; // timeouts export * from "./timeouts/waitForAuthorization"; diff --git a/common/protocol/src/methods/main/runCache.ts b/common/protocol/src/methods/main/runCache.ts index e73e262c..e3fed0e4 100644 --- a/common/protocol/src/methods/main/runCache.ts +++ b/common/protocol/src/methods/main/runCache.ts @@ -119,6 +119,15 @@ export async function runCache(this: Validator): Promise { this.m.cache_index_tail.set(Math.max(0, currentIndex - 1)); for (let i = currentIndex; i < targetIndex; i++) { + // if the end key is not empty and we have reached the end key of the pool + // we do not sync past this key + if (this.pool.data?.end_key && this.pool.data.end_key === key) { + this.logger.info( + `Reached pool end key "${key}", the node will not continue collecting data past this key.` + ); + break; + } + // check if data item was already collected. If it was // already collected we don't need to retrieve it again this.logger.debug(`this.cacheProvider.exists(${i.toString()})`); @@ -223,11 +232,6 @@ export async function runCache(this: Validator): Promise { key = nextKey; } - // indicate that current caching round is done - this.logger.debug( - `Finished caching from index ${currentIndex} to ${targetIndex}. Waiting for next round ...` - ); - // wait until a new bundle proposal is available. We don't need // to sync the pool here because the pool state already gets // synced in the other main function "runNode" so we only listen diff --git a/common/protocol/src/methods/main/runNode.ts b/common/protocol/src/methods/main/runNode.ts index 3b85718a..d9b8560b 100644 --- a/common/protocol/src/methods/main/runNode.ts +++ b/common/protocol/src/methods/main/runNode.ts @@ -1,3 +1,4 @@ +import { PoolStatus } from "@kyvejs/types/lcd/kyve/pool/v1beta1/pool"; import { Validator } from "../.."; import { IDLE_TIME, sleep } from "../../utils"; @@ -33,6 +34,11 @@ export async function runNode(this: Validator): Promise { process.exit(1); } + if (this.pool.status === PoolStatus.POOL_STATUS_END_KEY_REACHED) { + this.logger.info(`Reached pool end key. Shutting down node ...`); + process.exit(0); + } + // log warnings if storage provider balance is low await this.isStorageBalanceLow(); diff --git a/common/protocol/src/methods/upload/createBundleProposal.ts b/common/protocol/src/methods/upload/createBundleProposal.ts index bdc62a4a..b6747ede 100644 --- a/common/protocol/src/methods/upload/createBundleProposal.ts +++ b/common/protocol/src/methods/upload/createBundleProposal.ts @@ -90,13 +90,6 @@ export async function createBundleProposal(this: Validator): Promise { // as to_key const toKey = bundleProposal.at(-1)?.key ?? ""; - if (await this.isEndkeyReached(toKey)) { - this.logger.info(`End key reached. Skipping Uploader Role ...`); - - await this.skipUploaderRole(fromIndex); - return; - } - // get the last value of the bundle proposal and format // it so it can be included in the bundle proposal and // saved on chain diff --git a/common/protocol/src/methods/validate/validateBundleProposal.ts b/common/protocol/src/methods/validate/validateBundleProposal.ts index 09495d7c..c2710b0d 100644 --- a/common/protocol/src/methods/validate/validateBundleProposal.ts +++ b/common/protocol/src/methods/validate/validateBundleProposal.ts @@ -234,6 +234,19 @@ export async function validateBundleProposal( // validate each data item in bundle with custom runtime validation for (let i = 0; i < proposedBundle.length; i++) { if (valid) { + // if the pool has an end key and we find out that a data item + // has the end key and it is not the last data item in the bundle + // we consider the bundle invalid + if (this.pool.data?.end_key) { + if ( + i < proposedBundle.length - 1 && + proposedBundle[i].key === this.pool.data?.end_key + ) { + valid = false; + break; + } + } + this.logger.debug( `this.runtime.validateDataItem($THIS, $PROPOSED_DATA_ITEM, $VALIDATION_DATA_ITEM)` ); diff --git a/common/protocol/test/cache.test.ts b/common/protocol/test/cache.test.ts index 3420a3eb..788dedb4 100644 --- a/common/protocol/test/cache.test.ts +++ b/common/protocol/test/cache.test.ts @@ -29,9 +29,7 @@ TEST CASES - cache tests * start caching from a pool where transformDataItem fails * start caching from a pool where nextKey fails once * start caching from a pool where cache methods fail -* TODO: test with pool config that has no source object -* TODO: test with pool config that has zero sources -* TODO: start caching from a pool where node has not cached anything yet +* start caching from a pool which has an endKey */ @@ -249,8 +247,6 @@ describe("cache tests", () => { // assert that only one round ran expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1); - - // TODO: assert timeouts }); test("start caching from a pool which has a bundle proposal ongoing", async () => { @@ -423,8 +419,6 @@ describe("cache tests", () => { // assert that only one round ran expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1); - - // TODO: assert timeouts }); test("continue caching from a pool which has a bundle proposal ongoing", async () => { @@ -602,8 +596,6 @@ describe("cache tests", () => { // assert that only one round ran expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1); - - // TODO: assert timeouts }); test("start caching from a pool where last bundle proposal was dropped", async () => { @@ -767,8 +759,6 @@ describe("cache tests", () => { // assert that only one round ran expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1); - - // TODO: assert timeouts }); test("start caching from a pool where getNextDataItem fails once", async () => { @@ -931,8 +921,6 @@ describe("cache tests", () => { // assert that only one round ran expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1); - - // TODO: assert timeouts }); test("start caching from a pool where getNextDataItem fails multiple times", async () => { @@ -1175,8 +1163,6 @@ describe("cache tests", () => { // assert that only one round ran expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1); - - // TODO: assert timeouts }); test("start caching from a pool where transformDataItem fails", async () => { @@ -1407,8 +1393,6 @@ describe("cache tests", () => { // assert that only one round ran expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1); - - // TODO: assert timeouts }); test.skip("start caching from a pool where nextKey fails once", async () => { @@ -1527,8 +1511,6 @@ describe("cache tests", () => { // assert that only one round ran expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(0); - - // TODO: assert timeouts }); test("start caching from a pool where cache methods fail", async () => { @@ -1665,7 +1647,173 @@ describe("cache tests", () => { // assert that only one round ran expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(0); + }); + + test("start caching from a pool which has an endKey", async () => { + // ARRANGE + v.pool = { + ...genesis_pool, + data: { + ...genesis_pool.data, + current_key: "99", + current_index: "100", + end_key: "152", + }, + bundle_proposal: { + ...genesis_pool.bundle_proposal, + storage_id: "test_storage_id", + uploader: "test_staker", + next_uploader: "test_staker", + data_size: "123456789", + data_hash: "test_bundle_hash", + bundle_size: "50", + from_key: "100", + to_key: "149", + bundle_summary: "test_summary", + updated_at: "0", + voters_valid: ["test_staker"], + }, + } as any; + + // ACT + await runCache.call(v); + + // ASSERT + const txs = v["client"][0].kyve.bundles.v1beta1; + const queries = v["lcd"][0].kyve.query.v1beta1; + const cacheProvider = v["cacheProvider"]; + const runtime = v["runtime"]; + + // ======================== + // ASSERT CLIENT INTERFACES + // ======================== + + expect(txs.claimUploaderRole).toHaveBeenCalledTimes(0); + + expect(txs.voteBundleProposal).toHaveBeenCalledTimes(0); + + expect(txs.submitBundleProposal).toHaveBeenCalledTimes(0); + + expect(txs.skipUploaderRole).toHaveBeenCalledTimes(0); + + // ===================== + // ASSERT LCD INTERFACES + // ===================== + + expect(queries.canVote).toHaveBeenCalledTimes(0); + + expect(queries.canPropose).toHaveBeenCalledTimes(0); + + // ========================= + // ASSERT STORAGE INTERFACES + // ========================= + + expect(storageProvider.saveBundle).toHaveBeenCalledTimes(0); + + expect(storageProvider.retrieveBundle).toHaveBeenCalledTimes(0); + + // ======================= + // ASSERT CACHE INTERFACES + // ======================= + + // we use 50 + 3 here because the current bundle is 50 items big + // and because of the end key we only index the next 3 items and stop + // afterwards + expect(cacheProvider.put).toHaveBeenCalledTimes(50 + 3); + + for (let n = 0; n < 50 + 3; n++) { + const item = { + key: (n + parseInt(genesis_pool.data.max_bundle_size)).toString(), + value: `${ + n + parseInt(genesis_pool.data.max_bundle_size) + }-value-transform`, + }; + expect(cacheProvider.put).toHaveBeenNthCalledWith( + n + 1, + (n + parseInt(genesis_pool.data.max_bundle_size)).toString(), + item + ); + } + + expect(cacheProvider.get).toHaveBeenCalledTimes(0); + + expect(cacheProvider.exists).toHaveBeenCalledTimes( + parseInt(genesis_pool.data.max_bundle_size) + 50 + 3 + ); + + for (let n = 0; n < parseInt(genesis_pool.data.max_bundle_size); n++) { + expect(cacheProvider.exists).toHaveBeenNthCalledWith(n + 1, n.toString()); + } + + for ( + let n = parseInt(genesis_pool.data.max_bundle_size); + n < parseInt(genesis_pool.data.max_bundle_size) + 50 + 3; + n++ + ) { + expect(cacheProvider.exists).toHaveBeenNthCalledWith(n + 1, n.toString()); + } + + expect(cacheProvider.del).toHaveBeenCalledTimes(0); + + expect(cacheProvider.drop).toHaveBeenCalledTimes(0); + + // ============================= + // ASSERT COMPRESSION INTERFACES + // ============================= + + expect(compression.compress).toHaveBeenCalledTimes(0); + + expect(compression.decompress).toHaveBeenCalledTimes(0); + + // ========================= + // ASSERT RUNTIME INTERFACES + // ========================= - // TODO: assert timeouts + expect(runtime.getDataItem).toHaveBeenCalledTimes(50 + 3); + + for (let n = 0; n < 50 + 3; n++) { + expect(runtime.getDataItem).toHaveBeenNthCalledWith( + n + 1, + v, + (n + parseInt(genesis_pool.data.max_bundle_size)).toString() + ); + } + + expect(runtime.transformDataItem).toHaveBeenCalledTimes(50 + 3); + + for (let n = 0; n < 50 + 3; n++) { + const item = { + key: (n + parseInt(genesis_pool.data.max_bundle_size)).toString(), + value: `${n + parseInt(genesis_pool.data.max_bundle_size)}-value`, + }; + expect(runtime.transformDataItem).toHaveBeenNthCalledWith( + n + 1, + expect.any(Validator), + item + ); + } + + expect(runtime.validateDataItem).toHaveBeenCalledTimes(0); + + expect(runtime.nextKey).toHaveBeenCalledTimes(50 + 3); + + // here we subtract the key - 1 because we start using the + // current key + for (let n = 0; n < 50 + 3; n++) { + expect(runtime.nextKey).toHaveBeenNthCalledWith( + n + 1, + expect.any(Validator), + (n + parseInt(genesis_pool.data.max_bundle_size) - 1).toString() + ); + } + + expect(runtime.summarizeDataBundle).toHaveBeenCalledTimes(0); + + // ======================== + // ASSERT NODEJS INTERFACES + // ======================== + + // assert that only one round ran + expect(v["waitForCacheContinuation"]).toHaveBeenCalledTimes(1); }); }); diff --git a/common/protocol/test/checks/is_endkey_reached.test.ts b/common/protocol/test/checks/is_endkey_reached.test.ts deleted file mode 100644 index 5385129e..00000000 --- a/common/protocol/test/checks/is_endkey_reached.test.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { register } from "prom-client"; -import { Logger } from "tslog"; - -import { Validator } from "../../src"; -import { isEndkeyReached, setupMetrics } from "../../src/methods"; -import { genesis_pool } from "../mocks/constants"; -import { TestRuntime } from "../mocks/runtime.mock"; - -/* - -TEST CASES - isEndkeyReached - -* assert isEndkeyReached with empty current_key -* assert isEndkeyReached with end_key == current_key -* assert isEndkeyReached with toKey == end_key -* assert isEndkeyReached is not reached -*/ - -describe("isEndkeyReached", () => { - let v: Validator; - - beforeEach(() => { - v = new Validator(new TestRuntime()); - - // mock logger - v.logger = new Logger(); - - v.logger.info = jest.fn(); - v.logger.debug = jest.fn(); - v.logger.warn = jest.fn(); - v.logger.error = jest.fn(); - v.logger.fatal = jest.fn(); - - v.pool = { - ...genesis_pool, - } as any; - - setupMetrics.call(v); - }); - - afterEach(() => { - // reset prometheus - register.clear(); - }); - - test("assert isEndkeyReached with empty current_key", async () => { - // ARRANGE - v.pool.data!.current_key = ""; - v.pool.data!.end_key = "0"; - - // ACT - const result = await isEndkeyReached.call(v, ""); - - // ASSERT - expect(result).toBeFalsy(); - }); - - test("assert isEndkeyReached with end_key == current_key", async () => { - // ARRANGE - v.pool.data!.current_key = "0"; - v.pool.data!.end_key = "0"; - - // ACT - const result = await isEndkeyReached.call(v, ""); - - // ASSERT - expect(result).toBeTruthy(); - }); - - test("assert isEndkeyReached with toKey == end_key", async () => { - // ARRANGE - v.pool.data!.current_key = "0"; - v.pool.data!.end_key = "1"; - - // ACT - const result = await isEndkeyReached.call(v, "1"); - - // ASSERT - expect(result).toBeTruthy(); - }); - - test("assert isEndkeyReached is not reached", async () => { - // ARRANGE - v.pool.data!.current_key = "0"; - v.pool.data!.end_key = "1"; - - // ACT - const result = await isEndkeyReached.call(v, "0"); - - // ASSERT - expect(result).toBeFalsy(); - }); -}); diff --git a/common/protocol/test/vote_invalid.test.ts b/common/protocol/test/vote_invalid.test.ts index 134230e2..077afb8b 100644 --- a/common/protocol/test/vote_invalid.test.ts +++ b/common/protocol/test/vote_invalid.test.ts @@ -36,6 +36,7 @@ TEST CASES - invalid votes tests * vote invalid but bundle from storage provider could not be loaded in the first try * try to vote invalid where voteBundleProposal fails * vote invalid because bundle is empty +* vote invalid because there were items after the end key data item */ @@ -2043,4 +2044,144 @@ describe("invalid votes tests", () => { // assert that only one round ran expect(v["waitForNextBundleProposal"]).toHaveBeenCalledTimes(1); }); + + test("vote invalid because there were items after the end key data item ", async () => { + const bundle = [ + { key: "test_key_1", value: "test_value_1" }, + { key: "test_key_2", value: "test_value_2" }, + ]; + + const bundleBytes = bundleToBytes(bundle); + const compressedBundle = bundleBytes; // no compression + const dataSize = compressedBundle.byteLength.toString(); + const dataHash = sha256(bundleBytes); + + v["syncPoolState"] = jest.fn().mockImplementation(() => { + v.pool = { + ...genesis_pool, + data: { + ...genesis_pool.data, + end_key: "test_key_1", + }, + bundle_proposal: { + ...genesis_pool.bundle_proposal, + storage_id: "another_test_storage_id", + uploader: "another_test_staker", + next_uploader: "another_test_staker", + data_size: dataSize, + data_hash: dataHash, + bundle_size: "2", + from_key: "test_key_1", + to_key: "test_key_2", + bundle_summary: JSON.stringify(bundle), + updated_at: "0", + voters_valid: ["another_test_staker"], + }, + } as any; + }); + + v["cacheProvider"].get = jest + .fn() + .mockResolvedValueOnce({ + key: "test_key_1", + value: "test_value_1", + }) + .mockResolvedValueOnce({ + key: "test_key_2", + value: "test_value_2", + }); + + // ACT + await runNode.call(v); + + // ASSERT + const txs = v["client"][0].kyve.bundles.v1beta1; + const queries = v["lcd"][0].kyve.query.v1beta1; + const cacheProvider = v["cacheProvider"]; + const runtime = v["runtime"]; + + // ======================== + // ASSERT CLIENT INTERFACES + // ======================== + + expect(txs.claimUploaderRole).toHaveBeenCalledTimes(0); + + expect(txs.voteBundleProposal).toHaveBeenCalledTimes(1); + expect(txs.voteBundleProposal).toHaveBeenLastCalledWith( + { + staker: "test_staker", + pool_id: "0", + storage_id: "another_test_storage_id", + vote: VoteType.VOTE_TYPE_INVALID, + }, + { + fee: 1.6, + } + ); + + expect(txs.submitBundleProposal).toHaveBeenCalledTimes(0); + + expect(txs.skipUploaderRole).toHaveBeenCalledTimes(0); + + // ===================== + // ASSERT LCD INTERFACES + // ===================== + + expect(queries.canVote).toHaveBeenCalledTimes(1); + expect(queries.canVote).toHaveBeenLastCalledWith({ + staker: "test_staker", + pool_id: "0", + voter: "test_valaddress", + storage_id: "another_test_storage_id", + }); + + expect(queries.canPropose).toHaveBeenCalledTimes(0); + + // ========================= + // ASSERT STORAGE INTERFACES + // ========================= + + expect(storageProvider.saveBundle).toHaveBeenCalledTimes(0); + + expect(storageProvider.retrieveBundle).toHaveBeenCalledTimes(1); + expect(storageProvider.retrieveBundle).toHaveBeenLastCalledWith( + "another_test_storage_id", + (120 - 20) * 1000 + ); + + // ======================= + // ASSERT CACHE INTERFACES + // ======================= + + expect(cacheProvider.get).toHaveBeenCalledTimes(2); + expect(cacheProvider.get).toHaveBeenNthCalledWith(1, "0"); + expect(cacheProvider.get).toHaveBeenNthCalledWith(2, "1"); + + // ============================= + // ASSERT COMPRESSION INTERFACES + // ============================= + + expect(compression.compress).toHaveBeenCalledTimes(0); + + expect(compression.decompress).toHaveBeenCalledTimes(1); + expect(compression.decompress).toHaveBeenLastCalledWith(compressedBundle); + + // ============================= + // ASSERT INTEGRATION INTERFACES + // ============================= + + expect(runtime.summarizeDataBundle).toHaveBeenCalledTimes(0); + + expect(runtime.validateDataItem).toHaveBeenCalledTimes(0); + + // ======================== + // ASSERT NODEJS INTERFACES + // ======================== + + // assert that debug bundle was saved + expect(v["archiveDebugBundle"]).toHaveBeenCalledTimes(1); + + // assert that only one round ran + expect(v["waitForNextBundleProposal"]).toHaveBeenCalledTimes(1); + }); }); diff --git a/common/protocol/test/vote_valid.test.ts b/common/protocol/test/vote_valid.test.ts index 0915cb0d..d740a636 100644 --- a/common/protocol/test/vote_valid.test.ts +++ b/common/protocol/test/vote_valid.test.ts @@ -30,6 +30,8 @@ TEST CASES - vote valid tests * try to vote valid after validator has voted invalid before * try to vote valid after validator has voted valid before * try to vote valid where voteBundleProposal fails +* vote valid with the bundle being before the end key +* vote valid with the last data item containing the end key */ @@ -1207,4 +1209,312 @@ describe("vote valid tests", () => { // assert that only one round ran expect(v["waitForNextBundleProposal"]).toHaveBeenCalledTimes(1); }); + + test("vote valid with the bundle being before the end key", async () => { + // ARRANGE + const bundle = [ + { key: "test_key_1", value: "test_value_1" }, + { key: "test_key_2", value: "test_value_2" }, + ]; + + const bundleBytes = bundleToBytes(bundle); + const compressedBundle = bundleBytes; // no compression + const dataSize = compressedBundle.byteLength.toString(); + const dataHash = sha256(bundleBytes); + + v["syncPoolState"] = jest.fn().mockImplementation(() => { + v.pool = { + ...genesis_pool, + data: { + ...genesis_pool.data, + end_key: "test_key_6", + }, + bundle_proposal: { + ...genesis_pool.bundle_proposal, + storage_id: "another_test_storage_id", + uploader: "another_test_staker", + next_uploader: "another_test_staker", + data_size: dataSize, + data_hash: dataHash, + bundle_size: "2", + from_key: "test_key_1", + to_key: "test_key_2", + bundle_summary: JSON.stringify(bundle), + updated_at: "0", + voters_valid: ["another_test_staker"], + }, + } as any; + }); + + v["cacheProvider"].get = jest + .fn() + .mockResolvedValueOnce({ + key: "test_key_1", + value: "test_value_1", + }) + .mockResolvedValueOnce({ + key: "test_key_2", + value: "test_value_2", + }); + + // ACT + await runNode.call(v); + + // ASSERT + const txs = v["client"][0].kyve.bundles.v1beta1; + const queries = v["lcd"][0].kyve.query.v1beta1; + const cacheProvider = v["cacheProvider"]; + const runtime = v["runtime"]; + + // ======================== + // ASSERT CLIENT INTERFACES + // ======================== + + expect(txs.claimUploaderRole).toHaveBeenCalledTimes(0); + + expect(txs.voteBundleProposal).toHaveBeenCalledTimes(1); + expect(txs.voteBundleProposal).toHaveBeenLastCalledWith( + { + staker: "test_staker", + pool_id: "0", + storage_id: "another_test_storage_id", + vote: VoteType.VOTE_TYPE_VALID, + }, + { + fee: 1.6, + } + ); + + expect(txs.submitBundleProposal).toHaveBeenCalledTimes(0); + + expect(txs.skipUploaderRole).toHaveBeenCalledTimes(0); + + // ===================== + // ASSERT LCD INTERFACES + // ===================== + + expect(queries.canVote).toHaveBeenCalledTimes(1); + expect(queries.canVote).toHaveBeenLastCalledWith({ + staker: "test_staker", + pool_id: "0", + voter: "test_valaddress", + storage_id: "another_test_storage_id", + }); + + expect(queries.canPropose).toHaveBeenCalledTimes(0); + + // ========================= + // ASSERT STORAGE INTERFACES + // ========================= + + expect(storageProvider.saveBundle).toHaveBeenCalledTimes(0); + + expect(storageProvider.retrieveBundle).toHaveBeenCalledTimes(1); + expect(storageProvider.retrieveBundle).toHaveBeenLastCalledWith( + "another_test_storage_id", + (120 - 20) * 1000 + ); + + // ======================= + // ASSERT CACHE INTERFACES + // ======================= + + expect(cacheProvider.get).toHaveBeenCalledTimes(2); + expect(cacheProvider.get).toHaveBeenNthCalledWith(1, "0"); + expect(cacheProvider.get).toHaveBeenNthCalledWith(2, "1"); + + // ============================= + // ASSERT COMPRESSION INTERFACES + // ============================= + + expect(compression.compress).toHaveBeenCalledTimes(0); + + expect(compression.decompress).toHaveBeenCalledTimes(1); + expect(compression.decompress).toHaveBeenLastCalledWith(compressedBundle); + + // ============================= + // ASSERT INTEGRATION INTERFACES + // ============================= + + expect(runtime.summarizeDataBundle).toHaveBeenCalledTimes(1); + expect(runtime.summarizeDataBundle).toHaveBeenLastCalledWith( + expect.any(Validator), + bundle + ); + + expect(runtime.validateDataItem).toHaveBeenCalledTimes(bundle.length); + + for (let i = 0; i < bundle.length; i++) { + expect(runtime.validateDataItem).toHaveBeenNthCalledWith( + i + 1, + expect.any(Validator), + standardizeJSON(bundle[i]), + standardizeJSON(bundle[i]) + ); + } + + // ======================== + // ASSERT NODEJS INTERFACES + // ======================== + + // assert that debug bundle was saved + expect(v["archiveDebugBundle"]).toHaveBeenCalledTimes(0); + + // assert that only one round ran + expect(v["waitForNextBundleProposal"]).toHaveBeenCalledTimes(1); + }); + + test("vote valid with the last data item containing the end key", async () => { + // ARRANGE + const bundle = [ + { key: "test_key_1", value: "test_value_1" }, + { key: "test_key_2", value: "test_value_2" }, + ]; + + const bundleBytes = bundleToBytes(bundle); + const compressedBundle = bundleBytes; // no compression + const dataSize = compressedBundle.byteLength.toString(); + const dataHash = sha256(bundleBytes); + + v["syncPoolState"] = jest.fn().mockImplementation(() => { + v.pool = { + ...genesis_pool, + data: { + ...genesis_pool.data, + end_key: "test_key_2", + }, + bundle_proposal: { + ...genesis_pool.bundle_proposal, + storage_id: "another_test_storage_id", + uploader: "another_test_staker", + next_uploader: "another_test_staker", + data_size: dataSize, + data_hash: dataHash, + bundle_size: "2", + from_key: "test_key_1", + to_key: "test_key_2", + bundle_summary: JSON.stringify(bundle), + updated_at: "0", + voters_valid: ["another_test_staker"], + }, + } as any; + }); + + v["cacheProvider"].get = jest + .fn() + .mockResolvedValueOnce({ + key: "test_key_1", + value: "test_value_1", + }) + .mockResolvedValueOnce({ + key: "test_key_2", + value: "test_value_2", + }); + + // ACT + await runNode.call(v); + + // ASSERT + const txs = v["client"][0].kyve.bundles.v1beta1; + const queries = v["lcd"][0].kyve.query.v1beta1; + const cacheProvider = v["cacheProvider"]; + const runtime = v["runtime"]; + + // ======================== + // ASSERT CLIENT INTERFACES + // ======================== + + expect(txs.claimUploaderRole).toHaveBeenCalledTimes(0); + + expect(txs.voteBundleProposal).toHaveBeenCalledTimes(1); + expect(txs.voteBundleProposal).toHaveBeenLastCalledWith( + { + staker: "test_staker", + pool_id: "0", + storage_id: "another_test_storage_id", + vote: VoteType.VOTE_TYPE_VALID, + }, + { + fee: 1.6, + } + ); + + expect(txs.submitBundleProposal).toHaveBeenCalledTimes(0); + + expect(txs.skipUploaderRole).toHaveBeenCalledTimes(0); + + // ===================== + // ASSERT LCD INTERFACES + // ===================== + + expect(queries.canVote).toHaveBeenCalledTimes(1); + expect(queries.canVote).toHaveBeenLastCalledWith({ + staker: "test_staker", + pool_id: "0", + voter: "test_valaddress", + storage_id: "another_test_storage_id", + }); + + expect(queries.canPropose).toHaveBeenCalledTimes(0); + + // ========================= + // ASSERT STORAGE INTERFACES + // ========================= + + expect(storageProvider.saveBundle).toHaveBeenCalledTimes(0); + + expect(storageProvider.retrieveBundle).toHaveBeenCalledTimes(1); + expect(storageProvider.retrieveBundle).toHaveBeenLastCalledWith( + "another_test_storage_id", + (120 - 20) * 1000 + ); + + // ======================= + // ASSERT CACHE INTERFACES + // ======================= + + expect(cacheProvider.get).toHaveBeenCalledTimes(2); + expect(cacheProvider.get).toHaveBeenNthCalledWith(1, "0"); + expect(cacheProvider.get).toHaveBeenNthCalledWith(2, "1"); + + // ============================= + // ASSERT COMPRESSION INTERFACES + // ============================= + + expect(compression.compress).toHaveBeenCalledTimes(0); + + expect(compression.decompress).toHaveBeenCalledTimes(1); + expect(compression.decompress).toHaveBeenLastCalledWith(compressedBundle); + + // ============================= + // ASSERT INTEGRATION INTERFACES + // ============================= + + expect(runtime.summarizeDataBundle).toHaveBeenCalledTimes(1); + expect(runtime.summarizeDataBundle).toHaveBeenLastCalledWith( + expect.any(Validator), + bundle + ); + + expect(runtime.validateDataItem).toHaveBeenCalledTimes(bundle.length); + + for (let i = 0; i < bundle.length; i++) { + expect(runtime.validateDataItem).toHaveBeenNthCalledWith( + i + 1, + expect.any(Validator), + standardizeJSON(bundle[i]), + standardizeJSON(bundle[i]) + ); + } + + // ======================== + // ASSERT NODEJS INTERFACES + // ======================== + + // assert that debug bundle was saved + expect(v["archiveDebugBundle"]).toHaveBeenCalledTimes(0); + + // assert that only one round ran + expect(v["waitForNextBundleProposal"]).toHaveBeenCalledTimes(1); + }); });