diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index b0e93432..de41597d 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -255,6 +255,7 @@ def __init__(self, env: "Env", db: DB, daemon: Daemon, notifications: "Notificat self.touched = set() self.semaphore = asyncio.Semaphore() self.reorg_count = 0 + self.reorg_processing = False self.height = -1 self.tip = None # type: Optional[bytes] self.tip_advanced_event = asyncio.Event() @@ -354,6 +355,10 @@ async def check_and_advance_blocks(self, raw_blocks): hprevs = [self.coin.header_prevhash(h) for h in headers] chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] + if self.reorg_processing: + self.logger.warning("Awaiting the previous reorg to finished...") + await self.semaphore.acquire() + if hprevs == chain: start = time.monotonic() await self.run_in_thread_with_lock(self.advance_blocks, blocks) @@ -387,38 +392,42 @@ async def reorg_chain(self, count=None): Count is the number of blocks to simulate a reorg, or None for a real reorg.""" + self.reorg_processing = True if count is None: self.logger.info("chain reorg detected") else: self.logger.info(f"faking a reorg of {count:,d} blocks") - await self.flush(True) - - async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]: - heights = range(last_height, last_height - len(hex_hashes), -1) - try: - blocks = [self.db.read_raw_block(height) for height in heights] - self.logger.info(f"read {len(blocks)} blocks from disk") - return blocks - except FileNotFoundError: - return await self.daemon.raw_blocks(hex_hashes) - - def flush_backup(): - # self.touched can include other addresses which is - # harmless, but remove None. - self.touched.discard(None) - self.db.flush_backup(self.flush_data(), self.touched) - - _start, last, hashes = await self.reorg_hashes(count) - # Reverse and convert to hex strings. - hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] - for hex_hashes in chunks(hashes, 50): - raw_blocks = await get_raw_blocks(last, hex_hashes) - await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - await self.run_in_thread_with_lock(flush_backup) - last -= len(raw_blocks) - await self.prefetcher.reset_height(self.height) - self.backed_up_event.set() - self.backed_up_event.clear() + try: + await self.flush(True, force=True) + + async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]: + heights = range(last_height, last_height - len(hex_hashes), -1) + try: + blocks = [self.db.read_raw_block(height) for height in heights] + self.logger.info(f"read {len(blocks)} blocks from disk") + return blocks + except FileNotFoundError: + return await self.daemon.raw_blocks(hex_hashes) + + def flush_backup(): + # self.touched can include other addresses which is + # harmless, but remove None. + self.touched.discard(None) + self.db.flush_backup(self.flush_data(), self.touched) + + _start, last, hashes = await self.reorg_hashes(count) + # Reverse and convert to hex strings. + hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] + for hex_hashes in chunks(hashes, 50): + raw_blocks = await get_raw_blocks(last, hex_hashes) + await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) + await self.run_in_thread_with_lock(flush_backup) + last -= len(raw_blocks) + await self.prefetcher.reset_height(self.height) + self.backed_up_event.set() + self.backed_up_event.clear() + finally: + self.reorg_processing = False async def reorg_hashes(self, count): """Return a pair (start, last, hashes) of blocks to back up during a @@ -504,9 +513,9 @@ def flush_data(self): self.op_data_cache, ) - async def flush(self, flush_utxos): + async def flush(self, flush_utxos, force=False): def flush(): - self.db.flush_dbs(self.flush_data(), flush_utxos, self.estimate_txs_remaining) + self.db.flush_dbs(self.flush_data(), flush_utxos, self.estimate_txs_remaining, force=force) await self.run_in_thread_with_lock(flush) @@ -564,6 +573,7 @@ def advance_blocks(self, blocks): self.undo_infos.append((undo_info, height)) self.atomicals_undo_infos.append((atomicals_undo_info, height)) self.db.write_raw_block(block.raw, height) + self.logger.debug(f'processed block {height} with {len(block.transactions)} txs') headers = [block.header for block in blocks] self.height = height @@ -3990,7 +4000,7 @@ def backup_blocks(self, raw_blocks: Sequence[bytes]): The blocks should be in order of decreasing height, starting at. self.height. A flush is performed once the blocks are backed up. """ - self.db.assert_flushed(self.flush_data()) + # self.db.assert_flushed(self.flush_data()) assert self.height >= len(raw_blocks) genesis_activation = self.coin.GENESIS_ACTIVATION