Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #6 from radbrt/column_casing
Browse files Browse the repository at this point in the history
New column casing, more sensible conversion from CamelCase (and uppercase) to snake_case
  • Loading branch information
hholgersen authored Dec 19, 2022
2 parents bd1ba3e + 8c23ead commit 63744f4
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 16 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com).
## Known limitations

- Does not handle complex types such as objects and arrays
- Does not sanitize column names, so might fail on strange column names
- Does not handle large INTs
- Does not handle encoded strings

Expand Down
4 changes: 2 additions & 2 deletions target_mssql/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,10 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: #
return cast(sqlalchemy.types.TypeEngine, mssql.VARCHAR(1))

if self._jsonschema_type_check(jsonschema_type, ("object",)):
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR())
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.JSON())

if self._jsonschema_type_check(jsonschema_type, ("array",)):
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR())
return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.JSON())

return cast(sqlalchemy.types.TypeEngine, sqlalchemy.types.VARCHAR())

Expand Down
49 changes: 41 additions & 8 deletions target_mssql/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

from __future__ import annotations

import re
from typing import Any, Dict, Iterable, List, Optional

import sqlalchemy
from singer_sdk.helpers._conformers import replace_leading_digit
from singer_sdk.sinks import SQLSink
from sqlalchemy import Column

