Skip to content

Commit

Permalink
feat: add l1_sent two direction sync
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisloh95 committed Jun 14, 2023
1 parent 9f82885 commit 4ad3dd4
Showing 1 changed file with 166 additions and 39 deletions.
205 changes: 166 additions & 39 deletions data-sync-service/src/schedule/tasks.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { L2IngestionService } from '../l2Ingestion/l2Ingestion.service';
import { ConfigService } from '@nestjs/config';

const L1_SENT = 'l1_sent_block_number';
const L1_SENT_BACKTRACK = 'l1_sent_backtrack_block_number';
const L1_SENT_BACKTRACK_START = 'l1_sent_backtrack_start_block_number';
const L1_RELAYED = 'l1_relayed_block_number';
const L2_SENT = 'l2_sent_block_number';
const L2_RELAYED = 'l2_relayed_block_number';
Expand All @@ -22,25 +24,37 @@ export class TasksService {
private readonly l1IngestionService: L1IngestionService,
private readonly l2IngestionService: L2IngestionService,
@Inject(CACHE_MANAGER) private cacheManager: Cache,
private schedulerRegistry: SchedulerRegistry
private schedulerRegistry: SchedulerRegistry,
) {
this.initCache();
const fixedL2ToL1TokenAddress0x000Bug = setInterval(async () => {
try {
const result = await this.l2IngestionService.fixedL2ToL1TokenAddress0x000Bug();
const result =
await this.l2IngestionService.fixedL2ToL1TokenAddress0x000Bug();
if (result.length <= 0) {
console.log('deleteInterval fixedL2ToL1TokenAddress0x000Bug');
this.schedulerRegistry.deleteInterval('fixedL2ToL1TokenAddress0x000Bug');
this.schedulerRegistry.deleteInterval(
'fixedL2ToL1TokenAddress0x000Bug',
);
}
} catch (error) {
this.logger.error(`error fixedL2ToL1TokenAddress0x000Bug: ${error}`);
}
}, 1000);
this.schedulerRegistry.addInterval('fixedL2ToL1TokenAddress0x000Bug', fixedL2ToL1TokenAddress0x000Bug);
this.schedulerRegistry.addInterval(
'fixedL2ToL1TokenAddress0x000Bug',
fixedL2ToL1TokenAddress0x000Bug,
);
}
private readonly logger = new Logger(TasksService.name);
async initCache() {
let l1_sent_block_number = await this.cacheManager.get(L1_SENT);
let l1_sent_backtrack_block_number = await this.cacheManager.get(
L1_SENT_BACKTRACK,
);
let l1_sent_backtrack_start_block_number = await this.cacheManager.get(
L1_SENT_BACKTRACK_START,
);
let l1_relayed_block_number = await this.cacheManager.get(L1_RELAYED);
let l2_sent_block_number = await this.cacheManager.get(L2_SENT);
let l2_relayed_block_number = await this.cacheManager.get(L2_RELAYED);
Expand All @@ -52,11 +66,18 @@ export class TasksService {
(await this.l1IngestionService.getSentEventsBlockNumber()) ||
this.configService.get('L1_START_BLOCK_NUMBER');
}
if (!l1_sent_backtrack_block_number) {
l1_sent_backtrack_block_number =
(await this.l1IngestionService.getCurrentBlockNumber()) ||
this.configService.get('L1_START_BACKTRACK_BLOCK_NUMBER');
l1_sent_backtrack_start_block_number = l1_sent_backtrack_block_number;
}
if (!l1_relayed_block_number) {
l1_relayed_block_number =
(await this.l1IngestionService.getRelayedEventsBlockNumber()) ||
this.configService.get('L1_START_BLOCK_NUMBER');
}

if (!l2_sent_block_number) {
l2_sent_block_number =
(await this.l2IngestionService.getSentEventsBlockNumber()) ||
Expand All @@ -79,12 +100,25 @@ export class TasksService {
}
if (!da_batch_index) {
da_batch_index =
(await this.l1IngestionService.getLastBatchIndex() + 1) ||
1;
(await this.l1IngestionService.getLastBatchIndex()) + 1 || 1;
}
await this.cacheManager.set(L1_SENT, Number(l1_sent_block_number), {
ttl: 0,
});
await this.cacheManager.set(
L1_SENT_BACKTRACK,
Number(l1_sent_backtrack_block_number),
{
ttl: 0,
},
);
await this.cacheManager.set(
L1_SENT_BACKTRACK_START,
Number(l1_sent_backtrack_start_block_number),
{
ttl: 0,
},
);
await this.cacheManager.set(L1_RELAYED, Number(l1_relayed_block_number), {
ttl: 0,
});
Expand All @@ -105,13 +139,20 @@ export class TasksService {
});
console.log('================end init cache================');
// TODO (Jayce) state batch missed data sync script
this.miss_data_script_start(9006135)
this.miss_data_script_start(9006135);
}
@Interval(2000)
async l1_sent() {
let end = 0;
let reach_backtrack = false;
const currentBlockNumber =
await this.l1IngestionService.getCurrentBlockNumber();
const backtrackBlockNumber = Number(
await this.cacheManager.get(L1_SENT_BACKTRACK),
);
const backtractStartBlock = Number(
await this.cacheManager.get(L1_SENT_BACKTRACK_START),
);
console.log('l1 sent currentBlockNumber: ', currentBlockNumber);
const start = Number(await this.cacheManager.get(L1_SENT));
if (currentBlockNumber - start > SYNC_STEP) {
Expand All @@ -122,22 +163,92 @@ export class TasksService {
? start - SYNC_STEP
: currentBlockNumber;
}

if (
backtrackBlockNumber > start &&
backtrackBlockNumber - start < SYNC_STEP
) {
end = backtrackBlockNumber - 1;
reach_backtrack = true;
}

if (currentBlockNumber > start + 1) {
const result = await this.l1IngestionService.createSentEvents(
start + 1,
end,
);
const insertData = !result || result.length <= 0 ? [] : result[0].identifiers || []
const insertData =
!result || result.length <= 0 ? [] : result[0].identifiers || [];
this.logger.log(
`sync [${insertData.length}] l1_sent_message_events from block [${start}] to [${end}]`,
);
await this.cacheManager.set(L1_SENT, end, { ttl: 0 });

if (reach_backtrack) {

This comment has been minimized.

Copy link
@shellteo

shellteo Jun 15, 2023

reach_backtrack then need to stop the Job

await this.cacheManager.set(L1_SENT, backtractStartBlock, { ttl: 0 });
} else {
await this.cacheManager.set(L1_SENT, end, { ttl: 0 });
}
} else {
this.logger.log(
`sync l1_sent finished and latest block number is: ${currentBlockNumber}`,
);
}
}
@Interval(3000)
async l1_sent_backtrack() {

This comment has been minimized.

Copy link
@shellteo

shellteo Jun 15, 2023

How does this method update to the latest block

This comment has been minimized.

Copy link
@dennisloh95

dennisloh95 Jun 15, 2023

Author Collaborator

This method use current latest block to sync to the l1_sent block.
For example l1_sent_backtrack sync from block 100 to 90, 80, 70 etc. When the sync block reaching the l1_sent sync block it will stop.
Then from l1_sent sync from 0 to 10, 20 ,30, and when it reach l1_sent_backtrack sync block, it will then start to use block 100 and listen to the latest block from chain.

let end = 0;
let start = 0;
const backtrackBlockNumber = Number(
await this.cacheManager.get(L1_SENT_BACKTRACK),
);
if (!backtrackBlockNumber) {
this.logger.log(`backtrackBlockNumber not found`);
return;
}
const currentL1SentBlockNumber = Number(
await this.cacheManager.get(L1_SENT),
);
console.log(
'l1 backtrackBlockNumber: ',
backtrackBlockNumber,
'l1 current sent block: ',
currentL1SentBlockNumber,
);

if (
backtrackBlockNumber - currentL1SentBlockNumber < SYNC_STEP * 3 ||
currentL1SentBlockNumber > backtrackBlockNumber
) {
// triple SYNC_STEP to prevent execute during transaction in progress
this.logger.log(`sync l1_sent_backtrack finished`);
return;
}
end = backtrackBlockNumber;

// above have anotther safe guard
if (end - currentL1SentBlockNumber > SYNC_STEP) {
start = end - SYNC_STEP;
} else {
start =
end - SYNC_STEP > currentL1SentBlockNumber
? end - SYNC_STEP
: currentL1SentBlockNumber;
}
if (end > start + 1) {
const result = await this.l1IngestionService.createSentEvents(
start + 1,
end,
);
const insertData =
!result || result.length <= 0 ? [] : result[0].identifiers || [];
this.logger.log(
`sync [${insertData.length}] l1_sent_message_events_backtrack from block [${start}] to [${end}]`,
);
await this.cacheManager.set(L1_SENT_BACKTRACK, start, { ttl: 0 });
} else {
this.logger.log(`sync l1_sent_backtrack finished`);
}
}
@Interval(2000)
async l1_relayed() {
let end = 0;
Expand All @@ -158,7 +269,8 @@ export class TasksService {
start + 1,
end,
);
const insertData = !result || result.length <= 0 ? [] : result[0].identifiers || []
const insertData =
!result || result.length <= 0 ? [] : result[0].identifiers || [];
this.logger.log(
`sync [${insertData.length}] l1_relayed_message_events from block [${start}] to [${end}]`,
);
Expand Down Expand Up @@ -189,7 +301,8 @@ export class TasksService {
start + 1,
end,
);
const insertData = !result || result.length <= 0 ? [] : result[0].identifiers || []
const insertData =
!result || result.length <= 0 ? [] : result[0].identifiers || [];
this.logger.log(
`sync [${insertData.length}] l2_sent_message_events from block [${start}] to [${end}]`,
);
Expand Down Expand Up @@ -220,7 +333,8 @@ export class TasksService {
start + 1,
end,
);
const insertData = !result || result.length <= 0 ? [] : result[0].identifiers || []
const insertData =
!result || result.length <= 0 ? [] : result[0].identifiers || [];
this.logger.log(
`sync [${insertData.length}] l2_relayed_message_events from block [${start}] to [${end}]`,
);
Expand Down Expand Up @@ -250,44 +364,46 @@ export class TasksService {
: currentBlockNumber;
}
if (currentBlockNumber >= start + 1) {
const result = await this.l1IngestionService.createStateBatchesEvents(
start + 1,
end,
).catch(e=> {
console.error(`insert state batch failed, number: ${start} ${end}`)
});
if(result){
const insertData = !result || result.length <= 0 ? [] : result[0].identifiers || []
const result = await this.l1IngestionService
.createStateBatchesEvents(start + 1, end)
.catch((e) => {
console.error(`insert state batch failed, number: ${start} ${end}`);
});
if (result) {
const insertData =
!result || result.length <= 0 ? [] : result[0].identifiers || [];
this.logger.log(
`sync [${insertData.length}] state_batch from block [${start}] to [${end}]`,
);
await this.cacheManager.set(STATE_BATCH, end, { ttl: 0 });
}else {
console.error('result insert state batch data failed')
} else {
console.error('result insert state batch data failed');
}

} else {
this.logger.log(
`sync state_batch finished and latest block number is: ${currentBlockNumber}`,
);
}
}


async miss_data_script_start(block) {
console.log('-------------- start script , start block', block)
const result = await this.l1IngestionService.saveStateBatchMissedScript(block).catch(e=> {
console.error(`insert state batch failed,`)
});
console.log('list sync completed, the next block:', result)
if(result && result < 9146135){
this.miss_data_script_start(result)
}else {
console.error('result insert state batch data failed, or sync completed!', result)
console.log('-------------- start script , start block', block);
const result = await this.l1IngestionService
.saveStateBatchMissedScript(block)
.catch((e) => {
console.error(`insert state batch failed,`);
});
console.log('list sync completed, the next block:', result);
if (result && result < 9146135) {
this.miss_data_script_start(result);
} else {
console.error(
'result insert state batch data failed, or sync completed!',
result,
);
}
}


/* @Interval(2000)
async txn_batch() {
let end = 0;
Expand Down Expand Up @@ -346,15 +462,26 @@ export class TasksService {
@Interval(5000)
async eigen_da_batch_txns() {
try {
const fromStoreNumber = Number(await this.cacheManager.get(DA_BATCH_INDEX));
console.log('[syncEigenDaBatch] start eigenda service fromStoreNumber: ', fromStoreNumber);
const result = await this.l1IngestionService.syncEigenDaBatch(fromStoreNumber);
const fromStoreNumber = Number(
await this.cacheManager.get(DA_BATCH_INDEX),
);
console.log(
'[syncEigenDaBatch] start eigenda service fromStoreNumber: ',
fromStoreNumber,
);
const result = await this.l1IngestionService.syncEigenDaBatch(
fromStoreNumber,
);
if (result) {
console.log('[syncEigenDaBatch] add DA_BATCH_INDEX');
await this.cacheManager.set(DA_BATCH_INDEX, fromStoreNumber + 1, { ttl: 0 });
await this.cacheManager.set(DA_BATCH_INDEX, fromStoreNumber + 1, {
ttl: 0,
});
}
} catch (error) {
this.logger.error(`[syncEigenDaBatch] error eigen da batches err: ${error}`);
this.logger.error(
`[syncEigenDaBatch] error eigen da batches err: ${error}`,
);
}
}
}

0 comments on commit 4ad3dd4

Please sign in to comment.