From 9b3baadb19f9cfc56a812df5cbfd259b97c13ab6 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Tue, 17 Sep 2024 01:45:39 +0200
Subject: [PATCH] CTK: Validate InfluxDB Table Loader

---
 application/cratedb-toolkit/requirements.txt |  2 +-
 application/cratedb-toolkit/test_io.py       | 64 ++++++++++++++++++++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/application/cratedb-toolkit/requirements.txt b/application/cratedb-toolkit/requirements.txt
index 370d6d81..7d1a9c9a 100644
--- a/application/cratedb-toolkit/requirements.txt
+++ b/application/cratedb-toolkit/requirements.txt
@@ -1 +1 @@
-cratedb-toolkit[mongodb]==0.0.23
+cratedb-toolkit[influxdb,mongodb]==0.0.23
diff --git a/application/cratedb-toolkit/test_io.py b/application/cratedb-toolkit/test_io.py
index 71ffed96..53528607 100644
--- a/application/cratedb-toolkit/test_io.py
+++ b/application/cratedb-toolkit/test_io.py
@@ -23,6 +23,70 @@ def update(self, op_code, cur_count, max_count=None, message=""):
         )
 
 
+def test_ctk_load_table_influxdb_lp():
+    """
+    Probe importing data from InfluxDB Line Protocol file.
+
+    Download the `air-sensor-data.lp` sample file into the user cache directory,
+    unless it is already there, load it into CrateDB using `influxio copy`, and
+    compare the `SHOW TABLES` result and the record counts with expected values.
+    """
+
+    # Define table names used for testing.
+    table_names = [
+        "air-sensor-data",
+    ]
+
+    # Define table cardinalities used in validation step.
+    table_cardinalities = {
+        "air-sensor-data": 431,
+    }
+
+    db = DatabaseAdapter("crate://localhost:4200/?schema=from-influxdb")
+
+    # Drop tables for blank canvas.
+    for table_name in table_names:
+        db.drop_table(table_name)
+
+    # Define path to source data.
+    influxdb_files_path = platformdirs.user_cache_path("cratedb-examples") / "influxdb_files"
+    influxdb_files_path.mkdir(parents=True, exist_ok=True)
+
+    # Acquire source data.
+    air_sensor_data_path = influxdb_files_path / "air-sensor-data.lp"
+    if not air_sensor_data_path.exists():
+        influxdb_lp_url = "https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp"
+        # Fail on HTTP errors and stalled connections. Otherwise, an error
+        # page never ends up being rejected, and gets cached for all later runs.
+        response = requests.get(influxdb_lp_url, timeout=60)
+        response.raise_for_status()
+        # Store the payload verbatim, without a decode/encode round trip.
+        air_sensor_data_path.write_bytes(response.content)
+
+    # Invoke data transfer.
+    # Use an argument list, so the file path needs no shell-style quoting.
+    command = [
+        "influxio",
+        "copy",
+        f"file://{air_sensor_data_path}",
+        "crate://localhost:4200/from-influxdb/air-sensor-data",
+    ]
+    print(f"Invoking CTK: {shlex.join(command)}", file=sys.stderr)
+    subprocess.check_call(command)
+
+    # Validate data in database.
+    results = db.run_sql("SHOW TABLES", records=True)
+    results = [item["table_name"] for item in results]
+    assert results == table_names
+
+    # Compare the record count of each table with its expected cardinality.
+    cardinalities = {
+        table_name: db.count_records(table_name)
+        for table_name in table_cardinalities
+    }
+    assert cardinalities == table_cardinalities
+
+
 def test_ctk_load_table_mongodb_json():
     """
     Probe importing data from MongoDB Extended JSON files.