diff --git a/.circleci/config.yml b/.circleci/config.yml
index 4c430e07..284c6945 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -39,7 +39,7 @@ jobs:
           name: 'pylint'
           command: |
             source /usr/local/share/virtualenvs/tap-shopify/bin/activate
-            pylint tap_shopify -d missing-docstring,too-many-branches,consider-using-f-string,consider-using-generator,consider-using-dict-items,unnecessary-dunder-call,duplicate-code
+            pylint tap_shopify -d missing-docstring,too-many-branches,consider-using-f-string,consider-using-generator,consider-using-dict-items,unnecessary-dunder-call,duplicate-code,global-statement
   json_validator:
     executor: docker-executor
     steps:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f980a9fd..71059af8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
 # Changelog
 
+## 1.12.0
+  * Update bookmark logic for the transactions and order_refunds streams [#198](https://github.com/singer-io/tap-shopify/pull/198)
+
+## 1.11.0
+  * Deprecate the streams for the older version. [#196](https://github.com/singer-io/tap-shopify/pull/196)
+  * Deprecated streams - products, inventory_items, metafields (product)
+
 ## 1.10.0
   * Updates the Shopify SDK to 12.3.0
   * Updates API version used to 2024-01
diff --git a/setup.py b/setup.py
index 30f2aec2..6f62a082 100755
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 
 setup(
     name="tap-shopify",
-    version="1.10.0",
+    version="1.12.0",
     description="Singer.io tap for extracting Shopify data",
     author="Stitch",
     url="http://github.com/singer-io/tap-shopify",
diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py
index c6323c90..dc6929c9 100644
--- a/tap_shopify/__init__.py
+++ b/tap_shopify/__init__.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 import os
-import datetime
+from datetime import datetime, timezone
 import json
 import time
 import math
@@ -13,13 +13,31 @@
 from singer import metadata
 from singer import Transformer
 from tap_shopify.context import Context
-from tap_shopify.exceptions import ShopifyError
+from tap_shopify.exceptions import ShopifyError, ShopifyDeprecationError
 from tap_shopify.streams.base import shopify_error_handling, get_request_timeout
 import tap_shopify.streams # Load stream objects into Context
 
 REQUIRED_CONFIG_KEYS = ["shop", "api_key"]
+
 LOGGER = singer.get_logger()
 SDC_KEYS = {'id': 'integer', 'name': 'string', 'myshopify_domain': 'string'}
+DEPRECATED_STREAMS = ["products", "inventory_items"]
+IS_METAFIELDS_SELECTED = False
+SELECTED_DEPRECATED_STREAMS = []
+CUTOFF_DATE = datetime(2025, 3, 31, tzinfo=timezone.utc)
+TODAY_UTC = utils.now()
+
+def raise_warning():
+
+    if "products" in SELECTED_DEPRECATED_STREAMS and IS_METAFIELDS_SELECTED:
+        SELECTED_DEPRECATED_STREAMS.append("metafields (product related)")
+
+    if SELECTED_DEPRECATED_STREAMS and TODAY_UTC > CUTOFF_DATE:
+        raise ShopifyDeprecationError(
+            f"The {SELECTED_DEPRECATED_STREAMS} stream(s) are no longer supported after 31st March"
+            " 2025. Please upgrade to the latest version of tap-shopify, which supports GraphQL"
+            " endpoints for these streams."
+        )
 
 @shopify_error_handling
 def initialize_shopify_client():
@@ -161,6 +179,18 @@ def sync():
             LOGGER.info('Skipping stream: %s', stream_id)
             continue
 
+        if stream_id in DEPRECATED_STREAMS:
+            SELECTED_DEPRECATED_STREAMS.append(stream_id)
+            if TODAY_UTC > CUTOFF_DATE:
+                LOGGER.critical(
+                    "The %s stream is no longer supported. Please upgrade to the latest "
+                    "version of tap-shopify, which supports GraphQL endpoints for this stream.",
+                    stream_id
+                )
+                continue
+        if stream_id == 'metafields':
+            global IS_METAFIELDS_SELECTED
+            IS_METAFIELDS_SELECTED = True
         LOGGER.info('Syncing stream: %s', stream_id)
 
         if not Context.state.get('bookmarks'):
@@ -211,6 +241,7 @@ def main():
             Context.catalog = discover()
 
         sync()
+        raise_warning()
     except pyactiveresource.connection.ResourceNotFound as exc:
         raise ShopifyError(exc, 'Ensure shop is entered correctly') from exc
     except pyactiveresource.connection.UnauthorizedAccess as exc:
@@ -224,6 +255,8 @@
             msg = body.get('errors')
         finally:
             raise ShopifyError(exc, msg) from exc
+    except ShopifyDeprecationError as exc:
+        raise ShopifyDeprecationError(exc) from None
     except Exception as exc:
         raise ShopifyError(exc) from exc
diff --git a/tap_shopify/exceptions.py b/tap_shopify/exceptions.py
index 94fa62dd..1284f2a5 100644
--- a/tap_shopify/exceptions.py
+++ b/tap_shopify/exceptions.py
@@ -1,3 +1,6 @@
 class ShopifyError(Exception):
     def __init__(self, error, msg=''):
         super().__init__('{}\n{}'.format(error.__class__.__name__, msg))
+
+class ShopifyDeprecationError(Exception):
+    pass
diff --git a/tap_shopify/streams/metafields.py b/tap_shopify/streams/metafields.py
index 2c62d1d0..7f16e359 100644
--- a/tap_shopify/streams/metafields.py
+++ b/tap_shopify/streams/metafields.py
@@ -1,6 +1,8 @@
 import json
+from datetime import datetime, timezone
 import shopify
 import singer
+from singer import utils
 
 from tap_shopify.context import Context
 from tap_shopify.streams.base import (Stream,
@@ -9,9 +11,15 @@
                                       OutOfOrderIdsError)
 
 LOGGER = singer.get_logger()
+CUTOFF_DATE = datetime(2025, 3, 31, tzinfo=timezone.utc)
+TODAY_UTC = utils.now()
+PARENT_STREAMS = ['orders', 'customers', 'products', 'custom_collections']
 
 def get_selected_parents():
-    for parent_stream in ['orders', 'customers', 'products', 'custom_collections']:
+    # Shopify has sunset the products REST API endpoint on 31st January 2025
+    if TODAY_UTC > CUTOFF_DATE:
+        PARENT_STREAMS.remove('products')
+    for parent_stream in PARENT_STREAMS:
         if Context.is_selected(parent_stream):
             yield Context.stream_objects[parent_stream]()
 
diff --git a/tap_shopify/streams/order_refunds.py b/tap_shopify/streams/order_refunds.py
index 23e13c8a..6f13cd9a 100644
--- a/tap_shopify/streams/order_refunds.py
+++ b/tap_shopify/streams/order_refunds.py
@@ -1,5 +1,6 @@
 import shopify
-from singer.utils import strftime, strptime_to_utc
+import singer
+from singer.utils import strptime_to_utc
 from tap_shopify.context import Context
 from tap_shopify.streams.base import (Stream,
                                       shopify_error_handling,
@@ -10,6 +11,7 @@ class OrderRefunds(Stream):
     name = 'order_refunds'
     replication_object = shopify.Refund
     replication_key = 'created_at'
+    parent_stream = None
 
     @shopify_error_handling
     def get_refunds(self, parent_object, since_id):
@@ -24,6 +26,7 @@ def get_refunds(self, parent_object, since_id):
     def get_objects(self):
         selected_parent = Context.stream_objects['orders']()
         selected_parent.name = "refund_orders"
+        self.parent_stream = selected_parent
 
         # Page through all `orders`, bookmarking at `refund_orders`
         for parent_object in selected_parent.get_objects():
@@ -44,7 +47,6 @@ def get_objects(self):
 
     def sync(self):
         bookmark = self.get_bookmark()
-        max_bookmark = bookmark
         for refund in self.get_objects():
             refund_dict = refund.to_dict()
             replication_value = strptime_to_utc(refund_dict[self.replication_key])
@@ -54,10 +56,8 @@ def sync(self):
                         canonicalize(transaction_dict, field_name)
                 yield refund_dict
 
-            if replication_value > max_bookmark:
-                max_bookmark = replication_value
-
-        self.update_bookmark(strftime(max_bookmark))
-
+        max_bookmark = singer.get_bookmark(
+            Context.state, self.parent_stream.name, self.parent_stream.replication_key)
+        self.update_bookmark(max_bookmark)
 
 Context.stream_objects['order_refunds'] = OrderRefunds
diff --git a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py
index 6615a7cd..652efaa4 100644
--- a/tap_shopify/streams/transactions.py
+++ b/tap_shopify/streams/transactions.py
@@ -1,6 +1,6 @@
 import shopify
 import singer
-from singer.utils import strftime, strptime_to_utc
+from singer.utils import strptime_to_utc
 from tap_shopify.context import Context
 from tap_shopify.streams.base import (Stream,
                                       shopify_error_handling,
@@ -17,6 +17,7 @@ class Transactions(Stream):
     name = 'transactions'
     replication_key = 'created_at'
     replication_object = shopify.Transaction
+    parent_stream = None
     # Added decorator over functions of shopify SDK
     replication_object.find = shopify_error_handling(replication_object.find)
     # Transactions have no updated_at property. Therefore we have
@@ -59,6 +60,7 @@ def get_objects(self):
         # Get transactions, bookmarking at `transaction_orders`
         selected_parent = Context.stream_objects['orders']()
         selected_parent.name = "transaction_orders"
+        self.parent_stream = selected_parent
 
         # Page through all `orders`, bookmarking at `transaction_orders`
         for parent_object in selected_parent.get_objects():
@@ -68,7 +70,6 @@ def get_objects(self):
 
     def sync(self):
         bookmark = self.get_bookmark()
-        max_bookmark = bookmark
         for transaction in self.get_objects():
             transaction_dict = transaction.to_dict()
             replication_value = strptime_to_utc(transaction_dict[self.replication_key])
@@ -77,9 +78,8 @@ def sync(self):
                     canonicalize(transaction_dict, field_name)
                 yield transaction_dict
 
-            if replication_value > max_bookmark:
-                max_bookmark = replication_value
-
-        self.update_bookmark(strftime(max_bookmark))
+        max_bookmark = singer.get_bookmark(
+            Context.state, self.parent_stream.name, self.parent_stream.replication_key)
+        self.update_bookmark(max_bookmark)
 
 Context.stream_objects['transactions'] = Transactions
diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py
index c6d29b3c..c3d8a448 100644
--- a/tests/test_bookmarks_updated.py
+++ b/tests/test_bookmarks_updated.py
@@ -77,7 +77,7 @@ def bookmarks_test(self, conn_id, testable_streams):
 
         #simulated_states = self.calculated_states_by_stream(first_sync_bookmark)
         # We are hardcoding the updated state to ensure that we get atleast 1 record in second sync. These values have been provided after reviewing the max bookmark value for each of the streams
-        simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transactions': {'created_at': '2021-12-20T00:08:52-05:00'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}}
+        simulated_states = {'products': {'updated_at': '2024-09-14T03:01:11.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transactions': {'created_at': '2021-12-20T00:08:52-05:00'}, 'metafields': {'updated_at': '2025-01-21T13:28:24.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2024-12-13T08:42:56.000000Z'}}
 
         for stream, updated_state in simulated_states.items():
             new_state['bookmarks'][stream] = updated_state
@@ -148,7 +148,7 @@ def bookmarks_test(self, conn_id, testable_streams):
                     for record in second_sync_messages:
                         replication_key_value = record.get(replication_key)
                         # verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks
-                        self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark")
+                        self.assertGreaterEqual(self.convert_state_to_utc(replication_key_value), simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark")
 
                         # verify the 2nd sync bookmark value is the max replication key value for a given stream
                         self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced")
diff --git a/tests/test_interrupted_sync.py b/tests/test_interrupted_sync.py
index d34aff54..e630fb1a 100644
--- a/tests/test_interrupted_sync.py
+++ b/tests/test_interrupted_sync.py
@@ -87,15 +87,13 @@ def test_run(self):
         completed_streams = stream_groups.get('completed')
         yet_to_be_synced_streams = stream_groups.get('yet_to_be_synced')
 
-        base_state = {'bookmarks':
-                      {'currently_sync_stream': currently_syncing_stream,
-                       'customers': {'updated_at': '2023-03-28T18:53:28.000000Z'},
-                       'events': {'created_at': '2023-01-22T05:05:53.000000Z'},
-                       'metafields': {'updated_at': '2023-01-07T21:18:05.000000Z'},
-                       'orders': {'updated_at': '2023-01-22T05:07:44.000000Z'},
-                       'products': {'updated_at': '2023-01-22T05:05:56.000000Z'},
-                       'transactions': {'created_at': '2022-06-26T00:06:38-04:00'}
-                       }}
+        base_state = {"bookmarks": {"currently_sync_stream": currently_syncing_stream,
+                                    "customers": first_sync_state.get("bookmarks").get("customers"),
+                                    "events": first_sync_state.get("bookmarks").get("events"),
+                                    "metafields": first_sync_state.get("bookmarks").get("metafields"),
+                                    "orders": first_sync_state.get("bookmarks").get("orders"),
+                                    "products": first_sync_state.get("bookmarks").get("products"),
+                                    "transactions": first_sync_state.get("bookmarks").get("transactions"),}}
 
         # remove yet to be synced streams from base state and then set new state
         new_state = {
@@ -196,18 +194,8 @@
                 self.assertIsNotNone(resuming_bookmark_value)
                 self.assertTrue(self.is_expected_date_format(resuming_bookmark_value))
 
-                # verify the resuming bookmark is greater than 1st sync bookmark
-                # This is the expected behaviour for shopify as they are using date windowing
-                # TDL-17096 : Resuming bookmark value is getting assigned from execution time
-                # rather than the actual bookmark time for some streams.
-                # TODO transactions stream has equal bookmarks, orders stream has shown both equal
-                # and greater than bookmark behavior, confirm if this is correct
-                if stream == 'transactions':
-                    self.assertEqual(resuming_bookmark_value, first_bookmark_value)
-                elif stream == 'orders':
-                    self.assertGreaterEqual(resuming_bookmark_value, first_bookmark_value)
-                else:
-                    self.assertGreater(resuming_bookmark_value, first_bookmark_value)
+                # verify the resuming bookmark is greater than or equal to the 1st sync bookmark
+                self.assertGreaterEqual(resuming_bookmark_value, first_bookmark_value)
 
                 # verify oldest record from resuming sync respects bookmark from previous sync
                 if stream in new_state['bookmarks'].keys() and resuming_sync_messages:
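
Reviewer note (not part of the patch): the transactions and order_refunds changes above stop tracking the maximum created_at seen on child records and instead adopt the bookmark that the renamed parent orders stream ("refund_orders" / "transaction_orders") writes to state while the child pages through it. Below is a minimal sketch of that flow using the standard singer-python bookmark helpers; the helper name adopt_parent_bookmark and its parameters are illustrative and not part of tap-shopify.

    import singer

    def adopt_parent_bookmark(state, child_name, child_key, parent_name, parent_key):
        # The parent orders stream advances its own bookmark as the child stream
        # pages through it, so the child copies that value into its own bookmark
        # instead of computing a max over the child records it emitted.
        parent_value = singer.get_bookmark(state, parent_name, parent_key)
        if parent_value:
            singer.write_bookmark(state, child_name, child_key, parent_value)
        return state

    # Illustrative usage after a refunds sync (stream and key names are examples):
    # adopt_parent_bookmark(Context.state, 'order_refunds', 'created_at',
    #                       'refund_orders', 'updated_at')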