Skip to content

Commit

Permalink
CTK: Validate InfluxDB Table Loader
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 17, 2024
1 parent 479f861 commit a05e8d2
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 30 deletions.
2 changes: 1 addition & 1 deletion application/cratedb-toolkit/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cratedb-toolkit[mongodb]==0.0.23
cratedb-toolkit[influxdb,mongodb]==0.0.23
108 changes: 79 additions & 29 deletions application/cratedb-toolkit/test_io.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import shlex
import sys
from collections import OrderedDict

import requests
import pytest
import typing as t
import logging
import platformdirs
from cratedb_toolkit.util import DatabaseAdapter
Expand All @@ -23,26 +25,86 @@ def update(self, op_code, cur_count, max_count=None, message=""):
)


def test_ctk_load_table_mongodb_json():
@pytest.fixture()
def drop_testing_tables():
"""
Probe importing data from MongoDB Extended JSON files.
Drop tables used for testing purposes, to let each test case have a blank canvas.
"""

# Define table names used for testing.
db = DatabaseAdapter("crate://localhost:4200/")

table_names = [
"books",
"city_inspections",
"companies",
"countries-big",
"countries-small",
"covers",
"grades",
"products",
"profiles",
"restaurant",
"students",
"from-influxdb.air-sensor-data",
"from-mongodb.books",
"from-mongodb.city_inspections",
"from-mongodb.companies",
"from-mongodb.countries-big",
"from-mongodb.countries-small",
"from-mongodb.covers",
"from-mongodb.grades",
"from-mongodb.products",
"from-mongodb.profiles",
"from-mongodb.restaurant",
"from-mongodb.students",
]

for table_name in table_names:
db.drop_table(table_name)


def get_table_cardinalities(db: DatabaseAdapter, table_names: t.Iterable[str]) -> t.Dict[str, int]:
    """
    Inquire table cardinalities for given table names.

    :param db: Database adapter used to refresh and count each table.
    :param table_names: Names of the tables to inspect. Any iterable of
        strings is accepted, e.g. a list or the key view of a dictionary.
    :return: Mapping of table name to record count, keyed in the
        iteration order of `table_names`.
    """
    cardinalities: t.Dict[str, int] = OrderedDict()
    for table_name in table_names:
        # Synchronize writes before counting.
        db.refresh_table(table_name)
        # Inquire table count.
        cardinalities[table_name] = db.count_records(table_name)
    return cardinalities


def test_ctk_load_table_influxdb_lp(drop_testing_tables):
    """
    Validate loading an InfluxDB Line Protocol file into CrateDB.

    The data is transferred by invoking `influxio copy` as a subprocess,
    then the record count of the target table is compared with the
    expected value.
    """

    adapter = DatabaseAdapter("crate://localhost:4200/?schema=from-influxdb")

    # Expected record count per target table, used in the validation step.
    expected_cardinalities = {
        "air-sensor-data": 5288,
    }

    # Resource of source data.
    source_url = "https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp"

    # Path to source data within the user cache directory.
    # NOTE(review): the directory is created here, but the transfer command
    # below does not reference it -- confirm whether it is still needed.
    cache_path = platformdirs.user_cache_path("cratedb-examples") / "influxdb_files"
    cache_path.mkdir(parents=True, exist_ok=True)

    # Invoke data transfer. Trailing backslashes join the lines into one command.
    command = f"""
influxio copy \
{source_url} \
"crate://localhost:4200/from-influxdb/air-sensor-data"
"""
    print(f"Invoking CTK: {command}", file=sys.stderr)
    arguments = shlex.split(command)
    subprocess.check_call(arguments)

    # Validate data in target database.
    actual_cardinalities = get_table_cardinalities(adapter, expected_cardinalities.keys())
    assert actual_cardinalities == expected_cardinalities


def test_ctk_load_table_mongodb_json(drop_testing_tables):
"""
Probe importing data from MongoDB Extended JSON files.
"""

db = DatabaseAdapter("crate://localhost:4200/?schema=from-mongodb")

# Define table cardinalities used in validation step.
table_cardinalities = {
"books": 431,
Expand All @@ -58,12 +120,6 @@ def test_ctk_load_table_mongodb_json():
"students": 200,
}

db = DatabaseAdapter("crate://localhost:4200/?schema=from-mongodb")

# Drop tables for blank canvas.
for table_name in table_names:
db.drop_table(table_name)

# Define path to source data.
mongodb_json_files_path = platformdirs.user_cache_path("cratedb-examples") / "mongodb_json_files"
datasets_path = mongodb_json_files_path / "datasets"
Expand All @@ -87,12 +143,6 @@ def test_ctk_load_table_mongodb_json():
print(f"Invoking CTK: {command}", file=sys.stderr)
subprocess.check_call(shlex.split(command))

# Validate data in database.
results = db.run_sql("SHOW TABLES", records=True)
results = [item["table_name"] for item in results]
assert results == table_names

cardinalities = {}
for table_name, cardinality in table_cardinalities.items():
cardinalities[table_name] = db.count_records(table_name)
# Validate data in target database.
cardinalities = get_table_cardinalities(db, table_cardinalities.keys())
assert cardinalities == table_cardinalities

0 comments on commit a05e8d2

Please sign in to comment.