diff --git a/.cspell/custom-words.txt b/.cspell/custom-words.txt
index ee11e5b6e32..5fb88e58658 100644
--- a/.cspell/custom-words.txt
+++ b/.cspell/custom-words.txt
@@ -42,6 +42,7 @@ autouse
 backpressure
 BADTOKEN
 Baka
+Bardock
 beafy
 BindGen
 Bitwarden
diff --git a/server/parsec/realm_export.py b/server/parsec/realm_export.py
new file mode 100644
index 00000000000..0e8305e91aa
--- /dev/null
+++ b/server/parsec/realm_export.py
@@ -0,0 +1,1174 @@
+# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS
+from __future__ import annotations
+
+import asyncio
+import queue
+import sqlite3
+import threading
+from collections import deque
+from contextlib import asynccontextmanager
+from pathlib import Path
+from typing import Any, AsyncGenerator, Callable, Literal
+
+from parsec._parsec import BlockID, DateTime, OrganizationID, VlobID
+from parsec.backend import Backend
+from parsec.ballpark import BALLPARK_CLIENT_LATE_OFFSET
+from parsec.components.blockstore import BlockStoreReadBadOutcome
+from parsec.components.realm import (
+    RealmExportBlocksMetadataBatch,
+    RealmExportCertificates,
+    RealmExportDoBaseInfo,
+    RealmExportDoBaseInfoBadOutcome,
+    RealmExportDoBlocksBatchMetadataBadOutcome,
+    RealmExportDoCertificatesBadOutcome,
+    RealmExportDoVlobsBatchBadOutcome,
+    RealmExportVlobsBatch,
+)
+from parsec.logging import get_logger
+
+logger = get_logger()
+
+
+type SequentialID = int
+
+
+class RealmExporterError(Exception):
+    pass
+
+
+class RealmExporterInputError(RealmExporterError):
+    """
+    Error raised due to invalid input parameters (i.e. the caller is at fault).
+
+    This typically occurs when trying to export a non-existing realm.
+    """
+
+    pass
+
+
+class RealmExporterOutputDbError(RealmExporterError):
+    """
+    Error raised due to an issue with the output SQLite database.
+
+    This typically occurs when the database exists but is corrupted, or doesn't
+    correspond to the realm being exported.
+    """
+
+    pass
+
+
+class RealmExporterInputDbError(RealmExporterError):
+    """
+    Error raised due to an issue with the input PostgreSQL database, or the
+    blockstore.
+
+    This typically occurs if there is some inconsistency between the block
+    metadata (stored in PostgreSQL) and the block data (stored in the blockstore).
+    """
+
+    pass
+
+
+# Considering a vlob to be a couple of KB in size, 100k items means under 1 GB of data per batch
+VLOB_EXPORT_BATCH_SIZE = 100_000
+# Block metadata are really small (< 100 bytes)
+BLOCK_METADATA_EXPORT_BATCH_SIZE = 1_000_000
+BLOCK_DATA_EXPORT_PARALLELISM = 10
+# Amount of RAM we are willing to use to store block data in memory
+# before flushing it to the SQLite database.
+BLOCK_DATA_EXPORT_RAM_LIMIT = 2**30  # 1 GB
+MAX_CONSECUTIVE_STORE_UNAVAILABLE_ERRORS = 5
+MAX_STORE_UNAVAILABLE_BACKOFF_SLEEP = 60  # seconds
+
+OUTPUT_DB_MAGIC_NUMBER = 87948
+OUTPUT_DB_VERSION = 1
+OUTPUT_DB_INIT_QUERY = f"""
+-------------------------------------------------------------------------
+-- Database info
+-------------------------------------------------------------------------
+
+
+-- This magic number has two roles:
+-- 1) It makes it unlikely that we mistakenly consider an unrelated database as legit.
+-- 2) It acts as a constant ID to easily retrieve the single row in the table.
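+-- For example (illustrative query, not part of the schema itself), the export
+-- metadata can be retrieved with:
+--   SELECT organization_id, realm_id, snapshot_timestamp FROM info WHERE magic = {OUTPUT_DB_MAGIC_NUMBER};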
+CREATE TABLE IF NOT EXISTS info ( + magic INTEGER UNIQUE NOT NULL DEFAULT {OUTPUT_DB_MAGIC_NUMBER}, + version INTEGER NOT NULL DEFAULT {OUTPUT_DB_VERSION}, + + organization_id VARCHAR(32) NOT NULL, + realm_id BLOB NOT NULL, + root_verify_key BLOB NOT NULL, + + -- The export can be considered to be a snapshot of the realm up to this point in time + -- (Note this doesn't necessarily correspond to when the export has been done). + snapshot_timestamp INTEGER NOT NULL, -- us since UNIX epoch + + -- Total amount of data exported + vlobs_total_bytes INTEGER NOT NULL, + blocks_total_bytes INTEGER NOT NULL, + + certificates_export_done INTEGER NOT NULL DEFAULT 0, -- Boolean + vlobs_export_done INTEGER NOT NULL DEFAULT 0, -- Boolean + blocks_metadata_export_done INTEGER NOT NULL DEFAULT 0, -- Boolean + blocks_data_export_done INTEGER NOT NULL DEFAULT 0 -- Boolean +); + + +------------------------------------------------------------------------- +-- Certificates export +------------------------------------------------------------------------- + + +-- Common certificates provided in-order +CREATE TABLE common_certificate ( + _id INTEGER PRIMARY KEY, + certificate BLOB NOT NULL +); + +-- Sequester certificates provided in-order +CREATE TABLE sequester_certificate ( + _id INTEGER PRIMARY KEY, + certificate BLOB NOT NULL +); + +-- Realm's certificates provided in-order +CREATE TABLE realm_certificate ( + _id INTEGER PRIMARY KEY, + certificate BLOB NOT NULL +); + +-- Note Shamir recovery and other realms' certificates are not exported +-- since they are unrelated to the realm being exported. + + +------------------------------------------------------------------------- +-- Vlobs export +------------------------------------------------------------------------- + + +CREATE TABLE vlob_atom ( + -- We use `realm_vlob_update`'s `index` field as primary key. + -- This means the vlob atoms are ordered according to how they got added + -- in the server in the first place. + sequential_id INTEGER PRIMARY KEY, + vlob_id BLOB NOT NULL, -- VlobID + version INTEGER NOT NULL, + key_index INTEGER NOT NULL, + blob BLOB NOT NULL, + size INTEGER NOT NULL, + author BLOB NOT NULL, -- DeviceID + -- Note this field is called `created_on` in Parsec datamodel, but it correspond + -- in fact to the timestamp field in the API ! So we stick with the latter. + -- On top of that, unlike PostgreSQL, SQLite doesn't have a TIMESTAMPZ type out + -- of the box so we have to roll our own integer-based format. + timestamp INTEGER NOT NULL -- us since UNIX epoch +); + + +------------------------------------------------------------------------- +-- Blocks export +------------------------------------------------------------------------- + + +CREATE TABLE block ( + -- Primary key is not SERIAL given we will take the one present in the Parsec database. + -- This means the blocks are ordered according to how they got added in the server + -- in the first place. 
+ sequential_id INTEGER PRIMARY KEY, + block_id BLOB NOT NULL, + author BLOB NOT NULL, -- DeviceID + size INTEGER NOT NULL, + key_index INTEGER NOT NULL +); + +CREATE TABLE block_data ( + block INTEGER PRIMARY KEY REFERENCES block(_id), + data BLOB NOT NULL +); +""" + + +def _sqlite_init_db( + output_db_path: Path, + organization_id: OrganizationID, + realm_id: VlobID, + snapshot_timestamp: DateTime, + root_verify_key: bytes, + vlobs_total_bytes: int, + blocks_total_bytes: int, +) -> sqlite3.Connection: + try: + con = sqlite3.connect( + f"file:{output_db_path}?mode=rw", uri=True, autocommit=False, check_same_thread=True + ) + except sqlite3.Error: + # Export database doesn't exists + try: + # Create the database... + con = sqlite3.connect(output_db_path) + # ...and initialize it + con.executescript(OUTPUT_DB_INIT_QUERY) + con.execute( + """\ + INSERT INTO info ( + organization_id,\ + realm_id,\ + snapshot_timestamp,\ + root_verify_key,\ + vlobs_total_bytes,\ + blocks_total_bytes\ + ) VALUES (?, ?, ?, ?, ?, ?)\ + """, + ( + organization_id.str, + realm_id.bytes, + snapshot_timestamp.as_timestamp_micros(), + root_verify_key, + vlobs_total_bytes, + blocks_total_bytes, + ), + ) + con.commit() + except sqlite3.Error as exc: + raise RealmExporterOutputDbError(f"Cannot create export database: {exc}") from exc + + try: + # Export database already exists, we should make sure it format is expected + try: + row = con.execute( + """SELECT\ + version,\ + organization_id,\ + realm_id,\ + snapshot_timestamp,\ + root_verify_key,\ + vlobs_total_bytes,\ + blocks_total_bytes\ + FROM info WHERE magic = ?\ + """, + (OUTPUT_DB_MAGIC_NUMBER,), + ).fetchone() + except sqlite3.Error as exc: + # If we end up here this is most likely because `info` table doesn't exists (or miss some columns) + raise RealmExporterOutputDbError( + f"Existing output target is not a valid export database: {exc}" + ) from exc + if not row: + # `info` table exists and is valid, but magic number doesn't match + raise RealmExporterOutputDbError( + "Existing output target is not a valid export database: no info row" + ) + ( + db_version, + db_organization_id, + db_realm_id, + db_snapshot_timestamp, + db_root_verify_key, + db_vlobs_total_bytes, + db_blocks_total_bytes, + ) = row + + match db_version: + case int(): + if db_version != OUTPUT_DB_VERSION: + raise RealmExporterOutputDbError( + f"Existing output export database version format is not supported: got version `{db_version}` but only version `{OUTPUT_DB_VERSION}` is supported" + ) + case unknown: + raise RealmExporterOutputDbError( + f"Existing output target is not a valid export database: invalid `info.version` value `{unknown!r}` (expected int)" + ) + + match db_organization_id: + case str(): + if db_organization_id != organization_id.str: + raise RealmExporterOutputDbError( + f"Existing output export database is for a different realm: got `{db_organization_id}` instead of expected `{organization_id.str}`" + ) + case unknown: + raise RealmExporterOutputDbError( + f"Existing output target is not a valid export database: invalid `info.organization_id` value `{unknown!r}` (expected str)" + ) + + match db_realm_id: + case bytes(): + if db_realm_id != realm_id.bytes: + raise RealmExporterOutputDbError( + f"Existing output export database is for a different realm: got `0x{db_realm_id.hex()}` instead of expected `0x{realm_id.hex}`" + ) + case unknown: + raise RealmExporterOutputDbError( + f"Existing output target is not a valid export database: invalid `info.realm_id` value `{unknown!r}` 
(expected bytes)"
+                )
+
+        match db_snapshot_timestamp:
+            case int():
+                if db_snapshot_timestamp != snapshot_timestamp.as_timestamp_micros():
+                    try:
+                        display_db_snapshot_timestamp = DateTime.from_timestamp_micros(
+                            db_snapshot_timestamp
+                        )
+                    except ValueError:
+                        display_db_snapshot_timestamp = (
+                            f"<invalid timestamp: {db_snapshot_timestamp}>"
+                        )
+                    raise RealmExporterOutputDbError(
+                        f"Existing output export database is for a different timestamp: got `{display_db_snapshot_timestamp}` instead of expected `{snapshot_timestamp}`"
+                    )
+            case unknown:
+                raise RealmExporterOutputDbError(
+                    f"Existing output target is not a valid export database: invalid `info.snapshot_timestamp` value `{unknown!r}` (expected int)"
+                )
+
+        match db_root_verify_key:
+            case bytes():
+                if db_root_verify_key != root_verify_key:
+                    raise RealmExporterOutputDbError(
+                        f"Existing output export database is for a different realm: realm ID `0x{realm_id.hex}` is the same but root verify key differs"
+                    )
+            case unknown:
+                raise RealmExporterOutputDbError(
+                    f"Existing output target is not a valid export database: invalid `info.root_verify_key` value `{unknown!r}` (expected bytes)"
+                )
+
+        match db_vlobs_total_bytes:
+            case int():
+                if db_vlobs_total_bytes != vlobs_total_bytes:
+                    raise RealmExporterOutputDbError(
+                        f"Existing output export database doesn't match: realm ID `0x{realm_id.hex}` and snapshot timestamp `{snapshot_timestamp}` are the same, but vlobs total bytes differs"
+                    )
+            case unknown:
+                raise RealmExporterOutputDbError(
+                    f"Existing output target is not a valid export database: invalid `info.vlobs_total_bytes` value `{unknown!r}` (expected int)"
+                )
+
+        match db_blocks_total_bytes:
+            case int():
+                if db_blocks_total_bytes != blocks_total_bytes:
+                    raise RealmExporterOutputDbError(
+                        f"Existing output export database doesn't match: realm ID `0x{realm_id.hex}` and snapshot timestamp `{snapshot_timestamp}` are the same, but blocks total bytes differs"
+                    )
+            case unknown:
+                raise RealmExporterOutputDbError(
+                    f"Existing output target is not a valid export database: invalid `info.blocks_total_bytes` value `{unknown!r}` (expected int)"
+                )
+
+    except:
+        con.close()
+        raise
+
+    return con
+
+
+# We send callback functions from asyncio to access the SQLite connection.
+# This is to have a single thread managing the SQLite connection and ensuring
+# thread safety on it.
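+#
+# For example (illustrative sketch only, with `output_db_con` standing for an
+# `OutputDBConnection` instance as defined below), a job is simply a callback
+# taking the connection and returning a result:
+#
+#     def _count_vlobs(con: sqlite3.Connection) -> int:
+#         return con.execute("SELECT COUNT(*) FROM vlob_atom").fetchone()[0]
+#
+#     total_vlobs = await output_db_con.execute(_count_vlobs)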
+type OutputDBSqliteJobCb[R] = tuple[Callable[[sqlite3.Connection], R], asyncio.Queue[R | Exception]] + + +class OutputDBConnection: + def __init__( + self, con: sqlite3.Connection, queries_queue: queue.Queue[OutputDBSqliteJobCb[Any]] + ): + self._con = con + # Queue contains: (SQL query, parameters, oneshot queue to return result) + self._queries_queue = queries_queue + + @classmethod + @asynccontextmanager + async def connect_or_create( + cls, + output_db_path: Path, + organization_id: OrganizationID, + realm_id: VlobID, + snapshot_timestamp: DateTime, + root_verify_key: bytes, + vlobs_total_bytes: int, + blocks_total_bytes: int, + ) -> AsyncGenerator[OutputDBConnection]: + # Make this queue infinite to ensure we won't block the asyncio event loop + queries_queue: queue.Queue[OutputDBSqliteJobCb[Any]] = queue.Queue(maxsize=0) + con_ready: asyncio.Queue[sqlite3.Connection] = asyncio.Queue(maxsize=1) + stop_requested = False + + asyncio_loop = asyncio.get_event_loop() + + def _sqlite3_worker(): + nonlocal con_ready + con = _sqlite_init_db( + output_db_path, + organization_id, + realm_id, + snapshot_timestamp, + root_verify_key, + vlobs_total_bytes, + blocks_total_bytes, + ) + try: + asyncio_loop.call_soon_threadsafe(con_ready.put_nowait, con) + del con_ready + + while not stop_requested: + cb, result_queue = queries_queue.get() + if stop_requested: + break + try: + result = cb(con) + except Exception as exc: + asyncio_loop.call_soon_threadsafe(result_queue.put_nowait, exc) + else: + asyncio_loop.call_soon_threadsafe(result_queue.put_nowait, result) + + finally: + con.close() + + try: + async with asyncio.TaskGroup() as tg: + tg.create_task(asyncio.to_thread(_sqlite3_worker)) + con = await con_ready.get() + + try: + yield cls(con, queries_queue) + finally: + stop_requested = True + # Also push a dummy job to wake up the worker if no job is in the queue + queries_queue.put_nowait((lambda _: None, asyncio.Queue(maxsize=1))) + + # `RealmExporterError` are our own error type, so if it occurs there most + # likely won't be any other concurrent errors (and if there is, those errors + # are less relevant than our own error in any case). + except* RealmExporterError as exc: + raise exc.exceptions[0] + + async def execute[R](self, cb: Callable[[sqlite3.Connection], R]) -> R: + ret_queue: asyncio.Queue[R | Exception] = asyncio.Queue(maxsize=1) + self._queries_queue.put_nowait((cb, ret_queue)) + match await ret_queue.get(): + case Exception() as exc: + raise exc + case ret: + return ret + + +type ToExportBytes = int # Total amount of data in bytes to be exported +type ExportedBytes = int # Amount of data in bytes that have been exported so far +type ExportProgressStep = ( + Literal["certificates_start"] + | Literal["certificates_done"] + | tuple[Literal["vlobs"], ExportedBytes, ToExportBytes] + | tuple[Literal["blocks_metadata"], ExportedBytes, ToExportBytes] + | tuple[Literal["blocks_data"], ExportedBytes, ToExportBytes] +) +type ProgressReportCallback = Callable[[ExportProgressStep], None] + + +def get_earliest_allowed_snapshot_timestamp() -> DateTime: + """ + The very first step we do when exporting a realm is to determine what should be + exported according to the snapshot timestamp parameter. + + Obviously snapshot timestamp is not allowed to be beyond present time since only + Bardock can see in the future. 
+ + On top of that, it is also bad to have the snapshot timestamp in the past but + too close to the present time as the server is still allowed accept data timed + before our snapshot timestamp. + """ + return DateTime.now().subtract(seconds=BALLPARK_CLIENT_LATE_OFFSET) + + +async def export_realm( + backend: Backend, + organization_id: OrganizationID, + realm_id: VlobID, + snapshot_timestamp: DateTime, + output_db_path: Path, + on_progress: ProgressReportCallback, +): + earlier_allowed_timestamp = get_earliest_allowed_snapshot_timestamp() + if snapshot_timestamp > earlier_allowed_timestamp: + raise RealmExporterInputError( + f"Snapshot timestamp cannot be more recent than present time - {BALLPARK_CLIENT_LATE_OFFSET}s (i.e. `{earlier_allowed_timestamp}`)" + ) + + # 0) Ensure the export is possible since organization & realm exist on the server + + outcome = await backend.realm.export_do_base_info( + organization_id=organization_id, realm_id=realm_id, snapshot_timestamp=snapshot_timestamp + ) + match outcome: + case RealmExportDoBaseInfo() as base_info: + pass + case RealmExportDoBaseInfoBadOutcome.ORGANIZATION_NOT_FOUND: + raise RealmExporterInputError(f"Organization `{organization_id.str}` doesn't exists") + case RealmExportDoBaseInfoBadOutcome.REALM_NOT_FOUND: + raise RealmExporterInputError( + f"Realm `{realm_id.hex}` doesn't exist in organization `{organization_id.str}`" + ) + case RealmExportDoBaseInfoBadOutcome.REALM_DIDNT_EXIST_AT_SNAPSHOT_TIMESTAMP: + raise RealmExporterInputError( + f"Requested snapshot timestamp `{snapshot_timestamp}` is older than realm creation" + ) + + # 1) Create the output SQLite database (or re-open if it already exists) + + async with OutputDBConnection.connect_or_create( + output_db_path=output_db_path, + organization_id=organization_id, + realm_id=realm_id, + snapshot_timestamp=snapshot_timestamp, + root_verify_key=base_info.root_verify_key.encode(), + vlobs_total_bytes=base_info.vlobs_total_bytes, + blocks_total_bytes=base_info.blocks_total_bytes, + ) as output_db_con: + # 2) Export certificates + + await _do_export_certificates( + on_progress, + backend, + output_db_con, + organization_id, + realm_id, + common_certificate_timestamp_upper_bound=base_info.common_certificate_timestamp_upper_bound, + realm_certificate_timestamp_upper_bound=base_info.realm_certificate_timestamp_upper_bound, + sequester_certificate_timestamp_upper_bound=base_info.sequester_certificate_timestamp_upper_bound, + ) + + # 3) Export vlobs + + await _do_export_vlobs( + on_progress, + backend, + output_db_con, + organization_id, + realm_id, + base_info.vlob_offset_marker_upper_bound, + ) + + # 4) Export blocks metadata + + await _do_export_blocks_metadata( + on_progress, + backend, + output_db_con, + organization_id, + realm_id, + base_info.block_offset_marker_upper_bound, + ) + + # 4) Export blocks data + + await _do_export_blocks_data(on_progress, backend, output_db_con, organization_id) + + # 5) All done \o/ + + +async def _do_export_certificates( + on_progress: ProgressReportCallback, + backend: Backend, + output_db_con: OutputDBConnection, + organization_id: OrganizationID, + realm_id: VlobID, + common_certificate_timestamp_upper_bound: DateTime, + realm_certificate_timestamp_upper_bound: DateTime, + sequester_certificate_timestamp_upper_bound: DateTime | None, +) -> None: + # 0) Skip the operation if the export database already contains it + + def _get_certificates_export_done(con: sqlite3.Connection) -> bool: + row = con.execute( + "SELECT certificates_export_done FROM 
info", + ).fetchone() + match row[0]: + case 1: + return True + case 0: + return False + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `certificates_export_done` contains unexpected value `{unknown!r}`" + ) + + certificates_export_done = await output_db_con.execute(_get_certificates_export_done) + if certificates_export_done: + return + + # Export needed + + on_progress("certificates_start") + + # 1) Fetch all certificates + + outcome = await backend.realm.export_do_certificates( + organization_id=organization_id, + realm_id=realm_id, + common_certificate_timestamp_upper_bound=common_certificate_timestamp_upper_bound, + realm_certificate_timestamp_upper_bound=realm_certificate_timestamp_upper_bound, + sequester_certificate_timestamp_upper_bound=sequester_certificate_timestamp_upper_bound, + ) + match outcome: + case RealmExportCertificates() as certificates: + pass + case ( + ( + RealmExportDoCertificatesBadOutcome.ORGANIZATION_NOT_FOUND + | RealmExportDoCertificatesBadOutcome.REALM_NOT_FOUND + ) as error + ): + # Organization&realm existence has already been checked, so this shouldn't occur + raise RealmExporterInputError(f"Unexpect outcome when exporting certificates: {error}") + + # 2) Write certificates to export database (and mark this export step as done) + + def _write_sqlite_db(con: sqlite3.Connection): + con.executemany( + "INSERT INTO common_certificate (_id, certificate) VALUES (?, ?)", + enumerate(certificates.common_certificates), + ) + con.executemany( + "INSERT INTO realm_certificate (_id, certificate) VALUES (?, ?)", + enumerate(certificates.realm_certificates), + ) + con.executemany( + "INSERT INTO sequester_certificate (_id, certificate) VALUES (?, ?)", + enumerate(certificates.sequester_certificates), + ) + con.execute( + "UPDATE info SET certificates_export_done = 1", + ) + con.commit() + + await output_db_con.execute(_write_sqlite_db) + + on_progress("certificates_done") + + +async def _do_export_vlobs( + on_progress: ProgressReportCallback, + backend: Backend, + output_db_con: OutputDBConnection, + organization_id: OrganizationID, + realm_id: VlobID, + vlob_offset_marker_upper_bound: SequentialID, +) -> None: + # 0) Skip the operation if the export database already contains it + + def _get_vlobs_export_done( + con: sqlite3.Connection, + ) -> tuple[bool, SequentialID, ExportedBytes, ToExportBytes]: + row = con.execute("SELECT vlobs_export_done, vlobs_total_bytes FROM info").fetchone() + match row[0]: + case 1: + done = True + case 0: + done = False + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `info.vlobs_export_done` contains unexpected value `{unknown!r}` (expected bool)" + ) + match row[1]: + case int() as vlobs_total_bytes: + pass + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `info.vlobs_total_bytes` contains unexpected value `{unknown!r}` (expected int)" + ) + + row = con.execute("SELECT MAX(sequential_id), SUM(size) FROM vlob_atom").fetchone() + match row[0]: + case None: + # Sequential ID starts at 1 so using 0 here is safe + last_vlob_sequential_id = 0 + case int() as last_vlob_sequential_id: + pass + case unknown: + assert False, f"Unexpected value for `last_vlob_sequential_id`: {unknown!r}" + match row[1]: + case None: + exported_bytes = 0 + case int() as exported_bytes: + pass + case unknown: + assert False, f"Unexpected value for `exported_bytes`: {unknown!r}" + + return done, 
last_vlob_sequential_id, exported_bytes, vlobs_total_bytes + + ( + vlobs_export_done, + current_batch_offset_marker, + exported_bytes, + vlobs_total_bytes, + ) = await output_db_con.execute(_get_vlobs_export_done) + if vlobs_export_done: + return + + # Export needed + + on_progress(("vlobs", exported_bytes, vlobs_total_bytes)) + + while current_batch_offset_marker < vlob_offset_marker_upper_bound: + # 1) Download a batch of data + + outcome = await backend.realm.export_do_vlobs_batch( + organization_id=organization_id, + realm_id=realm_id, + batch_offset_marker=current_batch_offset_marker, + batch_size=VLOB_EXPORT_BATCH_SIZE, + ) + match outcome: + case RealmExportVlobsBatch() as batch: + pass + case ( + ( + RealmExportDoVlobsBatchBadOutcome.ORGANIZATION_NOT_FOUND + | RealmExportDoVlobsBatchBadOutcome.REALM_NOT_FOUND + ) as error + ): + # Organization&realm existence has already been checked, so this shouldn't occur + raise RealmExporterInputError(f"Unexpect outcome when exporting vlobs: {error}") + + if not batch.items: + raise RealmExporterInputError( + "Unexpect outcome when exporting vlobs: all vlob has been exported without finding the upper bound marker" + ) + elif batch.items[-1].sequential_id > vlob_offset_marker_upper_bound: + # This batch is the last one as it contains the upper bound, + # hence we have to filter out the items that are above it. + batch.items = [ + item for item in batch.items if item.sequential_id <= vlob_offset_marker_upper_bound + ] + + # 2) Write the batch to export database + + def _write_sqlite_db(con: sqlite3.Connection): + con.executemany( + "INSERT INTO vlob_atom (\ + sequential_id,\ + vlob_id,\ + version,\ + key_index,\ + blob,\ + size,\ + author,\ + timestamp\ + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ( + ( + item.sequential_id, + item.vlob_id.bytes, + item.version, + item.key_index, + item.blob, + item.size, + item.author.bytes, + item.timestamp.as_timestamp_micros(), + ) + for item in batch.items + ), + ) + con.commit() + + await output_db_con.execute(_write_sqlite_db) + + current_batch_offset_marker = batch.batch_offset_marker + + exported_bytes += sum(item.size for item in batch.items) + on_progress(("vlobs", exported_bytes, vlobs_total_bytes)) + + # 3) Mark this export step as done + + def _write_sqlite_db(con: sqlite3.Connection): + con.execute( + "UPDATE info SET vlobs_export_done = TRUE", + ) + con.commit() + + await output_db_con.execute(_write_sqlite_db) + + +async def _do_export_blocks_metadata( + on_progress: ProgressReportCallback, + backend: Backend, + output_db_con: OutputDBConnection, + organization_id: OrganizationID, + realm_id: VlobID, + block_offset_marker_upper_bound: SequentialID, +) -> None: + # 0) Skip the operation if the export database already contains it + + def _get_blocks_metadata_export_done( + con: sqlite3.Connection, + ) -> tuple[bool, SequentialID, ExportedBytes, ToExportBytes]: + row = con.execute( + "SELECT blocks_metadata_export_done, blocks_total_bytes FROM info" + ).fetchone() + match row[0]: + case 1: + done = True + case 0: + done = False + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `blocks_metadata_export_done` contains unexpected value `{unknown!r}`" + ) + match row[1]: + case int() as blocks_total_bytes: + pass + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `info.blocks_total_bytes` contains unexpected value `{unknown!r}` (expected int)" + ) + + row = con.execute("SELECT MAX(sequential_id), 
SUM(size) FROM block").fetchone() + match row[0]: + case None: + # Sequential ID starts at 1 so using 0 here is safe + last_block_metadata_sequential_id = 0 + case int() as last_block_metadata_sequential_id: + pass + case unknown: + assert ( + False + ), f"Unexpected value for `last_block_metadata_sequential_id`: {unknown!r}" + match row[1]: + case None: + exported_bytes = 0 + case int() as exported_bytes: + pass + case unknown: + assert False, f"Unexpected value for `exported_bytes`: {unknown!r}" + + return done, last_block_metadata_sequential_id, exported_bytes, blocks_total_bytes + + ( + blocks_metadata_export_done, + current_batch_offset_marker, + exported_bytes, + blocks_total_bytes, + ) = await output_db_con.execute(_get_blocks_metadata_export_done) + if blocks_metadata_export_done: + return + + # Export needed + + on_progress(("blocks_metadata", exported_bytes, blocks_total_bytes)) + + while current_batch_offset_marker < block_offset_marker_upper_bound: + # 1) Download a batch of data + + outcome = await backend.realm.export_do_blocks_metadata_batch( + organization_id=organization_id, + realm_id=realm_id, + batch_offset_marker=current_batch_offset_marker, + batch_size=BLOCK_METADATA_EXPORT_BATCH_SIZE, + ) + match outcome: + case RealmExportBlocksMetadataBatch() as batch: + pass + case ( + ( + RealmExportDoBlocksBatchMetadataBadOutcome.ORGANIZATION_NOT_FOUND + | RealmExportDoBlocksBatchMetadataBadOutcome.REALM_NOT_FOUND + ) as error + ): + # Organization&realm existence has already been checked, so this shouldn't occur + raise RealmExporterInputError( + f"Unexpect outcome when exporting certificates: {error}" + ) + + if not batch.items: + raise RealmExporterInputError( + "Unexpect outcome when exporting blocks metadata: all blocks has been exported without finding the upper bound marker" + ) + elif batch.items[-1].sequential_id > block_offset_marker_upper_bound: + # This batch is the last one as it contains the upper bound, + # hence we have to filter out the items that are above it. 
+ batch.items = [ + item + for item in batch.items + if item.sequential_id <= block_offset_marker_upper_bound + ] + + # 2) Write the batch to export database + + def _write_sqlite_db(con: sqlite3.Connection): + con.executemany( + "INSERT INTO block (\ + sequential_id,\ + block_id,\ + author,\ + size,\ + key_index\ + ) VALUES (?, ?, ?, ?, ?)", + ( + ( + item.sequential_id, + item.block_id.bytes, + item.author.bytes, + item.size, + item.key_index, + ) + for item in batch.items + ), + ) + con.commit() + + await output_db_con.execute(_write_sqlite_db) + + current_batch_offset_marker = batch.batch_offset_marker + + exported_bytes += sum(item.size for item in batch.items) + on_progress(("blocks_metadata", exported_bytes, blocks_total_bytes)) + + def _write_sqlite_db(con: sqlite3.Connection): + con.execute( + "UPDATE info SET blocks_metadata_export_done = TRUE", + ) + con.commit() + + await output_db_con.execute(_write_sqlite_db) + + +async def _do_export_blocks_data( + on_progress: ProgressReportCallback, + backend: Backend, + output_db_con: OutputDBConnection, + organization_id: OrganizationID, +) -> None: + # 0) Skip the operation if the export database already contains it + + def _get_blocks_data_export_done( + con: sqlite3.Connection, + ) -> tuple[bool, ExportedBytes, ToExportBytes]: + row = con.execute( + "SELECT blocks_data_export_done, blocks_total_bytes FROM info", + ).fetchone() + match row[0]: + case 1: + done = True + case 0: + done = False + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `blocks_data_export_done` contains unexpected value `{unknown!r}`" + ) + match row[1]: + case int() as blocks_total_bytes: + pass + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `info.blocks_total_bytes` contains unexpected value `{unknown!r}` (expected int)" + ) + + row = con.execute( + "SELECT SUM(block.size) FROM block_data LEFT JOIN block ON block_data.block = block.sequential_id" + ).fetchone() + match row[0]: + case None: + exported_bytes = 0 + case int() as exported_bytes: + pass + case unknown: + assert False, f"Unexpected value for `exported_bytes`: {unknown!r}" + + return done, exported_bytes, blocks_total_bytes + + blocks_data_export_done, exported_bytes, blocks_total_bytes = await output_db_con.execute( + _get_blocks_data_export_done + ) + if blocks_data_export_done: + return + + # Export needed + + on_progress(("blocks_data", exported_bytes, blocks_total_bytes)) + + # /!\ One important point to note is that, unlike vlob and block metadata export + # steps, we don't export items in a strictly growing pattern according to their + # sequential ID. + # + # This is for two reasons: + # - We can determine what remains to export simply by looking into the output + # database for block metadata without corresponding data. + # This ensure the output database is always consistent once export is finished. + # - We fetch the block data in parallel, and flush it to the output database + # whenever a memory threshold is reached. + # Also, connection to the blockstore can be unreliable and we try to carry on + # nevertheless. + # + # In particular, the last point means two things: + # - The data we flush on the output database has no ordering guarantee. + # - Any given batch can end up partially exported before we move to the next one. 
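+    #
+    # As a rough order of magnitude (assuming blocks of ~512 KiB, which is an
+    # assumption about typical data and not a guarantee): with
+    # BLOCK_DATA_EXPORT_RAM_LIMIT = 2**30 bytes, the in-memory buffer gets
+    # flushed to the output database roughly every 2**30 / 2**19 = 2048 blocks.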
+ + def _get_next_batch_of_blocks(con: sqlite3.Connection) -> deque[tuple[SequentialID, BlockID]]: + rows = con.execute( + "SELECT block.sequential_id, block.block_id\ + FROM block LEFT JOIN block_data\ + ON block_data.block = block.sequential_id\ + WHERE block_data.data IS NULL\ + LIMIT ?", + # Use the metadata batch size here since we *are* retrieving metadata + # from the export database. + (BLOCK_METADATA_EXPORT_BATCH_SIZE,), + ).fetchall() + + batch: deque[tuple[SequentialID, BlockID]] = deque() + for row in rows: + match row[0]: + case int() as sequential_id: + pass + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `block` table contains unexpected `sequential_id` value `{unknown!r}` (expected int)" + ) + + match row[1]: + case bytes() as raw: + try: + block_id = BlockID.from_bytes(raw) + except ValueError as exc: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `block` table contains unexpected `block_id` value `{raw!r}` (not a valid BlockID)" + ) from exc + + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `block` table contains unexpected `block_id` value `{unknown!r}` (expected bytes)" + ) + + batch.append((sequential_id, block_id)) + + return batch + + # We need a lock here to ensure the SQLite flush and updates of `to_flush_data` & + # `to_flush_data_total_size` are done atomically. + # This is important since `to_flush_data` & `to_flush_data_total_size` are accessed + # from multiple threads (the main asyncio thread + the `asyncio.to_thread` used + # to do SQLite operations). + to_flush_lock = threading.Lock() + to_flush_data: list[tuple[SequentialID, bytes]] = [] + to_flush_data_total_size = 0 + + def _flush_data_to_sqlite(con: sqlite3.Connection) -> None: + nonlocal to_flush_data_total_size, to_flush_data + + # It could be tempting here to take the lock for the whole duration of the + # SQLite operation here, but this would be a bad idea since it would block + # the asyncio event loop whenever `_add_block_data_and_maybe_flush_to_sqlite` + # is called. + # So instead we only take the lock to steal the list of data to flush and + # reset it. + with to_flush_lock: + to_insert = to_flush_data + to_flush_data = [] + to_flush_data_total_size = 0 + + con.executemany( + "INSERT INTO block_data (block, data) VALUES (?, ?)", + to_insert, + ) + con.commit() + + async def _add_block_data_and_maybe_flush_to_sqlite( + block_sequential_id: SequentialID, data: bytes + ) -> None: + nonlocal to_flush_data_total_size + nonlocal exported_bytes + + with to_flush_lock: + to_flush_data.append((block_sequential_id, data)) + to_flush_data_total_size += len(data) + + if to_flush_data_total_size >= BLOCK_DATA_EXPORT_RAM_LIMIT: + await output_db_con.execute(_flush_data_to_sqlite) + + # Note we report the progress before the flush is actually done on the output + # database. + # This is because the slow part (justifying the need for this progress report) + # is fetching data from the blockstore. 
+ exported_bytes += len(data) + on_progress(("blocks_data", exported_bytes, blocks_total_bytes)) + + while True: + batch = await output_db_con.execute(_get_next_batch_of_blocks) + if not batch: + break + + # Now process our batch in parallel + + consecutive_store_unavailable_errors = 0 + + class StoreUnavailable(Exception): + pass + + async def _fetch_data_in_batch() -> None: + while True: + try: + (block_sequential_id, block_id) = batch.popleft() + except IndexError: + # Nothing more to fetch ! + break + + outcome = await backend.blockstore.read(organization_id, block_id) + match outcome: + case bytes() as data: + nonlocal consecutive_store_unavailable_errors + consecutive_store_unavailable_errors = 0 + await _add_block_data_and_maybe_flush_to_sqlite(block_sequential_id, data) + + case BlockStoreReadBadOutcome.BLOCK_NOT_FOUND: + # TODO: We currently never remove any block data from a realm (there + # is a `deleted_on` field in the `block` table but it is unused + # for now). + # This code should be updated if we ever decide to do so. + raise RealmExporterInputDbError( + f"Block `{block_id}` is missing from the blockstore database" + ) + + case BlockStoreReadBadOutcome.STORE_UNAVAILABLE: + # By raising this error, we stop all parallel tasks (leaving the + # current batch un-achieved but this is fine by design) to wait a + # bit before retrying. + # It would be a shame to abandon all those precious downloaded bytes, + # so flush them before leaving ! + await output_db_con.execute(_flush_data_to_sqlite) + raise StoreUnavailable + + while True: + try: + async with asyncio.TaskGroup() as tg: + for _ in range(BLOCK_DATA_EXPORT_PARALLELISM): + tg.create_task(_fetch_data_in_batch()) + + # The current batch is done, we must ensure all the corresponding data + # are flushed to the output database before fetching a new one (otherwise + # the next batch will contain the blocks that have been fetched but not + # flushed yet !). + await output_db_con.execute(_flush_data_to_sqlite) + break + + except StoreUnavailable: + consecutive_store_unavailable_errors += 1 + if consecutive_store_unavailable_errors > MAX_CONSECUTIVE_STORE_UNAVAILABLE_ERRORS: + raise RealmExporterInputDbError( + "Blockstore database is unavailable after too many retries" + ) + backoff = min( + 2**consecutive_store_unavailable_errors, MAX_STORE_UNAVAILABLE_BACKOFF_SLEEP + ) + logger.warning(f"Blockstore database is unavailable, retrying in {backoff}s") + + def _write_sqlite_db(con: sqlite3.Connection): + # Sanity check to ensure each `block` and `block_data` tables are + # consistent with each other. 
+ row = con.execute( + "SELECT 1 FROM block LEFT JOIN block_data ON block.sequential_id = block_data.block WHERE block_data.data IS NULL LIMIT 1" + ).fetchone() + assert row is None, "Some blocks have been exported but their data is missing" + + con.execute( + "UPDATE info SET blocks_data_export_done = TRUE", + ) + con.commit() + + await output_db_con.execute(_write_sqlite_db) diff --git a/server/tests/sequester_export/__init__.py b/server/tests/sequester_export/__init__.py new file mode 100644 index 00000000000..05e02a3b569 --- /dev/null +++ b/server/tests/sequester_export/__init__.py @@ -0,0 +1 @@ +# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS diff --git a/server/tests/sequester_export/sample_export.sqlite b/server/tests/sequester_export/sample_export.sqlite new file mode 100644 index 00000000000..a1b665de0eb Binary files /dev/null and b/server/tests/sequester_export/sample_export.sqlite differ diff --git a/server/tests/sequester_export/test_export.py b/server/tests/sequester_export/test_export.py new file mode 100644 index 00000000000..b43f1588f85 --- /dev/null +++ b/server/tests/sequester_export/test_export.py @@ -0,0 +1,734 @@ +# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS + +import sqlite3 +from dataclasses import dataclass +from pathlib import Path +from shutil import copyfile +from unittest.mock import ANY + +import anyio +import pytest + +from parsec._parsec import ( + BlockID, + DateTime, + DeviceID, + OrganizationID, + RealmRole, + RealmRoleCertificate, + VerifyKey, + VlobID, +) +from parsec._parsec import testbed as tb +from parsec.ballpark import BALLPARK_CLIENT_LATE_OFFSET +from parsec.realm_export import ( + ExportProgressStep, + RealmExporterInputError, + RealmExporterOutputDbError, + export_realm, +) +from tests.common import ( + Backend, + MinimalorgRpcClients, + SequesteredOrgRpcClients, + WorkspaceHistoryOrgRpcClients, +) + +# Sample export has been generated by taking the export of test `test_export_ok_non_sequestered[current_snapshot]` +SAMPLE_EXPORT_SNAPSHOT_TIMESTAMP = DateTime(2025, 1, 10, 15, 37, 00, 105938) +SAMPLE_EXPORT_REALM_ID = VlobID.from_hex("f0000000000000000000000000000008") + + +@pytest.fixture +def sample_export_db_path( + workspace_history_org: WorkspaceHistoryOrgRpcClients, tmp_path: Path +) -> Path: + output_db_path = ( + tmp_path + / f"parsec-export-{workspace_history_org.organization_id.str}-realm-{SAMPLE_EXPORT_REALM_ID.hex}-{SAMPLE_EXPORT_SNAPSHOT_TIMESTAMP.to_rfc3339()}.sqlite" + ) + + # We copy the file before providing it to the test to ensure the original file is not modified + copyfile(Path(__file__).parent / "sample_export.sqlite", output_db_path) + + # Also we patch the export so that is has the same organization ID as the one we want to test + # (since the organization ID changes between tests) + con = sqlite3.connect(output_db_path) + con.execute("UPDATE info SET organization_id = ?", (workspace_history_org.organization_id.str,)) + con.commit() + con.close() + + return output_db_path + + +@dataclass(slots=True) +class ExportExpected: + realm_id: VlobID + snapshot_timestamp: DateTime + root_verify_key: VerifyKey + common_certificates: list[bytes] + sequester_certificates: list[bytes] + realm_certificates: list[bytes] + # List of (vlob_id, version, key_index, blob, size, author, timestamp) + vlobs: list[tuple[VlobID, int, int, bytes, int, DeviceID, DateTime]] + # List of (block_id, author, size, key_index, data) + blocks: list[tuple[BlockID, DeviceID, int, int, 
bytes]] + + +def extract_export_expected_from_template( + testbed_template: tb.TestbedTemplateContent, + realm_id: VlobID, + snapshot_timestamp: DateTime, +) -> ExportExpected: + root_verify_key: VerifyKey | None = None + common_certificates: list[bytes] = [] + sequester_certificates: list[bytes] = [] + realm_certificates: list[bytes] = [] + vlobs: list[tuple[VlobID, int, int, bytes, int, DeviceID, DateTime]] = [] + blocks: list[tuple[BlockID, DeviceID, int, int, bytes]] = [] + + for event in testbed_template.events: + match event: + case tb.TestbedEventBootstrapOrganization(): + common_certificates.append(event.first_user_raw_certificate) + common_certificates.append(event.first_user_first_device_raw_certificate) + root_verify_key = event.root_signing_key.verify_key + if ( + event.sequester_authority_raw_certificate + and event.timestamp <= snapshot_timestamp + ): + sequester_certificates.append(event.sequester_authority_raw_certificate) + case tb.TestbedEventNewSequesterService(): + if event.timestamp <= snapshot_timestamp: + sequester_certificates.append(event.raw_certificate) + case tb.TestbedEventRevokeSequesterService(): + if event.timestamp <= snapshot_timestamp: + sequester_certificates.append(event.raw_certificate) + case tb.TestbedEventNewUser(): + if event.timestamp <= snapshot_timestamp: + common_certificates.append(event.user_raw_certificate) + common_certificates.append(event.first_device_raw_certificate) + case tb.TestbedEventNewDevice(): + if event.timestamp <= snapshot_timestamp: + common_certificates.append(event.raw_certificate) + case tb.TestbedEventUpdateUserProfile(): + if event.timestamp <= snapshot_timestamp: + common_certificates.append(event.raw_certificate) + case tb.TestbedEventRevokeUser(): + if event.timestamp <= snapshot_timestamp: + common_certificates.append(event.raw_certificate) + case tb.TestbedEventNewRealm(): + if event.realm_id == realm_id and event.timestamp <= snapshot_timestamp: + realm_certificates.append(event.raw_certificate) + case tb.TestbedEventShareRealm(): + if event.realm == realm_id and event.timestamp <= snapshot_timestamp: + realm_certificates.append(event.raw_certificate) + case tb.TestbedEventRenameRealm(): + if event.realm == realm_id and event.timestamp <= snapshot_timestamp: + realm_certificates.append(event.raw_certificate) + case tb.TestbedEventRotateKeyRealm(): + if event.realm == realm_id and event.timestamp <= snapshot_timestamp: + realm_certificates.append(event.raw_certificate) + case tb.TestbedEventArchiveRealm(): + if event.realm == realm_id and event.timestamp <= snapshot_timestamp: + realm_certificates.append(event.raw_certificate) + case tb.TestbedEventCreateOrUpdateOpaqueVlob(): + if event.realm == realm_id and event.timestamp <= snapshot_timestamp: + vlobs.append( + ( + event.vlob_id, + event.version, + event.key_index, + event.encrypted, + len(event.encrypted), + event.author, + event.timestamp, + ) + ) + case tb.TestbedEventCreateBlock(): + if event.realm == realm_id and event.timestamp <= snapshot_timestamp: + blocks.append( + ( + event.block_id, + event.author, + len(event.encrypted), + event.key_index, + event.encrypted, + ) + ) + case tb.TestbedEventCreateOpaqueBlock(): + if event.realm == realm_id and event.timestamp <= snapshot_timestamp: + blocks.append( + ( + event.block_id, + event.author, + len(event.encrypted), + event.key_index, + event.encrypted, + ) + ) + # Other events don't produce data that is exported + case ( + tb.TestbedEventNewShamirRecoveryInvitation() + | tb.TestbedEventNewUserInvitation() + | 
tb.TestbedEventNewDeviceInvitation() + | tb.TestbedEventNewShamirRecovery() + | tb.TestbedEventDeleteShamirRecovery() + | tb.TestbedEventUpdateOrganization() + | tb.TestbedEventFreezeUser() + ): + pass + + assert root_verify_key is not None + + return ExportExpected( + realm_id=realm_id, + snapshot_timestamp=snapshot_timestamp, + root_verify_key=root_verify_key, + common_certificates=common_certificates, + sequester_certificates=sequester_certificates, + realm_certificates=realm_certificates, + vlobs=vlobs, + blocks=blocks, + ) + + +def check_export_content( + output_db_path: Path, export_expected: ExportExpected, expected_organization_id: OrganizationID +): + con = sqlite3.connect(output_db_path) + + # Check export content: info table + + row = con.execute(""" + SELECT + magic, + version, + organization_id, + realm_id, + root_verify_key, + snapshot_timestamp, + certificates_export_done, + vlobs_export_done, + blocks_metadata_export_done, + blocks_data_export_done + FROM info + """).fetchone() + assert row is not None + ( + export_magic, + export_version, + export_organization_id, + export_realm_id, + export_root_verify_key, + export_snapshot_timestamp, + export_certificates_export_done, + export_vlobs_export_done, + export_blocks_metadata_export_done, + export_blocks_data_export_done, + ) = row + assert export_magic == 87948 # Export format magic number + assert export_version == 1 # Export format version + assert export_organization_id == expected_organization_id.str + assert export_realm_id == export_expected.realm_id.bytes + assert export_root_verify_key == export_expected.root_verify_key.encode() + assert export_snapshot_timestamp == export_expected.snapshot_timestamp.as_timestamp_micros() + assert export_certificates_export_done == 1 + assert export_vlobs_export_done == 1 + assert export_blocks_metadata_export_done == 1 + assert export_blocks_data_export_done == 1 + + # Check export content: certificate tables + + rows = con.execute("SELECT certificate FROM common_certificate").fetchall() + assert [row[0] for row in rows] == export_expected.common_certificates + + rows = con.execute("SELECT certificate FROM sequester_certificate").fetchall() + assert [row[0] for row in rows] == export_expected.sequester_certificates + + rows = con.execute("SELECT certificate FROM realm_certificate").fetchall() + assert [row[0] for row in rows] == export_expected.realm_certificates + + # Check export content: vlobs table + + vlobs = con.execute(""" + SELECT + sequential_id, + vlob_id, + version, + key_index, + blob, + size, + author, + timestamp + FROM vlob_atom + ORDER BY sequential_id + """).fetchall() + + # Ensure `sequential_id` and `timestamp` are consistent, this is not mandatory in + # production (since two concurrent vlobs can have the same timestamp), however + # it is the case in our testbed data. So it's an interesting check anyway ;-) + + last_sequential_id = None + last_timestamp = None + for sequential_id, _, _, _, _, _, _, timestamp in vlobs: + if last_sequential_id is not None: + assert sequential_id > last_sequential_id + + if last_timestamp is not None: + assert timestamp >= last_timestamp + + last_sequential_id = sequential_id + last_timestamp = timestamp + + assert [ + ( + # Ignore `sequential_id` since it actual value depends on how the insertion was + # done (e.g. 
PostgreSQL autoincrement index skipped due to transaction rollback) + VlobID.from_bytes(row[1]), # vlob_id + row[2], # version + row[3], # key_index + row[4], # blob + row[5], # size + DeviceID.from_bytes(row[6]), # author + DateTime.from_timestamp_micros(row[7]), # timestamp + ) + for row in vlobs + ] == export_expected.vlobs + + # Check export content: block tables + + blocks = con.execute(""" + SELECT + block.sequential_id, + block.block_id, + block.author, + block.size, + block.key_index, + block_data.data + FROM block LEFT OUTER JOIN block_data ON block.sequential_id = block_data.block + ORDER BY block.sequential_id + """).fetchall() + + assert [ + ( + # Ignore `sequential_id` since it actual value depends on how the insertion was + # done (e.g. PostgreSQL autoincrement index skipped due to transaction rollback) + BlockID.from_bytes(row[1]), # block_id + DeviceID.from_bytes(row[2]), # author + row[3], # size + row[4], # key_index + row[5], # data + ) + for row in blocks + ] == export_expected.blocks + + +@pytest.mark.parametrize("kind", ["current_snapshot", "past_snapshot"]) +async def test_export_ok_non_sequestered( + workspace_history_org: WorkspaceHistoryOrgRpcClients, + backend: Backend, + tmp_path: Path, + kind: str, + xfail_if_postgresql: None, +): + match kind: + case "current_snapshot": + expected_snapshot_timestamp = DateTime.now().subtract( + seconds=BALLPARK_CLIENT_LATE_OFFSET + ) + case "past_snapshot": + expected_snapshot_timestamp = DateTime(2001, 1, 7) + case unknown: + assert False, unknown + + output_db_path = tmp_path / "output.sqlite" + + on_progress_events: list[ExportProgressStep] = [] + + def _on_progress(event: ExportProgressStep): + on_progress_events.append(event) + + await export_realm( + backend=backend, + organization_id=workspace_history_org.organization_id, + realm_id=workspace_history_org.wksp1_id, + output_db_path=output_db_path, + snapshot_timestamp=expected_snapshot_timestamp, + on_progress=_on_progress, + ) + + # Check on progress events + + assert on_progress_events[0] == "certificates_start" + assert on_progress_events[1] == "certificates_done" + end_of_vlobs_index = next( + i + 2 for i, (e, *_) in enumerate(on_progress_events[2:]) if e != "vlobs" + ) + end_of_blocks_metadata_index = next( + i + end_of_vlobs_index + for i, (e, *_) in enumerate(on_progress_events[end_of_vlobs_index:]) + if e != "blocks_metadata" + ) + # Ensure exported bytes is strictly growing + vlobs_events = on_progress_events[2:end_of_vlobs_index] + assert sorted(vlobs_events) == vlobs_events + blocks_metadata_events = on_progress_events[end_of_vlobs_index:end_of_blocks_metadata_index] + assert sorted(blocks_metadata_events) == blocks_metadata_events + blocks_data_events = on_progress_events[end_of_blocks_metadata_index:] + assert sorted(blocks_data_events) == blocks_data_events + + # Check output database + + assert output_db_path.is_file() + + export_expected = extract_export_expected_from_template( + workspace_history_org.testbed_template, + workspace_history_org.wksp1_id, + expected_snapshot_timestamp, + ) + check_export_content(output_db_path, export_expected, workspace_history_org.organization_id) + + +@pytest.mark.parametrize("kind", ["current_snapshot", "past_snapshot"]) +async def test_export_ok_sequestered( + sequestered_org: SequesteredOrgRpcClients, + backend: Backend, + tmp_path: Path, + kind: str, + xfail_if_postgresql: None, +): + match kind: + case "current_snapshot": + expected_snapshot_timestamp = DateTime.now().subtract( + seconds=BALLPARK_CLIENT_LATE_OFFSET + 
) + case "past_snapshot": + # We want our snapshot to export up to when `sequester_service_1` is + # revoked, so the certificate about `sequester_service_2` creation should + # be ignored (see documentation of the `sequestered` testbed template). + events = iter(sequestered_org.testbed_template.events) + expected_snapshot_timestamp = next( + (e for e in events if isinstance(e, tb.TestbedEventRevokeSequesterService)) + ).timestamp + # Sanity check to ensure our snapshot will skip some data + assert ( + len([e for e in events if isinstance(e, tb.TestbedEventNewSequesterService)]) == 1 + ) + case unknown: + assert False, unknown + + output_db_path = tmp_path / "output.sqlite" + + await export_realm( + backend=backend, + organization_id=sequestered_org.organization_id, + realm_id=sequestered_org.wksp1_id, + output_db_path=output_db_path, + snapshot_timestamp=expected_snapshot_timestamp, + on_progress=lambda x: None, + ) + + assert output_db_path.is_file() + + export_expected = extract_export_expected_from_template( + sequestered_org.testbed_template, + sequestered_org.wksp1_id, + expected_snapshot_timestamp, + ) + check_export_content(output_db_path, export_expected, sequestered_org.organization_id) + + +async def test_restart_partially_exported( + workspace_history_org: WorkspaceHistoryOrgRpcClients, + backend: Backend, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + xfail_if_postgresql: None, +): + expected_snapshot_timestamp = DateTime.now().subtract(seconds=BALLPARK_CLIENT_LATE_OFFSET) + + output_db_path = tmp_path / "output.sqlite" + + monkeypatch.setattr("parsec.realm_export.VLOB_EXPORT_BATCH_SIZE", 3) + monkeypatch.setattr("parsec.realm_export.BLOCK_METADATA_EXPORT_BATCH_SIZE", 3) + + steps_to_cancel = iter( + ( + "certificates_start", + "certificates_done", + ("vlobs", 0, ANY), + lambda e: e[0] == "vlobs" and e[1] > 0, + ("blocks_metadata", 0, ANY), + lambda e: e[0] == "blocks_metadata" and e[1] > 0, + ("blocks_data", 0, ANY), + lambda e: e[0] == "blocks_data" and e[1] > 0, + ) + ) + + while True: + cancel_on_event = next(steps_to_cancel, None) + with anyio.CancelScope() as cancel_scope: + # Cancel the export at multiple different steps + def _on_progress(event: ExportProgressStep): + if cancel_on_event is None: + return + elif callable(cancel_on_event) and cancel_on_event(event): + cancel_scope.cancel() + elif cancel_on_event == event: + cancel_scope.cancel() + + await export_realm( + backend=backend, + organization_id=workspace_history_org.organization_id, + realm_id=workspace_history_org.wksp1_id, + output_db_path=output_db_path, + snapshot_timestamp=expected_snapshot_timestamp, + on_progress=_on_progress, + ) + + # Finally we finished without being cancelled + break + + assert next(steps_to_cancel, None) is None + assert output_db_path.is_file() + + export_expected = extract_export_expected_from_template( + workspace_history_org.testbed_template, + workspace_history_org.wksp1_id, + expected_snapshot_timestamp, + ) + check_export_content(output_db_path, export_expected, workspace_history_org.organization_id) + + +async def test_re_export_is_noop( + workspace_history_org: WorkspaceHistoryOrgRpcClients, + backend: Backend, + sample_export_db_path: Path, + xfail_if_postgresql: None, +): + output_db_stat = sample_export_db_path.stat() + + await export_realm( + backend=backend, + organization_id=workspace_history_org.organization_id, + realm_id=workspace_history_org.wksp1_id, + output_db_path=sample_export_db_path, + snapshot_timestamp=SAMPLE_EXPORT_SNAPSHOT_TIMESTAMP, + 
on_progress=lambda x: None, + ) + + new_output_db_stat = sample_export_db_path.stat() + assert new_output_db_stat.st_mtime_ns == output_db_stat.st_mtime_ns + assert new_output_db_stat.st_size == output_db_stat.st_size + + +@pytest.mark.parametrize( + "kind", + [ + "organization_id", + "realm_id", + "snapshot_timestamp", + "root_verify_key", + "vlobs_total_bytes", + "blocks_total_bytes", + ], +) +async def test_re_export_with_different_params( + workspace_history_org: WorkspaceHistoryOrgRpcClients, + minimalorg: MinimalorgRpcClients, + backend: Backend, + sample_export_db_path: Path, + kind: str, + xfail_if_postgresql: None, +): + snapshot_timestamp = SAMPLE_EXPORT_SNAPSHOT_TIMESTAMP + organization_id = workspace_history_org.organization_id + realm_id = SAMPLE_EXPORT_REALM_ID + match kind: + case "organization_id": + organization_id = minimalorg.organization_id + expected_error_msg = f"Existing output export database is for a different realm: got `{workspace_history_org.organization_id}` instead of expected `{organization_id}`" + # Export operation checks the organization & realm exist before reading + # the existing export database, so we need to create a realm with the + # corresponding realm ID in the different organization. + now = SAMPLE_EXPORT_SNAPSHOT_TIMESTAMP.subtract(seconds=1) + await backend.realm.create( + now=now, + organization_id=minimalorg.organization_id, + author=minimalorg.alice.device_id, + author_verify_key=minimalorg.alice.signing_key.verify_key, + realm_role_certificate=RealmRoleCertificate( + author=minimalorg.alice.device_id, + timestamp=now, + realm_id=realm_id, + user_id=minimalorg.alice.user_id, + role=RealmRole.OWNER, + ).dump_and_sign(minimalorg.alice.signing_key), + ) + + case "realm_id": + other_realm_id = VlobID.new() + now = SAMPLE_EXPORT_SNAPSHOT_TIMESTAMP.subtract(seconds=1) + await backend.realm.create( + now=now, + organization_id=workspace_history_org.organization_id, + author=workspace_history_org.alice.device_id, + author_verify_key=workspace_history_org.alice.signing_key.verify_key, + realm_role_certificate=RealmRoleCertificate( + author=workspace_history_org.alice.device_id, + timestamp=now, + realm_id=other_realm_id, + user_id=workspace_history_org.alice.user_id, + role=RealmRole.OWNER, + ).dump_and_sign(workspace_history_org.alice.signing_key), + ) + realm_id = other_realm_id + expected_error_msg = f"Existing output export database is for a different realm: got `0x{SAMPLE_EXPORT_REALM_ID.hex}` instead of expected `0x{other_realm_id.hex}`" + + case "snapshot_timestamp": + snapshot_timestamp = DateTime(2020, 1, 1) + expected_error_msg = "Existing output export database is for a different timestamp: got `2025-01-10T15:37:00.105938Z` instead of expected `2020-01-01T00:00:00Z`" + + case "root_verify_key": + con = sqlite3.connect(sample_export_db_path) + con.execute("UPDATE info SET root_verify_key = ?", (b"\x01" * 32,)) + con.commit() + con.close() + expected_error_msg = "Existing output export database is for a different realm: realm ID `0xf0000000000000000000000000000008` is the same but root verify key differs" + + case "vlobs_total_bytes": + con = sqlite3.connect(sample_export_db_path) + con.execute("UPDATE info SET vlobs_total_bytes = vlobs_total_bytes + 1") + con.commit() + con.close() + expected_error_msg = "Existing output export database doesn't match: realm ID `0xf0000000000000000000000000000008` and snapshot timestamp `2025-01-10T15:37:00.105938Z` are the same, but vlobs total bytes differs" + + case "blocks_total_bytes": + con = 
sqlite3.connect(sample_export_db_path) + con.execute("UPDATE info SET blocks_total_bytes = blocks_total_bytes + 1") + con.commit() + con.close() + expected_error_msg = "Existing output export database doesn't match: realm ID `0xf0000000000000000000000000000008` and snapshot timestamp `2025-01-10T15:37:00.105938Z` are the same, but blocks total bytes differs" + + case unknown: + assert False, unknown + + with pytest.raises(RealmExporterOutputDbError) as exc: + await export_realm( + backend=backend, + organization_id=organization_id, + realm_id=realm_id, + output_db_path=sample_export_db_path, + snapshot_timestamp=snapshot_timestamp, + on_progress=lambda x: None, + ) + + assert str(exc.value) == expected_error_msg + + +async def test_export_organization_not_found( + backend: Backend, tmp_path: Path, xfail_if_postgresql: None +): + output_db_path = tmp_path / "output.sqlite" + + with pytest.raises(RealmExporterInputError) as exc: + await export_realm( + backend=backend, + organization_id=OrganizationID("Dummy"), + realm_id=VlobID.new(), + output_db_path=output_db_path, + snapshot_timestamp=DateTime.now().subtract(seconds=BALLPARK_CLIENT_LATE_OFFSET), + on_progress=lambda x: None, + ) + assert str(exc.value) == "Organization `Dummy` doesn't exists" + + +async def test_export_realm_not_found( + workspace_history_org: WorkspaceHistoryOrgRpcClients, + backend: Backend, + tmp_path: Path, + xfail_if_postgresql: None, +): + output_db_path = tmp_path / "output.sqlite" + + with pytest.raises(RealmExporterInputError) as exc: + await export_realm( + backend=backend, + organization_id=workspace_history_org.organization_id, + realm_id=VlobID.from_hex("aec70837083b48c98d6b305f608975b3"), + output_db_path=output_db_path, + snapshot_timestamp=DateTime.now().subtract(seconds=BALLPARK_CLIENT_LATE_OFFSET), + on_progress=lambda x: None, + ) + assert ( + str(exc.value) + == f"Realm `aec70837083b48c98d6b305f608975b3` doesn't exist in organization `{workspace_history_org.organization_id}`" + ) + + +@pytest.mark.parametrize("kind", ["in_the_future", "too_close_to_present"]) +async def test_export_snapshot_timestamp_in_the_future( + workspace_history_org: WorkspaceHistoryOrgRpcClients, + backend: Backend, + tmp_path: Path, + kind: str, + xfail_if_postgresql: None, +): + match kind: + case "in_the_future": + bad_snapshot_timestamp = DateTime.now().add(seconds=1) + case "too_close_to_present": + bad_snapshot_timestamp = ( + DateTime.now().subtract(seconds=BALLPARK_CLIENT_LATE_OFFSET).add(seconds=1) + ) + case unknown: + assert False, unknown + + output_db_path = tmp_path / "output.sqlite" + + with pytest.raises(RealmExporterInputError) as exc: + await export_realm( + backend=backend, + organization_id=workspace_history_org.organization_id, + realm_id=workspace_history_org.wksp1_id, + output_db_path=output_db_path, + snapshot_timestamp=bad_snapshot_timestamp, + on_progress=lambda x: None, + ) + assert str(exc.value).startswith( + "Snapshot timestamp cannot be more recent than present time - 320s (i.e. 
`" + ) + + +async def test_export_snapshot_timestamp_older_than_realm_creation( + workspace_history_org: WorkspaceHistoryOrgRpcClients, + backend: Backend, + tmp_path: Path, + xfail_if_postgresql: None, +): + wksp1_created_on = next( + ( + e + for e in workspace_history_org.testbed_template.events + if isinstance(e, tb.TestbedEventNewRealm) + and e.realm_id == workspace_history_org.wksp1_id + ) + ).timestamp + bad_snapshot_timestamp = wksp1_created_on.subtract(seconds=1) + + output_db_path = tmp_path / "output.sqlite" + + with pytest.raises(RealmExporterInputError) as exc: + await export_realm( + backend=backend, + organization_id=workspace_history_org.organization_id, + realm_id=workspace_history_org.wksp1_id, + output_db_path=output_db_path, + snapshot_timestamp=bad_snapshot_timestamp, + on_progress=lambda x: None, + ) + assert ( + str(exc.value) + == "Requested snapshot timestamp `2000-12-31T23:59:59Z` is older than realm creation" + )
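+
+
+# Side note (illustrative sketch, not exercised by the tests above): an export
+# produced by `export_realm` is a plain SQLite file, so it can also be inspected
+# directly, e.g.:
+#
+#     con = sqlite3.connect(output_db_path)
+#     org, realm = con.execute("SELECT organization_id, realm_id FROM info").fetchone()
+#     print(org, VlobID.from_bytes(realm))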