diff --git a/application/cratedb-toolkit/requirements.txt b/application/cratedb-toolkit/requirements.txt index 370d6d81..7d1a9c9a 100644 --- a/application/cratedb-toolkit/requirements.txt +++ b/application/cratedb-toolkit/requirements.txt @@ -1 +1 @@ -cratedb-toolkit[mongodb]==0.0.23 +cratedb-toolkit[influxdb,mongodb]==0.0.23 diff --git a/application/cratedb-toolkit/test_io.py b/application/cratedb-toolkit/test_io.py index 71ffed96..f43e0444 100644 --- a/application/cratedb-toolkit/test_io.py +++ b/application/cratedb-toolkit/test_io.py @@ -1,7 +1,9 @@ import shlex import sys +from collections import OrderedDict -import requests +import pytest +import typing as t import logging import platformdirs from cratedb_toolkit.util import DatabaseAdapter @@ -23,26 +25,86 @@ def update(self, op_code, cur_count, max_count=None, message=""): ) -def test_ctk_load_table_mongodb_json(): +@pytest.fixture() +def drop_testing_tables(): """ - Probe importing data from MongoDB Extended JSON files. + Drop tables used for testing purposes, to let each test case have a blank canvas. """ - # Define table names used for testing. + db = DatabaseAdapter("crate://localhost:4200/") + table_names = [ - "books", - "city_inspections", - "companies", - "countries-big", - "countries-small", - "covers", - "grades", - "products", - "profiles", - "restaurant", - "students", + "from-influxdb.air-sensor-data", + "from-mongodb.books", + "from-mongodb.city_inspections", + "from-mongodb.companies", + "from-mongodb.countries-big", + "from-mongodb.countries-small", + "from-mongodb.covers", + "from-mongodb.grades", + "from-mongodb.products", + "from-mongodb.profiles", + "from-mongodb.restaurant", + "from-mongodb.students", ] + for table_name in table_names: + db.drop_table(table_name) + + +def get_table_cardinalities(db: DatabaseAdapter, table_names: t.List[str]) -> t.Dict[str, int]: + """ + Inquire table cardinalities for given table names. + """ + cardinalities = OrderedDict() + for table_name in table_names: + # Synchronize writes. + db.refresh_table(table_name) + # Inquire table count. + cardinalities[table_name] = db.count_records(table_name) + return cardinalities + + +def test_ctk_load_table_influxdb_lp(drop_testing_tables): + """ + Probe importing data from InfluxDB Line Protocol file. + """ + + db = DatabaseAdapter("crate://localhost:4200/?schema=from-influxdb") + + # Define table cardinalities used in validation step. + table_cardinalities = { + "air-sensor-data": 5288, + } + + # Define path to source data. + influxdb_files_path = platformdirs.user_cache_path("cratedb-examples") / "influxdb_files" + influxdb_files_path.mkdir(parents=True, exist_ok=True) + + # Define resource of source data. + influxdb_lp_url = "https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp" + + # Invoke data transfer. + command = f""" +influxio copy \ + {influxdb_lp_url} \ + "crate://localhost:4200/from-influxdb/air-sensor-data" + """ + print(f"Invoking CTK: {command}", file=sys.stderr) + subprocess.check_call(shlex.split(command)) + + # Validate data in target database. + cardinalities = get_table_cardinalities(db, table_cardinalities.keys()) + assert cardinalities == table_cardinalities + + +def test_ctk_load_table_mongodb_json(drop_testing_tables): + """ + Probe importing data from MongoDB Extended JSON files. + """ + + db = DatabaseAdapter("crate://localhost:4200/?schema=from-mongodb") + # Define table cardinalities used in validation step. table_cardinalities = { "books": 431, @@ -58,12 +120,6 @@ def test_ctk_load_table_mongodb_json(): "students": 200, } - db = DatabaseAdapter("crate://localhost:4200/?schema=from-mongodb") - - # Drop tables for blank canvas. - for table_name in table_names: - db.drop_table(table_name) - # Define path to source data. mongodb_json_files_path = platformdirs.user_cache_path("cratedb-examples") / "mongodb_json_files" datasets_path = mongodb_json_files_path / "datasets" @@ -87,12 +143,6 @@ def test_ctk_load_table_mongodb_json(): print(f"Invoking CTK: {command}", file=sys.stderr) subprocess.check_call(shlex.split(command)) - # Validate data in database. - results = db.run_sql("SHOW TABLES", records=True) - results = [item["table_name"] for item in results] - assert results == table_names - - cardinalities = {} - for table_name, cardinality in table_cardinalities.items(): - cardinalities[table_name] = db.count_records(table_name) + # Validate data in target database. + cardinalities = get_table_cardinalities(db, table_cardinalities.keys()) assert cardinalities == table_cardinalities