Skip to content

Commit

Permalink
MongoDB: For ctk load table, use "partial" scan for inferring schema
Browse files Browse the repository at this point in the history
... based on the first 10,000 documents.
  • Loading branch information
amotl committed Sep 2, 2024
1 parent f92121b commit 419f34e
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
- MongoDB: Add support for UUID types
- MongoDB: Improve reading timestamps in previous BSON formats
- MongoDB: Fix processing empty arrays/lists. By default, assume `TEXT` as inner type.
- MongoDB: For `ctk load table`, use a "partial" scan to infer the collection schema,
based on the first 10,000 documents.

## 2024/09/02 v0.0.21
- DynamoDB: Add special decoding for varied lists.
Expand Down
2 changes: 1 addition & 1 deletion cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int
url=str(mongodb_uri),
database=mongodb_database,
collection=mongodb_collection,
scan="full",
scan="partial",
transformation=transformation,
limit=limit,
)
Expand Down
8 changes: 6 additions & 2 deletions cratedb_toolkit/io/mongodb/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@
)


# TODO: Make the number of documents sampled by a "partial" scan configurable.
PARTIAL_SCAN_COUNT = 10_000


def extract_schema_from_collection(collection: Collection, partial: bool, limit: int = 0) -> t.Dict[str, t.Any]:
"""
Extract a schema definition from a collection.
Expand All @@ -95,7 +99,7 @@ def extract_schema_from_collection(collection: Collection, partial: bool, limit:

schema: dict = {"count": 0, "document": {}}
if partial:
count = 1
count = PARTIAL_SCAN_COUNT
else:
count = collection.estimated_document_count()
with progressbar:
Expand All @@ -105,7 +109,7 @@ def extract_schema_from_collection(collection: Collection, partial: bool, limit:
schema["count"] += 1
schema["document"] = extract_schema_from_document(document, schema["document"])
progressbar.update(task, advance=1)
if partial:
if partial and schema["count"] >= PARTIAL_SCAN_COUNT:
break
except KeyboardInterrupt:
return schema
Expand Down

0 comments on commit 419f34e

Please sign in to comment.