diff --git a/server/parsec/components/memory/realm.py b/server/parsec/components/memory/realm.py index 95d26b92c97..6a80a892d10 100644 --- a/server/parsec/components/memory/realm.py +++ b/server/parsec/components/memory/realm.py @@ -34,6 +34,10 @@ RealmCreateStoreBadOutcome, RealmCreateValidateBadOutcome, RealmDumpRealmsGrantedRolesBadOutcome, + RealmExportCertificates, + RealmExportDoBaseInfo, + RealmExportDoBaseInfoBadOutcome, + RealmExportDoCertificatesBadOutcome, RealmGetCurrentRealmsForUserBadOutcome, RealmGetKeysBundleBadOutcome, RealmGetStatsAsUserBadOutcome, @@ -817,3 +821,122 @@ async def dump_realms_granted_roles( ) return granted_roles + + @override + async def export_do_base_info( + self, organization_id: OrganizationID, realm_id: VlobID + ) -> RealmExportDoBaseInfo | RealmExportDoBaseInfoBadOutcome: + try: + org = self._data.organizations[organization_id] + except KeyError: + return RealmExportDoBaseInfoBadOutcome.ORGANIZATION_NOT_FOUND + + if not org.is_bootstrapped: + return RealmExportDoBaseInfoBadOutcome.ORGANIZATION_NOT_FOUND + + root_verify_key = org.root_verify_key + assert root_verify_key is not None + + try: + realm = org.realms[realm_id] + except KeyError: + return RealmExportDoBaseInfoBadOutcome.REALM_NOT_FOUND + + return RealmExportDoBaseInfo( + root_verify_key=root_verify_key, + ) + + @override + async def export_do_certificates( + self, organization_id: OrganizationID, realm_id: VlobID, snapshot_timestamp: DateTime + ) -> RealmExportCertificates | RealmExportDoCertificatesBadOutcome: + try: + org = self._data.organizations[organization_id] + except KeyError: + return RealmExportDoCertificatesBadOutcome.ORGANIZATION_NOT_FOUND + + try: + realm = org.realms[realm_id] + except KeyError: + return RealmExportDoCertificatesBadOutcome.REALM_NOT_FOUND + + # 1) Common certificates (i.e. user/device/revoked/update) + + # Certificates must be returned ordered by timestamp, however there is a trick + # for the common certificates: when a new user is created, the corresponding + # user and device certificates have the same timestamp, but we must return + # the user certificate first (given device references the user). + # So to achieve this we use a tuple (timestamp, priority, certificate) where + # only the first two field should be used for sorting (the priority field + # handling the case where user and device have the same timestamp). + + common_certificates_unordered: list[tuple[DateTime, int, bytes]] = [] + for user in org.users.values(): + common_certificates_unordered.append((user.cooked.timestamp, 0, user.user_certificate)) + + if user.is_revoked: + assert user.cooked_revoked is not None + assert user.revoked_user_certificate is not None + common_certificates_unordered.append( + (user.cooked_revoked.timestamp, 1, user.revoked_user_certificate) + ) + + for update in user.profile_updates: + common_certificates_unordered.append( + (update.cooked.timestamp, 1, update.user_update_certificate) + ) + + for device in org.devices.values(): + common_certificates_unordered.append( + (device.cooked.timestamp, 1, device.device_certificate) + ) + + common_certificates = [ + c for ts, _, c in sorted(common_certificates_unordered) if ts <= snapshot_timestamp + ] + + # 2) Sequester certificates + + sequester_certificates: list[bytes] = [] + if org.sequester_authority_certificate is not None: + assert org.cooked_sequester_authority is not None + assert org.sequester_services is not None + + sequester_certificates_unordered: list[tuple[DateTime, bytes]] = [] + sequester_certificates_unordered.append( + (org.cooked_sequester_authority.timestamp, org.sequester_authority_certificate) + ) + sequester_certificates_unordered += [ + (service.cooked.timestamp, service.sequester_service_certificate) + for service in org.sequester_services.values() + ] + + sequester_certificates = [ + c for ts, c in sorted(sequester_certificates_unordered) if ts <= snapshot_timestamp + ] + + # 3) Realm certificates + + # Collect all the certificates related to the realm + realm_certificates_unordered: list[tuple[DateTime, bytes]] = [] + realm_certificates_unordered += [ + (role.cooked.timestamp, role.realm_role_certificate) for role in realm.roles + ] + realm_certificates_unordered += [ + (role.cooked.timestamp, role.realm_key_rotation_certificate) + for role in realm.key_rotations + ] + realm_certificates_unordered += [ + (role.cooked.timestamp, role.realm_name_certificate) for role in realm.renames + ] + # TODO: support archiving here ! + + realm_certificates = [ + c for ts, c in sorted(realm_certificates_unordered) if ts <= snapshot_timestamp + ] + + return RealmExportCertificates( + common_certificates=common_certificates, + sequester_certificates=sequester_certificates, + realm_certificates=realm_certificates, + ) diff --git a/server/parsec/components/realm.py b/server/parsec/components/realm.py index b9351350416..d3345d97b74 100644 --- a/server/parsec/components/realm.py +++ b/server/parsec/components/realm.py @@ -351,59 +351,87 @@ class RealmDumpRealmsGrantedRolesBadOutcome(BadOutcomeEnum): @dataclass(slots=True) class RealmExportDoBaseInfo: root_verify_key: VerifyKey - common_certificates: list[bytes] - realm_certificates: list[bytes] - vlob_upper_marker: RealmExportBatchOffsetMarker - vlobs_total: int - block_upper_marker: RealmExportBatchOffsetMarker - blocks_total: int + # Offset marker is basically the internal primary key of the vlob in the PostgreSQL + # database: + # - The primary key is a serial integer that is strictly growing (i.e. the older + # the row the lower the ID). + # - The table contains multiple realms, so the IDs are not growing continuously (also + # the serial type in PostgreSQL give no guarantee on avoiding hole when e.g. a + # transaction is rolled back). + # + # So the idea here is to use this primary key from PostgreSQL as primary in + # our SQLite export. This way we end up with the rows in the correct historical + # order, and also easily know if the export is complete (i.e. if the upper + # bound is part of the export). + vlob_offset_marker_upper_bound: int + block_offset_marker_upper_bound: int + + vlob_items: int + blocks_items: int -class RealmExportDoBaseInfoBadOutcome(BadOutcome): + +class RealmExportDoBaseInfoBadOutcome(BadOutcomeEnum): ORGANIZATION_NOT_FOUND = auto() REALM_NOT_FOUND = auto() @dataclass(slots=True) class RealmExportCertificates: - # List of (, ) - realm_role_certificates: list[tuple[int, bytes]] - - # List of (, , ) - user_certificates: list[tuple[int, bytes, bytes | None]] - - # List of (, ) - user_update_certificates: list[tuple[int, bytes]] - - # List of (, ) - device_certificates: list[tuple[int, bytes]] + common_certificates: list[bytes] + sequester_certificates: list[bytes] + realm_certificates: list[bytes] -class RealmExportDoCertificatesBadOutcome(BadOutcome): +class RealmExportDoCertificatesBadOutcome(BadOutcomeEnum): ORGANIZATION_NOT_FOUND = auto() REALM_NOT_FOUND = auto() +@dataclass(slots=True) +class RealmExportVlobsBatchItem: + realm_vlob_update_index: int + vlob_id: VlobID + version: int + key_index: int + blob: bytes + size: int + author: DeviceID + timestamp: DateTime + + @dataclass(slots=True) class RealmExportVlobsBatch: batch_offset_marker: RealmExportBatchOffsetMarker - # List of (, , , , , ) - vlobs: list[tuple[int, VlobID, int, bytes, int, DateTime]] + items: list[RealmExportVlobsBatchItem] -class RealmExportDoVlobsBatchBadOutcome(BadOutcome): +class RealmExportDoVlobsBatchBadOutcome(BadOutcomeEnum): ORGANIZATION_NOT_FOUND = auto() REALM_NOT_FOUND = auto() @dataclass(slots=True) -class RealmExportBlocksBatch: +class RealmExportBlocksBatchItem: + id: int + block_id: BlockID + author: DeviceID + key_index: int + size: int + + +@dataclass(slots=True) +class RealmExportBlocksMetadataBatch: batch_offset_marker: RealmExportBatchOffsetMarker - # List of (, , , ) - blocks: list[tuple[int, BlockID, bytes, int]] + items: list[RealmExportBlocksBatchItem] + + +class RealmExportDoBlocksBatchMetadatBadOutcome(BadOutcomeEnum): + ORGANIZATION_NOT_FOUND = auto() + REALM_NOT_FOUND = auto() -class RealmExportDoBlocksBatchBadOutcome(BadOutcome): +class RealmExportDoBlocksDataBadOutcome(BadOutcomeEnum): ORGANIZATION_NOT_FOUND = auto() REALM_NOT_FOUND = auto() @@ -544,18 +572,31 @@ async def export_do_base_info( raise NotImplementedError async def export_do_certificates( - self, organization_id: OrganizationID, realm_id: VlobID + self, organization_id: OrganizationID, realm_id: VlobID, snapshot_timestamp: DateTime ) -> RealmExportCertificates | RealmExportDoCertificatesBadOutcome: raise NotImplementedError async def export_do_vlobs_batch( - self, batch_offset_marker: RealmExportBatchOffsetMarker, batch_size: int = 1000 + self, + organization_id: OrganizationID, + realm_id: VlobID, + batch_offset_marker: RealmExportBatchOffsetMarker, + batch_size: int, ) -> RealmExportVlobsBatch | RealmExportDoVlobsBatchBadOutcome: raise NotImplementedError - async def export_do_blocks_batch( - self, batch_offset_marker: RealmExportBatchOffsetMarker, batch_size: int = 1000 - ) -> RealmExportBlocksBatch | RealmExportDoBlocksBatchBadOutcome: + async def export_do_blocks_metadata_batch( + self, + organization_id: OrganizationID, + realm_id: VlobID, + batch_offset_marker: RealmExportBatchOffsetMarker, + batch_size: int = 1000, + ) -> RealmExportBlocksMetadataBatch | RealmExportDoBlocksBatchMetadatBadOutcome: + raise NotImplementedError + + async def export_do_blocks_data( + self, organization_id: OrganizationID, realm_id: VlobID, block_id: BlockID + ) -> bytes | RealmExportDoBlocksDataBadOutcome: raise NotImplementedError # diff --git a/server/parsec/realm_export.py b/server/parsec/realm_export.py index 7c10db7ddc3..8e2760db397 100644 --- a/server/parsec/realm_export.py +++ b/server/parsec/realm_export.py @@ -6,13 +6,17 @@ from pathlib import Path from typing import Any, Callable -from parsec._parsec import OrganizationID, VlobID +from parsec._parsec import DateTime, OrganizationID, VlobID from parsec.backend import Backend from parsec.components.realm import ( + RealmExportBlocksMetadataBatch, RealmExportCertificates, RealmExportDoBaseInfo, RealmExportDoBaseInfoBadOutcome, + RealmExportDoBlocksBatchMetadatBadOutcome, RealmExportDoCertificatesBadOutcome, + RealmExportDoVlobsBatchBadOutcome, + RealmExportVlobsBatch, ) @@ -28,92 +32,110 @@ class RealmExporterOutputDbError(RealmExporterError): pass +# Considering vlob to be a couple of Ko in size, 100k items means under 1Go of data per batch +VLOB_EXPORT_BATCH_SIZE = 100_000 +# Block metada are really small (< 100 bytes) +BLOCK_METADATA_EXPORT_BATCH_SIZE = 1_000_000 + + OUTPUT_DB_MAGIC_NUMBER = 87948 OUTPUT_DB_VERSION = 1 OUTPUT_DB_INIT_QUERY = f""" +------------------------------------------------------------------------- -- Database info +------------------------------------------------------------------------- -- This magic number has two roles: --- 1) it makes unlikely we mistakenly consider an unrelated database as legit --- 2) it acts as a constant ID to easily retrieve the single row in the table +-- 1) It makes unlikely we mistakenly consider an unrelated database as legit. +-- 2) It acts as a constant ID to easily retrieve the single row in the table. CREATE TABLE IF NOT EXISTS info ( magic INTEGER UNIQUE NOT NULL DEFAULT {OUTPUT_DB_MAGIC_NUMBER}, version INTEGER NOT NULL DEFAULT {OUTPUT_DB_VERSION}, + organization_id VARCHAR(32) NOT NULL, realm_id BLOB NOT NULL, root_verify_key BLOB NOT NULL, - export_up_to TIMESTAMPTZ NOT NULL, - certificates_export_done BOOL NOT NULL DEFAULT FALSE, - vlobs_export_done BOOL NOT NULL DEFAULT FALSE, - blocks_export_done BOOL NOT NULL DEFAULT FALSE + + -- The export can be considered to be a snapshot of the realm up to this point in time + -- (Note this doesn't necessarly correspond to when the export has been done). + snapshot_timestamp INTEGER NOT NULL, -- us since UNIX epoch + + certificates_export_done INTEGER NOT NULL DEFAULT 0, -- Boolean + vlobs_export_done INTEGER NOT NULL DEFAULT 0, -- Boolean + blocks_metadata_export_done INTEGER NOT NULL DEFAULT 0, -- Boolean + blocks_data_export_done INTEGER NOT NULL DEFAULT 0 -- Boolean ); --- Metadata & Data export +------------------------------------------------------------------------- +-- Certificates export +------------------------------------------------------------------------- -CREATE TABLE block ( - -- _id is not SERIAL given we will take the one present in the Parsec database - _id PRIMARY KEY, - block_id BLOB NOT NULL, - data BLOB NOT NULL, - author INTEGER REFERENCES device (_id) NOT NULL, +-- Common certificates provided in-order +CREATE TABLE common_certificate ( + _id INTEGER PRIMARY KEY, + certificate BLOB NOT NULL +); + +-- Sequester certificates provided in-order +CREATE TABLE sequester_certificate ( + _id INTEGER PRIMARY KEY, + certificate BLOB NOT NULL +); - UNIQUE(block_id) +-- Realm's certificates provided in-order +CREATE TABLE realm_certificate ( + _id INTEGER PRIMARY KEY, + certificate BLOB NOT NULL ); --- Compared to Parsec's datamodel, we don't store `vlob_encryption_revision` given --- the vlob is provided with third party encryption key only once at creation time +-- Note Shamir recovery and other realms' certificates are not exported +-- since they are unrelated to the realm being exported. + + +------------------------------------------------------------------------- +-- Vlobs export +------------------------------------------------------------------------- + + CREATE TABLE vlob_atom ( - -- We use vlob_update's index as primary key, this is convenient given it makes trivial - -- keeping track of how far we got when restarting an export - _id PRIMARY KEY, - vlob_id BLOB NOT NULL, + -- We use `realm_vlob_update`'s `index` field as primary key. + -- This means the vlob atoms are ordered according to how they got added + -- in the server in the first place. + realm_vlob_update_index INTEGER PRIMARY KEY, + vlob_id BLOB NOT NULL, -- VlobID version INTEGER NOT NULL, + key_index INTEGER NOT NULL, blob BLOB NOT NULL, - -- author/timestamp are required to validate the consistency of blob - -- Care must be taken when exporting this field (and the `device` table) to - -- keep this relationship valid ! - author INTEGER REFERENCES device (_id) NOT NULL, + size INTEGER NOT NULL, + author BLOB NOT NULL, -- DeviceID -- Note this field is called `created_on` in Parsec datamodel, but it correspond -- in fact to the timestamp field in the API ! So we stick with the latter. -- On top of that, unlike PostgreSQL, SQLite doesn't have a TIMESTAMPZ type out - -- of the box so we have to roll our own integer-based format, note this is - -- different than the 8bytes floating point format used in our msgpack-based - -- serialization system (it was a bad idea, see Rust implementation for more info) - timestamp INTEGER NOT NULL, -- us since UNIX epoch - - UNIQUE(vlob_id, version) + -- of the box so we have to roll our own integer-based format. + timestamp INTEGER NOT NULL -- us since UNIX epoch ); --- user/device/realm_role certificates related to the current realm --- --- There is no need for relationship between user/device given all those data --- are small enough to have the script just load them once and kept them in memory --- --- However we cannot just dump all the certificates in a single table given we cannot --- preserve primary key when merging user/device/role tables together. - -CREATE TABLE realm_role ( - _id PRIMARY KEY, - role_certificate BLOB NOT NULL -); -CREATE TABLE user_ ( - _id PRIMARY KEY, - user_certificate BLOB NOT NULL, - revoked_user_certificate BLOB -- NULL if user not revoked -); +------------------------------------------------------------------------- +-- Blocks export +------------------------------------------------------------------------- -CREATE TABLE user_update ( - _id PRIMARY_KEY, - user_update_certificate BLOB NOT NULL + +CREATE TABLE block ( + -- _id is not SERIAL given we will take the one present in the Parsec database + _id INTEGER PRIMARY KEY, + block_id BLOB NOT NULL, + author BLOB NOT NULL, -- DeviceID + size INTEGER NOT NULL, + key_index INTEGER NOT NULL ); -CREATE TABLE device ( - _id PRIMARY KEY, - device_certificate BLOB NOT NULL +CREATE TABLE block_data ( + block INTEGER PRIMARY KEY REFERENCES block(_id), + data BLOB NOT NULL ); """ @@ -123,7 +145,8 @@ def _sqlite_init_db( organization_id: OrganizationID, realm_id: VlobID, root_verify_key: bytes, -) -> tuple[sqlite3.Connection, ExportStatus]: + snapshot_timestamp: DateTime, +) -> sqlite3.Connection: try: con = sqlite3.connect(f"file:{output_db_path}?mode=rw", uri=True, autocommit=False) except sqlite3.Error: @@ -134,8 +157,20 @@ def _sqlite_init_db( # ...and initialize it con.executescript(OUTPUT_DB_INIT_QUERY) con.execute( - "INSERT INTO info (organization_id, realm_id, root_verify_key, export_up_to, export_status) VALUES (?, ?, ?, ?, ?)", - (organization_id.str, realm_id.bytes, root_verify_key, ExportStatus.INITIALIZED.value), + """\ + INSERT INTO info ( + organization_id,\ + realm_id,\ + root_verify_key,\ + snapshot_timestamp\ + ) VALUES (?, ?, ?, ?)\ + """, + ( + organization_id.str, + realm_id.bytes, + root_verify_key, + snapshot_timestamp.as_timestamp_micros(), + ), ) con.commit() except sqlite3.Error as exc: @@ -145,7 +180,7 @@ def _sqlite_init_db( # Export database already exists, we should make sure it format is expected try: row = con.execute( - "SELECT version, realm_id, root_verify_key, export_up_to, export_status FROM info WHERE magic = ?", + "SELECT version, organization_id, realm_id, root_verify_key, snapshot_timestamp FROM info WHERE magic = ?", (OUTPUT_DB_MAGIC_NUMBER,), ).fetchone() except sqlite3.Error as exc: @@ -156,102 +191,153 @@ def _sqlite_init_db( if not row: # `info` table exists and is valid, but magic number doesn't match raise RealmExporterOutputDbError( - "Existing output target is not a valid export database" - ) - db_version, db_realm_id, db_root_verify_key, db_export_up_to, db_export_status = row - if db_version != OUTPUT_DB_VERSION: - raise RealmExporterOutputDbError( - f"Existing output export database version format is not supported: got version `{db_version}` but only version `{OUTPUT_DB_VERSION}` is supported" - ) - if db_realm_id != realm_id.bytes: - raise RealmExporterOutputDbError( - f"Existing output export database is for a different realm: got `{db_realm_id}` instead of expected `{realm_id.bytes}`" - ) - if db_root_verify_key != root_verify_key: - raise RealmExporterOutputDbError( - f"Existing output export database is for a different realm: realm ID `{db_realm_id}` is the same but root verify key differs" + "Existing output target is not a valid export database: no info row" ) - export_status = ExportStatus(db_export_status) + db_version, db_organization_id, db_realm_id, db_root_verify_key, db_snapshot_timestamp = row + + match db_version: + case int(): + if db_version != OUTPUT_DB_VERSION: + raise RealmExporterOutputDbError( + f"Existing output export database version format is not supported: got version `{db_version}` but only version `{OUTPUT_DB_VERSION}` is supported" + ) + case unknown: + raise RealmExporterOutputDbError( + f"Existing output target is not a valid export database: invalid `info.version` value `{unknown!r}` (expected int)" + ) + + match db_organization_id: + case str(): + if db_organization_id != organization_id.str: + raise RealmExporterOutputDbError( + f"Existing output export database is for a different realm: got `{db_organization_id}` instead of expected `{organization_id.str}`" + ) + case unknown: + raise RealmExporterOutputDbError( + f"Existing output target is not a valid export database: invalid `info.organization_id` value `{unknown!r}` (expected str)" + ) + + match db_realm_id: + case bytes(): + if db_realm_id != realm_id.bytes: + raise RealmExporterOutputDbError( + f"Existing output export database is for a different realm: got `0x{db_realm_id.hex()}` instead of expected `0x{realm_id.hex}`" + ) + case unknown: + raise RealmExporterOutputDbError( + f"Existing output target is not a valid export database: invalid `info.realm_id` value `{unknown!r}` (expected bytes)" + ) + + match db_root_verify_key: + case bytes(): + if db_root_verify_key != root_verify_key: + raise RealmExporterOutputDbError( + f"Existing output export database is for a different realm: realm ID `{db_realm_id}` is the same but root verify key differs" + ) + case unknown: + raise RealmExporterOutputDbError( + f"Existing output target is not a valid export database: invalid `info.root_verify_key` value `{unknown!r}` (expected bytes)" + ) + + match db_snapshot_timestamp: + case int(): + if db_snapshot_timestamp != snapshot_timestamp.as_timestamp_micros(): + try: + display_db_snapshot_timestamp = DateTime.from_timestamp_micros( + db_snapshot_timestamp + ) + except ValueError: + display_db_snapshot_timestamp = ( + f"" + ) + raise RealmExporterOutputDbError( + f"Existing output export database is for a different timestamp: got `{display_db_snapshot_timestamp}` instead of expected `{snapshot_timestamp}`" + ) + case unknown: + raise RealmExporterOutputDbError( + f"Existing output target is not a valid export database: invalid `info.snapshot_timestamp` value `{unknown!r}` (expected int)" + ) except: con.close() raise - return (con, export_status) + return con type ProgressReportCallback = Callable async def export_realm( - backend: Backend, - organization_id: OrganizationID, - realm_id: VlobID, - output_db_path: Path, - on_progress: ProgressReportCallback - ): - # 0) Load basic info about the export - - outcome = await backend.realm.export_do_base_info( - organization_id=organization_id, realm_id=realm_id - ) - match outcome: - case RealmExportDoBaseInfo() as base_info: - pass - case RealmExportDoBaseInfoBadOutcome.ORGANIZATION_NOT_FOUND: - raise RealmExporterInputError(f"Organization `{organization_id.str}` doesn't exists") - case RealmExportDoBaseInfoBadOutcome.REALM_NOT_FOUND: - raise RealmExporterInputError( - f"Realm `{realm_id.hex}` doesn't exist in organization `{organization_id.str}`" - ) + backend: Backend, + organization_id: OrganizationID, + realm_id: VlobID, + snapshot_timestamp: DateTime, + output_db_path: Path, + on_progress: ProgressReportCallback, +): + # 0) Ensure the export is possible since organization & realm exist on the server - # 1) Create the output SQLite database (or re-open if it already exists) + outcome = await backend.realm.export_do_base_info( + organization_id=organization_id, realm_id=realm_id + ) + match outcome: + case RealmExportDoBaseInfo() as base_info: + pass + case RealmExportDoBaseInfoBadOutcome.ORGANIZATION_NOT_FOUND: + raise RealmExporterInputError(f"Organization `{organization_id.str}` doesn't exists") + case RealmExportDoBaseInfoBadOutcome.REALM_NOT_FOUND: + raise RealmExporterInputError( + f"Realm `{realm_id.hex}` doesn't exist in organization `{organization_id.str}`" + ) - output_db_con, current_export_status = await asyncio.to_thread( - _sqlite_init_db, - output_db_path=output_db_path, - organization_id=organization_id, - realm_id=realm_id, - root_verify_key=base_info.root_verify_key.encode(), - ) + # 1) Create the output SQLite database (or re-open if it already exists) + + output_db_con = await asyncio.to_thread( + _sqlite_init_db, + output_db_path=output_db_path, + organization_id=organization_id, + realm_id=realm_id, + root_verify_key=base_info.root_verify_key.encode(), + snapshot_timestamp=snapshot_timestamp, + ) - while True: + # 2) Export certificates - match current_export_status: - case ExportStatus.INITIALIZED: - # 2) Export certificates - current_export_status = await _do_export_certificates(backend, output_db_con, on_progress, organization_id, realm_id) + await _do_export_certificates( + backend, output_db_con, organization_id, realm_id, snapshot_timestamp + ) - case ExportStatus.CERTIFICATES_EXPORTED: - # 3) Export vlobs - current_export_status = await _do_export_vlobs(backend, output_db_con, on_progress) + # 3) Export vlobs - # TODO + await _do_export_vlobs(backend, output_db_con, organization_id, realm_id, snapshot_timestamp) - current_export_status = ExportStatus.VLOBS_EXPORTED - await asyncio.to_thread(_sqlite_update_export_status, output_db_con, current_export_status) + # # 4) Export blocks metadata - case ExportStatus.VLOBS_EXPORTED: - # 3) Export blocks - current_export_status = await _do_export_blocks(backend, output_db_con, on_progress) + # await _do_export_blocks_metadata(backend, output_db_con, organization_id, realm_id) - # TODO + # # 4) Export blocks data - current_export_status = ExportStatus.DONE - await asyncio.to_thread(_sqlite_update_export_status, output_db_con, current_export_status) + # await _do_export_blocks_data(backend, output_db_con, organization_id, realm_id) - case ExportStatus.DONE: - # 4) All done \o/ - break + # 5) All done \o/ -async def _do_export_certificates(backend: Backend, output_db_con: sqlite3.Connection, organization_id: OrganizationID, realm_id: VlobID) -> None: - # Skip the operation if the export database already contains it +async def _do_export_certificates( + backend: Backend, + output_db_con: sqlite3.Connection, + organization_id: OrganizationID, + realm_id: VlobID, + snapshot_timestamp: DateTime, +) -> None: + # 0) Skip the operation if the export database already contains it def _get_certificates_export_done() -> Any: - return output_db_con.execute( + row = output_db_con.execute( "SELECT certificates_export_done FROM info", ).fetchone() + return bool(row[0]) + certificates_export_done = await asyncio.to_thread(_get_certificates_export_done) match certificates_export_done: case True: @@ -261,59 +347,283 @@ def _get_certificates_export_done() -> Any: # Export needed pass case unknown: - raise RealmExporterOutputDbError(f"Output export database appears to be corrupted: `certificates_export_done` contains unexpected value {unknown:r}") + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `certificates_export_done` contains unexpected value `{unknown!r}`" + ) - # Fetch certificates + # 1) Fetch all certificates - # TODO: limit certificates to a certain date ? - outcome = await backend.realm.export_do_certificates(organization_id=organization_id, realm_id=realm_id) + outcome = await backend.realm.export_do_certificates( + organization_id, realm_id, snapshot_timestamp + ) match outcome: case RealmExportCertificates() as batch: pass - case (RealmExportDoCertificatesBadOutcome.ORGANIZATION_NOT_FOUND | RealmExportDoCertificatesBadOutcome.REALM_NOT_FOUND) as error: + case ( + ( + RealmExportDoCertificatesBadOutcome.ORGANIZATION_NOT_FOUND + | RealmExportDoCertificatesBadOutcome.REALM_NOT_FOUND + ) as error + ): # Organization&realm existence has already been checked, so this shouldn't occur raise RealmExporterInputError(f"Unexpect outcome when exporting certificates: {error}") - # Write certificates to export database + # 2) Write certificates to export database - def _write_sqlite_db(): + def _write_sqlite_db(batch: RealmExportCertificates): output_db_con.executemany( - "INSERT INTO realm_role (_id, role_certificate) VALUES (?, ?) ON CONFLICT(_id) DO NOTHING", - batch.realm_role_certificates + "INSERT INTO common_certificate (_id, certificate) VALUES (?, ?)", + enumerate(batch.common_certificates), ) output_db_con.executemany( - "INSERT INTO user_ (_id, user_certificate, revoked_user_certificate) VALUES (?, ?, ?) ON CONFLICT(_id) DO NOTHING", - batch.user_certificates + "INSERT INTO realm_certificate (_id, certificate) VALUES (?, ?)", + enumerate(batch.realm_certificates), ) output_db_con.executemany( - "INSERT INTO user_update (_id, user_update_certificate) VALUES (?, ?) ON CONFLICT(_id) DO NOTHING", - batch.user_update_certificates + "INSERT INTO sequester_certificate (_id, certificate) VALUES (?, ?)", + enumerate(batch.sequester_certificates), ) - output_db_con.executemany( - "INSERT INTO device (_id, device_certificate) VALUES (?, ?) ON CONFLICT(_id) DO NOTHING", - batch.device_certificates + output_db_con.execute( + "UPDATE info SET certificates_export_done = 1", ) + output_db_con.commit() + + await asyncio.to_thread(_write_sqlite_db, batch) + + +async def _do_export_vlobs( + backend: Backend, + output_db_con: sqlite3.Connection, + organization_id: OrganizationID, + realm_id: VlobID, + snapshot_timestamp: DateTime, +) -> None: + # 0) Skip the operation if the export database already contains it + + def _get_vlobs_export_done() -> Any: + row = output_db_con.execute( + "SELECT vlobs_export_done FROM info", + ).fetchone() + return bool(row[0]) + + vlobs_export_done = await asyncio.to_thread(_get_vlobs_export_done) + match vlobs_export_done: + case True: + # Nothing to do + return + case False: + # Export needed + pass + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `vlobs_export_done` contains unexpected value `{unknown!r}`" + ) + + current_batch_offset_marker = 0 + while True: + # 1) Download a batch of data + + outcome = await backend.realm.export_do_vlobs_batch( + organization_id, realm_id, batch_offset_marker=current_batch_offset_marker + ) + match outcome: + case RealmExportVlobsBatch() as batch: + pass + case ( + ( + RealmExportDoVlobsBatchBadOutcome.ORGANIZATION_NOT_FOUND + | RealmExportDoVlobsBatchBadOutcome.REALM_NOT_FOUND + ) as error + ): + # Organization&realm existence has already been checked, so this shouldn't occur + raise RealmExporterInputError( + f"Unexpect outcome when exporting certificates: {error}" + ) + + if not batch.items: + break + + # 2) Write the batch to export database + + def _write_sqlite_db(): + output_db_con.executemany( + "INSERT INTO vlob_atom (\ + realm_vlob_update_index,\ + vlob_id,\ + version,\ + key_index,\ + blob,\ + size,\ + author,\ + timestamp\ + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ( + ( + item.realm_vlob_update_index, + item.vlob_id.bytes, + item.version, + item.key_index, + item.blob, + item.size, + item.author, + item.timestamp.as_timestamp_micros(), + ) + for item in batch.items + ), + ) + output_db_con.commit() + + await asyncio.to_thread(_write_sqlite_db) + + current_batch_offset_marker = batch.batch_offset_marker + + # TODO: progress report + + def _write_sqlite_db(): + output_db_con.execute( + "UPDATE info SET vlobs_export_done = TRUE", + ) + output_db_con.commit() + + await asyncio.to_thread(_write_sqlite_db) + + +async def _do_export_blocks_metadata( + backend: Backend, + output_db_con: sqlite3.Connection, + organization_id: OrganizationID, + realm_id: VlobID, + batch_size: int, +) -> None: + # 0) Skip the operation if the export database already contains it + + def _get_blocks_metadata_export_done() -> Any: + row = output_db_con.execute( + "SELECT blocks_metadata_export_done FROM info", + ).fetchone() + return bool(row[0]) + + blocks_metadata_export_done = await asyncio.to_thread(_get_blocks_metadata_export_done) + match blocks_metadata_export_done: + case True: + # Nothing to do + return + case False: + # Export needed + pass + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `blocks_metadata_export_done` contains unexpected value `{unknown!r}`" + ) + + current_batch_offset_marker = 0 + while True: + # 1) Download a batch of data + + outcome = await backend.realm.export_do_blocks_metadata_batch( + organization_id=organization_id, + realm_id=realm_id, + batch_offset_marker=current_batch_offset_marker, + batch_size=batch_size, + ) + match outcome: + case RealmExportBlocksMetadataBatch() as batch: + pass + case ( + ( + RealmExportDoBlocksBatchMetadatBadOutcome.ORGANIZATION_NOT_FOUND + | RealmExportDoBlocksBatchMetadatBadOutcome.REALM_NOT_FOUND + ) as error + ): + # Organization&realm existence has already been checked, so this shouldn't occur + raise RealmExporterInputError( + f"Unexpect outcome when exporting certificates: {error}" + ) + + if not batch.items: + break + + # 2) Write the batch to export database + + def _write_sqlite_db(): + output_db_con.executemany( + "INSERT INTO block (\ + _id,\ + block_id,\ + author,\ + size,\ + key_index\ + ) VALUES (?, ?, ?, ?, ?)", + ( + ( + item.id, + item.block_id.bytes, + item.author, + item.key_index, + item.size, + ) + for item in batch.items + ), + ) + output_db_con.commit() + + await asyncio.to_thread(_write_sqlite_db) + + current_batch_offset_marker = batch.batch_offset_marker + + # TODO: progress report + + def _write_sqlite_db(): output_db_con.execute( - "UPDATE info SET (certificates_export_done = TRUE)", + "UPDATE info SET blocks_metadata_export_done = TRUE", ) output_db_con.commit() await asyncio.to_thread(_write_sqlite_db) -async def _do_export_vlobs(backend: Backend, output_db_con: sqlite3.Connection, on_progress: ProgressReportCallback) -> ExportStatus: +async def _do_export_blocks_data( + backend: Backend, + output_db_con: sqlite3.Connection, + organization_id: OrganizationID, + realm_id: VlobID, +) -> None: + # 0) Skip the operation if the export database already contains it + + def _get_blocks_export_done() -> Any: + row = output_db_con.execute( + "SELECT blocks_export_done FROM info", + ).fetchone() + return bool(row[0]) + + blocks_export_done = await asyncio.to_thread(_get_blocks_export_done) + match blocks_export_done: + case True: + # Nothing to do + return + case False: + # Export needed + pass + case unknown: + raise RealmExporterOutputDbError( + f"Output export database appears to be corrupted: `blocks_export_done` contains unexpected value `{unknown!r}`" + ) + # TODO: retreive marker from SQLite # TODO: run multiple batches in parallel (but save in SQLite must be in order !) # TODO: while loop to run multiple batches missing - # TODO: progress report - # TODO: save batch to sqlite - raise NotImplementedError + def _write_sqlite_db(): + output_db_con.execute( + "UPDATE info SET blocks_data_export_done = TRUE", + ) + output_db_con.commit() + await asyncio.to_thread(_write_sqlite_db) -async def _do_export_blocks(backend: Backend, output_db_con: sqlite3.Connection, on_progress: ProgressReportCallback) -> ExportStatus: + # TODO: save batch to sqlite raise NotImplementedError diff --git a/server/tests/test_sequester.py b/server/tests/test_sequester.py index ab0c772423c..6dd8da3d328 100644 --- a/server/tests/test_sequester.py +++ b/server/tests/test_sequester.py @@ -1,6 +1,5 @@ # Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS -from typing import Awaitable, Protocol from unittest.mock import ANY import pytest @@ -32,22 +31,6 @@ from tests.common import Backend, MinimalorgRpcClients, SequesteredOrgRpcClients -class DoCreateServiceFunc(Protocol): - def __call__( - self, - now: DateTime, - organization_id: OrganizationID, - service_certificate: bytes, - ) -> Awaitable[ - ( - SequesterServiceCertificate - | SequesterCreateServiceValidateBadOutcome - | SequesterCreateServiceStoreBadOutcome - | RequireGreaterTimestamp - ) - ]: ... - - @pytest.mark.parametrize("kind", ("storage", "webhook")) async def test_create_service_ok( sequestered_org: SequesteredOrgRpcClients, backend: Backend, kind: str