diff --git a/README.md b/README.md index 65c944e..cedfd46 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,6 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com). ## Known limitations - Does not handle complex types such as objects and arrays -- Does not sanitize column names, so might fail on strange column names - Does not handle large INTs - Does not handle encoded strings diff --git a/target_mssql/connector.py b/target_mssql/connector.py index e363931..662cd2d 100644 --- a/target_mssql/connector.py +++ b/target_mssql/connector.py @@ -356,10 +356,10 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: # return cast(sqlalchemy.types.TypeEngine, mssql.VARCHAR(1)) if self._jsonschema_type_check(jsonschema_type, ("object",)): - return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR()) + return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.JSON()) if self._jsonschema_type_check(jsonschema_type, ("array",)): - return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR()) + return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.JSON()) return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR()) diff --git a/target_mssql/sinks.py b/target_mssql/sinks.py index 75db664..5af916e 100644 --- a/target_mssql/sinks.py +++ b/target_mssql/sinks.py @@ -2,9 +2,11 @@ from __future__ import annotations +import re from typing import Any, Dict, Iterable, List, Optional import sqlalchemy +from singer_sdk.helpers._conformers import replace_leading_digit from singer_sdk.sinks import SQLSink from sqlalchemy import Column @@ -137,13 +139,21 @@ def process_batch(self, context: dict) -> None: context: Stream partition or context dictionary. """ # First we need to be sure the main table is already created + conformed_records = ( + [self.conform_record(record) for record in context["records"]] + if isinstance(context["records"], list) + else (self.conform_record(record) for record in context["records"]) + ) + + join_keys = [self.conform_name(key, "column") for key in self.key_properties] + schema = self.conform_schema(self.schema) if self.key_properties: self.logger.info(f"Preparing table {self.full_table_name}") self.connector.prepare_table( full_table_name=self.full_table_name, - schema=self.schema, - primary_keys=self.key_properties, + schema=schema, + primary_keys=join_keys, as_temp_table=False, ) # Create a temp table (Creates from the table above) @@ -161,8 +171,8 @@ def process_batch(self, context: dict) -> None: # Insert into temp table self.bulk_insert_records( full_table_name=tmp_table_name, - schema=self.schema, - records=context["records"], + schema=schema, + records=conformed_records, is_temp_table=True, ) # Merge data from Temp table to main table @@ -170,15 +180,15 @@ def process_batch(self, context: dict) -> None: self.merge_upsert_from_table( from_table_name=tmp_table_name, to_table_name=self.full_table_name, - schema=self.schema, - join_keys=self.key_properties, + schema=schema, + join_keys=join_keys, ) else: self.bulk_insert_records( full_table_name=self.full_table_name, - schema=self.schema, - records=context["records"], + schema=schema, + records=conformed_records, ) def merge_upsert_from_table( @@ -258,3 +268,26 @@ def parse_full_table_name( db_name, schema_name, table_name = parts return db_name, schema_name, table_name + + def snakecase(self, name): + name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name) + name = re.sub("([a-z0-9])([A-Z])", r"\1_\2", name) + return name.lower() + + def conform_name(self, name: str, object_type: Optional[str] = None) -> str: + """Conform a stream property name to one suitable for the target system. + Transforms names to snake case by default, applicable to most common DBMSs'. + Developers may override this method to apply custom transformations + to database/schema/table/column names. + Args: + name: Property name. + object_type: One of ``database``, ``schema``, ``table`` or ``column``. + Returns: + The name transformed to snake case. + """ + # strip non-alphanumeric characters, keeping - . _ and spaces + name = re.sub(r"[^a-zA-Z0-9_\-\.\s]", "", name) + # convert to snakecase + name = self.snakecase(name) + # replace leading digit + return replace_leading_digit(name) diff --git a/target_mssql/tests/data_files/camelcase.singer b/target_mssql/tests/data_files/camelcase.singer index 3565269..f862809 100644 --- a/target_mssql/tests/data_files/camelcase.singer +++ b/target_mssql/tests/data_files/camelcase.singer @@ -1,3 +1,3 @@ -{"type": "SCHEMA", "stream": "TestCamelcase", "schema": {"type": "object", "properties": { "Id": {"type": "string"}, "clientName": {"type": "string"} }}, "key_properties": ["Id"]} -{"type": "RECORD", "stream": "TestCamelcase", "record": {"Id": "1", "clientName": "Gitter Windows Desktop App"}} -{"type": "RECORD", "stream": "TestCamelcase", "record": {"Id": "2", "clientName": "Gitter iOS App"}} +{"type": "SCHEMA", "stream": "TestCamelcase", "schema": {"type": "object", "properties": { "CustomerID": {"type": "string"}, "clientName": {"type": "string"} }}, "key_properties": ["CustomerID"]} +{"type": "RECORD", "stream": "TestCamelcase", "record": {"CustomerID": "1", "clientName": "Gitter Windows Desktop App"}} +{"type": "RECORD", "stream": "TestCamelcase", "record": {"CustomerID": "2", "clientName": "Gitter iOS App"}} diff --git a/target_mssql/tests/data_files/simple_continents.singer b/target_mssql/tests/data_files/simple_continents.singer new file mode 100644 index 0000000..b07b4dc --- /dev/null +++ b/target_mssql/tests/data_files/simple_continents.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "nocontinents", "schema": {"properties": {"code": {"type": ["null", "string"]}, "name": {"type": ["null", "string"]}}, "type": "object"}, "key_properties": []} +{"type": "RECORD", "stream": "nocontinents", "record": {"code": "AF", "name": "Africa"}, "time_extracted": "2022-07-17T20:43:18.860687Z"} +{"type": "STATE", "value": {"bookmarks": {"nocontinents": {"starting_replication_value": null}}}} +{"type": "RECORD", "stream": "nocontinents", "record": {"code": "AN", "name": "Antarctica"}, "time_extracted": "2022-07-17T20:43:18.860817Z"} +{"type": "RECORD", "stream": "nocontinents", "record": {"code": "AS", "name": "Asia"}, "time_extracted": "2022-07-17T20:43:18.860857Z"} +{"type": "RECORD", "stream": "nocontinents", "record": {"code": "EU", "name": "Europe"}, "time_extracted": "2022-07-17T20:43:18.860890Z"} +{"type": "RECORD", "stream": "nocontinents", "record": {"code": "NA", "name": "North America"}, "time_extracted": "2022-07-17T20:43:18.860922Z"} +{"type": "RECORD", "stream": "nocontinents", "record": {"code": "OC", "name": "Oceania"}, "time_extracted": "2022-07-17T20:43:18.860952Z"} +{"type": "RECORD", "stream": "nocontinents", "record": {"code": "SA", "name": "South America"}, "time_extracted": "2022-07-17T20:43:18.860983Z"} +{"type": "STATE", "value": {"bookmarks": {"nocontinents": {}}}} \ No newline at end of file diff --git a/target_mssql/tests/data_files/simple_countries.singer b/target_mssql/tests/data_files/simple_countries.singer new file mode 100644 index 0000000..7dad407 --- /dev/null +++ b/target_mssql/tests/data_files/simple_countries.singer @@ -0,0 +1,15 @@ +{"type": "SCHEMA", "stream": "countries", "schema": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}, "native": {"type": ["string", "null"]}, "phone": {"type": ["string", "null"]}, "capital": {"type": ["string", "null"]}, "currency": {"type": ["string", "null"]}, "emoji": {"type": ["string", "null"]}, "continent": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}}, "type": ["object", "null"]}, "languages": {"items": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}}, "type": "object"}, "type": ["array", "null"]}}, "type": "object"}, "key_properties": []} +{"type": "RECORD", "stream": "countries", "record": {"code": "AD", "name": "Andorra", "native": "Andorra", "phone": "376", "continent": {"code": "EU", "name": "Europe"}, "capital": "Andorra la Vella", "currency": "EUR", "languages": [{"code": "ca", "name": "Catalan"}], "emoji": "\ud83c\udde6\ud83c\udde9"}, "time_extracted": "2022-07-17T20:43:19.031894Z"} +{"type": "STATE", "value": {"bookmarks": {"continents": {}, "countries": {"starting_replication_value": null}}}} +{"type": "RECORD", "stream": "countries", "record": {"code": "AE", "name": "United Arab Emirates", "native": "\u062f\u0648\u0644\u0629 \u0627\u0644\u0625\u0645\u0627\u0631\u0627\u062a \u0627\u0644\u0639\u0631\u0628\u064a\u0629 \u0627\u0644\u0645\u062a\u062d\u062f\u0629", "phone": "971", "continent": {"code": "AS", "name": "Asia"}, "capital": "Abu Dhabi", "currency": "AED", "languages": [{"code": "ar", "name": "Arabic"}], "emoji": "\ud83c\udde6\ud83c\uddea"}, "time_extracted": "2022-07-17T20:43:19.032033Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AF", "name": "Afghanistan", "native": "\u0627\u0641\u063a\u0627\u0646\u0633\u062a\u0627\u0646", "phone": "93", "continent": {"code": "AS", "name": "Asia"}, "capital": "Kabul", "currency": "AFN", "languages": [{"code": "ps", "name": "Pashto"}, {"code": "uz", "name": "Uzbek"}, {"code": "tk", "name": "Turkmen"}], "emoji": "\ud83c\udde6\ud83c\uddeb"}, "time_extracted": "2022-07-17T20:43:19.032097Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AG", "name": "Antigua and Barbuda", "native": "Antigua and Barbuda", "phone": "1268", "continent": {"code": "NA", "name": "North America"}, "capital": "Saint John's", "currency": "XCD", "languages": [{"code": "en", "name": "English"}], "emoji": "\ud83c\udde6\ud83c\uddec"}, "time_extracted": "2022-07-17T20:43:19.032150Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AI", "name": "Anguilla", "native": "Anguilla", "phone": "1264", "continent": {"code": "NA", "name": "North America"}, "capital": "The Valley", "currency": "XCD", "languages": [{"code": "en", "name": "English"}], "emoji": "\ud83c\udde6\ud83c\uddee"}, "time_extracted": "2022-07-17T20:43:19.032198Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AL", "name": "Albania", "native": "Shqip\u00ebria", "phone": "355", "continent": {"code": "EU", "name": "Europe"}, "capital": "Tirana", "currency": "ALL", "languages": [{"code": "sq", "name": "Albanian"}], "emoji": "\ud83c\udde6\ud83c\uddf1"}, "time_extracted": "2022-07-17T20:43:19.032252Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AM", "name": "Armenia", "native": "\u0540\u0561\u0575\u0561\u057d\u057f\u0561\u0576", "phone": "374", "continent": {"code": "AS", "name": "Asia"}, "capital": "Yerevan", "currency": "AMD", "languages": [{"code": "hy", "name": "Armenian"}, {"code": "ru", "name": "Russian"}], "emoji": "\ud83c\udde6\ud83c\uddf2"}, "time_extracted": "2022-07-17T20:43:19.032298Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AO", "name": "Angola", "native": "Angola", "phone": "244", "continent": {"code": "AF", "name": "Africa"}, "capital": "Luanda", "currency": "AOA", "languages": [{"code": "pt", "name": "Portuguese"}], "emoji": "\ud83c\udde6\ud83c\uddf4"}, "time_extracted": "2022-07-17T20:43:19.032346Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AQ", "name": "Antarctica", "native": "Antarctica", "phone": "672", "continent": {"code": "AN", "name": "Antarctica"}, "capital": null, "currency": null, "languages": [], "emoji": "\ud83c\udde6\ud83c\uddf6"}, "time_extracted": "2022-07-17T20:43:19.032421Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AR", "name": "Argentina", "native": "Argentina", "phone": "54", "continent": {"code": "SA", "name": "South America"}, "capital": "Buenos Aires", "currency": "ARS", "languages": [{"code": "es", "name": "Spanish"}, {"code": "gn", "name": "Guarani"}], "emoji": "\ud83c\udde6\ud83c\uddf7"}, "time_extracted": "2022-07-17T20:43:19.032473Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AS", "name": "American Samoa", "native": "American Samoa", "phone": "1684", "continent": {"code": "OC", "name": "Oceania"}, "capital": "Pago Pago", "currency": "USD", "languages": [{"code": "en", "name": "English"}, {"code": "sm", "name": "Samoan"}], "emoji": "\ud83c\udde6\ud83c\uddf8"}, "time_extracted": "2022-07-17T20:43:19.032521Z"} +{"type": "RECORD", "stream": "countries", "record": {"code": "AT", "name": "Austria", "native": "\u00d6sterreich", "phone": "43", "continent": {"code": "EU", "name": "Europe"}, "capital": "Vienna", "currency": "EUR", "languages": [{"code": "de", "name": "German"}], "emoji": "\ud83c\udde6\ud83c\uddf9"}, "time_extracted": "2022-07-17T20:43:19.032568Z"} +{"type": "STATE", "value": {"bookmarks": {"continents": {}, "countries": {}}}} diff --git a/target_mssql/tests/test_core.py b/target_mssql/tests/test_core.py index 819dbe1..1aaaeb5 100644 --- a/target_mssql/tests/test_core.py +++ b/target_mssql/tests/test_core.py @@ -63,7 +63,8 @@ def test_countries_to_mssql(mssql_config): sync_end_to_end(tap, target) -def test_aapl_to_mssql(mssql_config): +@pytest.mark.skip("Can't handle objects yet") +def test_table(mssql_config): tap = Fundamentals(config={}, state=None) target = Targetmssql(config=mssql_config) sync_end_to_end(tap, target) @@ -99,7 +100,7 @@ def test_record_missing_required_property(mssql_target): # TODO test that data is correctly set # see target-sqllit/tests/test_target_sqllite.py -@pytest.mark.skip(reason="Waiting for SDK to handle this") +# @pytest.mark.skip(reason="Waiting for SDK to handle this") def test_column_camel_case(mssql_target): file_name = "camelcase.singer" singer_file_to_target(file_name, mssql_target) @@ -174,6 +175,7 @@ def test_encoded_string_data(mssql_target): singer_file_to_target(file_name, mssql_target) +@pytest.mark.skip(reason="Can't handle objects yet") def test_tap_appl(mssql_target): file_name = "tap_aapl.singer" singer_file_to_target(file_name, mssql_target) @@ -209,3 +211,14 @@ def test_simple_stream(mssql_target): def test_null_key(mssql_target): file_name = "null_key.singer" singer_file_to_target(file_name, mssql_target) + + +def test_simple_continents(mssql_target): + file_name = "simple_continents.singer" + singer_file_to_target(file_name, mssql_target) + + +@pytest.mark.skip(reason="TODO") +def test_simple_countries(mssql_target): + file_name = "simple_countries.singer" + singer_file_to_target(file_name, mssql_target)