Expand Down Expand Up @@ -137,13 +139,21 @@ def process_batch(self, context: dict) -> None:
context: Stream partition or context dictionary.
"""
# First we need to be sure the main table is already created
conformed_records = (
[self.conform_record(record) for record in context["records"]]
if isinstance(context["records"], list)
else (self.conform_record(record) for record in context["records"])
)

join_keys = [self.conform_name(key, "column") for key in self.key_properties]
schema = self.conform_schema(self.schema)

if self.key_properties:
self.logger.info(f"Preparing table {self.full_table_name}")
self.connector.prepare_table(
full_table_name=self.full_table_name,
schema=self.schema,
primary_keys=self.key_properties,
schema=schema,
primary_keys=join_keys,
as_temp_table=False,
)
# Create a temp table (Creates from the table above)
Expand All @@ -161,24 +171,24 @@ def process_batch(self, context: dict) -> None:
# Insert into temp table
self.bulk_insert_records(
full_table_name=tmp_table_name,
schema=self.schema,
records=context["records"],
schema=schema,
records=conformed_records,
is_temp_table=True,
)
# Merge data from Temp table to main table
self.logger.info(f"Merging data from temp table to {self.full_table_name}")
self.merge_upsert_from_table(
from_table_name=tmp_table_name,
to_table_name=self.full_table_name,
schema=self.schema,
join_keys=self.key_properties,
schema=schema,
join_keys=join_keys,
)

else:
self.bulk_insert_records(
full_table_name=self.full_table_name,
schema=self.schema,
records=context["records"],
schema=schema,
records=conformed_records,
)

def merge_upsert_from_table(
Expand Down Expand Up @@ -258,3 +268,26 @@ def parse_full_table_name(
db_name, schema_name, table_name = parts

return db_name, schema_name, table_name

def snakecase(self, name):
name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
name = re.sub("([a-z0-9])([A-Z])", r"\1_\2", name)
return name.lower()

def conform_name(self, name: str, object_type: Optional[str] = None) -> str:
"""Conform a stream property name to one suitable for the target system.
Transforms names to snake case by default, applicable to most common DBMSs'.
Developers may override this method to apply custom transformations
to database/schema/table/column names.
Args:
name: Property name.
object_type: One of ``database``, ``schema``, ``table`` or ``column``.
Returns:
The name transformed to snake case.
"""
# strip non-alphanumeric characters, keeping - . _ and spaces
name = re.sub(r"[^a-zA-Z0-9_\-\.\s]", "", name)
# convert to snakecase
name = self.snakecase(name)
# replace leading digit
return replace_leading_digit(name)
6 changes: 3 additions & 3 deletions target_mssql/tests/data_files/camelcase.singer
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"type": "SCHEMA", "stream": "TestCamelcase", "schema": {"type": "object", "properties": { "Id": {"type": "string"}, "clientName": {"type": "string"} }}, "key_properties": ["Id"]}
{"type": "RECORD", "stream": "TestCamelcase", "record": {"Id": "1", "clientName": "Gitter Windows Desktop App"}}
{"type": "RECORD", "stream": "TestCamelcase", "record": {"Id": "2", "clientName": "Gitter iOS App"}}
{"type": "SCHEMA", "stream": "TestCamelcase", "schema": {"type": "object", "properties": { "CustomerID": {"type": "string"}, "clientName": {"type": "string"} }}, "key_properties": ["CustomerID"]}
{"type": "RECORD", "stream": "TestCamelcase", "record": {"CustomerID": "1", "clientName": "Gitter Windows Desktop App"}}
{"type": "RECORD", "stream": "TestCamelcase", "record": {"CustomerID": "2", "clientName": "Gitter iOS App"}}
10 changes: 10 additions & 0 deletions target_mssql/tests/data_files/simple_continents.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"type": "SCHEMA", "stream": "nocontinents", "schema": {"properties": {"code": {"type": ["null", "string"]}, "name": {"type": ["null", "string"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "nocontinents", "record": {"code": "AF", "name": "Africa"}, "time_extracted": "2022-07-17T20:43:18.860687Z"}
{"type": "STATE", "value": {"bookmarks": {"nocontinents": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "nocontinents", "record": {"code": "AN", "name": "Antarctica"}, "time_extracted": "2022-07-17T20:43:18.860817Z"}
{"type": "RECORD", "stream": "nocontinents", "record": {"code": "AS", "name": "Asia"}, "time_extracted": "2022-07-17T20:43:18.860857Z"}
{"type": "RECORD", "stream": "nocontinents", "record": {"code": "EU", "name": "Europe"}, "time_extracted": "2022-07-17T20:43:18.860890Z"}
{"type": "RECORD", "stream": "nocontinents", "record": {"code": "NA", "name": "North America"}, "time_extracted": "2022-07-17T20:43:18.860922Z"}
{"type": "RECORD", "stream": "nocontinents", "record": {"code": "OC", "name": "Oceania"}, "time_extracted": "2022-07-17T20:43:18.860952Z"}
{"type": "RECORD", "stream": "nocontinents", "record": {"code": "SA", "name": "South America"}, "time_extracted": "2022-07-17T20:43:18.860983Z"}
{"type": "STATE", "value": {"bookmarks": {"nocontinents": {}}}}
15 changes: 15 additions & 0 deletions target_mssql/tests/data_files/simple_countries.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{"type": "SCHEMA", "stream": "countries", "schema": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}, "native": {"type": ["string", "null"]}, "phone": {"type": ["string", "null"]}, "capital": {"type": ["string", "null"]}, "currency": {"type": ["string", "null"]}, "emoji": {"type": ["string", "null"]}, "continent": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}}, "type": ["object", "null"]}, "languages": {"items": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}}, "type": "object"}, "type": ["array", "null"]}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "countries", "record": {"code": "AD", "name": "Andorra", "native": "Andorra", "phone": "376", "continent": {"code": "EU", "name": "Europe"}, "capital": "Andorra la Vella", "currency": "EUR", "languages": [{"code": "ca", "name": "Catalan"}], "emoji": "\ud83c\udde6\ud83c\udde9"}, "time_extracted": "2022-07-17T20:43:19.031894Z"}
{"type": "STATE", "value": {"bookmarks": {"continents": {}, "countries": {"starting_replication_value": null}}}}
{"type": "RECORD", "stream": "countries", "record": {"code": "AE", "name": "United Arab Emirates", "native": "\u062f\u0648\u0644\u0629 \u0627\u0644\u0625\u0645\u0627\u0631\u0627\u062a \u0627\u0644\u0639\u0631\u0628\u064a\u0629 \u0627\u0644\u0645\u062a\u062d\u062f\u0629", "phone": "971", "continent": {"code": "AS", "name": "Asia"}, "capital": "Abu Dhabi", "currency": "AED", "languages": [{"code": "ar", "name": "Arabic"}], "emoji": "\ud83c\udde6\ud83c\uddea"}, "time_extracted": "2022-07-17T20:43:19.032033Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AF", "name": "Afghanistan", "native": "\u0627\u0641\u063a\u0627\u0646\u0633\u062a\u0627\u0646", "phone": "93", "continent": {"code": "AS", "name": "Asia"}, "capital": "Kabul", "currency": "AFN", "languages": [{"code": "ps", "name": "Pashto"}, {"code": "uz", "name": "Uzbek"}, {"code": "tk", "name": "Turkmen"}], "emoji": "\ud83c\udde6\ud83c\uddeb"}, "time_extracted": "2022-07-17T20:43:19.032097Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AG", "name": "Antigua and Barbuda", "native": "Antigua and Barbuda", "phone": "1268", "continent": {"code": "NA", "name": "North America"}, "capital": "Saint John's", "currency": "XCD", "languages": [{"code": "en", "name": "English"}], "emoji": "\ud83c\udde6\ud83c\uddec"}, "time_extracted": "2022-07-17T20:43:19.032150Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AI", "name": "Anguilla", "native": "Anguilla", "phone": "1264", "continent": {"code": "NA", "name": "North America"}, "capital": "The Valley", "currency": "XCD", "languages": [{"code": "en", "name": "English"}], "emoji": "\ud83c\udde6\ud83c\uddee"}, "time_extracted": "2022-07-17T20:43:19.032198Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AL", "name": "Albania", "native": "Shqip\u00ebria", "phone": "355", "continent": {"code": "EU", "name": "Europe"}, "capital": "Tirana", "currency": "ALL", "languages": [{"code": "sq", "name": "Albanian"}], "emoji": "\ud83c\udde6\ud83c\uddf1"}, "time_extracted": "2022-07-17T20:43:19.032252Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AM", "name": "Armenia", "native": "\u0540\u0561\u0575\u0561\u057d\u057f\u0561\u0576", "phone": "374", "continent": {"code": "AS", "name": "Asia"}, "capital": "Yerevan", "currency": "AMD", "languages": [{"code": "hy", "name": "Armenian"}, {"code": "ru", "name": "Russian"}], "emoji": "\ud83c\udde6\ud83c\uddf2"}, "time_extracted": "2022-07-17T20:43:19.032298Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AO", "name": "Angola", "native": "Angola", "phone": "244", "continent": {"code": "AF", "name": "Africa"}, "capital": "Luanda", "currency": "AOA", "languages": [{"code": "pt", "name": "Portuguese"}], "emoji": "\ud83c\udde6\ud83c\uddf4"}, "time_extracted": "2022-07-17T20:43:19.032346Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AQ", "name": "Antarctica", "native": "Antarctica", "phone": "672", "continent": {"code": "AN", "name": "Antarctica"}, "capital": null, "currency": null, "languages": [], "emoji": "\ud83c\udde6\ud83c\uddf6"}, "time_extracted": "2022-07-17T20:43:19.032421Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AR", "name": "Argentina", "native": "Argentina", "phone": "54", "continent": {"code": "SA", "name": "South America"}, "capital": "Buenos Aires", "currency": "ARS", "languages": [{"code": "es", "name": "Spanish"}, {"code": "gn", "name": "Guarani"}], "emoji": "\ud83c\udde6\ud83c\uddf7"}, "time_extracted": "2022-07-17T20:43:19.032473Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AS", "name": "American Samoa", "native": "American Samoa", "phone": "1684", "continent": {"code": "OC", "name": "Oceania"}, "capital": "Pago Pago", "currency": "USD", "languages": [{"code": "en", "name": "English"}, {"code": "sm", "name": "Samoan"}], "emoji": "\ud83c\udde6\ud83c\uddf8"}, "time_extracted": "2022-07-17T20:43:19.032521Z"}
{"type": "RECORD", "stream": "countries", "record": {"code": "AT", "name": "Austria", "native": "\u00d6sterreich", "phone": "43", "continent": {"code": "EU", "name": "Europe"}, "capital": "Vienna", "currency": "EUR", "languages": [{"code": "de", "name": "German"}], "emoji": "\ud83c\udde6\ud83c\uddf9"}, "time_extracted": "2022-07-17T20:43:19.032568Z"}
{"type": "STATE", "value": {"bookmarks": {"continents": {}, "countries": {}}}}
17 changes: 15 additions & 2 deletions target_mssql/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def test_countries_to_mssql(mssql_config):
sync_end_to_end(tap, target)


def test_aapl_to_mssql(mssql_config):
@pytest.mark.skip("Can't handle objects yet")
def test_table(mssql_config):
tap = Fundamentals(config={}, state=None)
target = Targetmssql(config=mssql_config)
sync_end_to_end(tap, target)
Expand Down Expand Up @@ -99,7 +100,7 @@ def test_record_missing_required_property(mssql_target):

# TODO test that data is correctly set
# see target-sqllit/tests/test_target_sqllite.py
@pytest.mark.skip(reason="Waiting for SDK to handle this")
# @pytest.mark.skip(reason="Waiting for SDK to handle this")
def test_column_camel_case(mssql_target):
file_name = "camelcase.singer"
singer_file_to_target(file_name, mssql_target)
Expand Down Expand Up @@ -174,6 +175,7 @@ def test_encoded_string_data(mssql_target):
singer_file_to_target(file_name, mssql_target)


@pytest.mark.skip(reason="Can't handle objects yet")
def test_tap_appl(mssql_target):
file_name = "tap_aapl.singer"
singer_file_to_target(file_name, mssql_target)
Expand Down Expand Up @@ -209,3 +211,14 @@ def test_simple_stream(mssql_target):
def test_null_key(mssql_target):
file_name = "null_key.singer"
singer_file_to_target(file_name, mssql_target)


def test_simple_continents(mssql_target):
file_name = "simple_continents.singer"
singer_file_to_target(file_name, mssql_target)


@pytest.mark.skip(reason="TODO")
def test_simple_countries(mssql_target):
file_name = "simple_countries.singer"
singer_file_to_target(file_name, mssql_target)

0 comments on commit 63744f4

Please sign in to comment.