From a8dbabc80418fd73ef2eeef4927a3695c35af1a4 Mon Sep 17 00:00:00 2001
From: savan-chovatiya
Date: Fri, 8 Oct 2021 17:36:01 +0530
Subject: [PATCH 1/6] Added comments for better readability

---
 tap_facebook/__init__.py | 43 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 40 insertions(+), 3 deletions(-)

diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py
index 126d46d4..5c7a88bd 100755
--- a/tap_facebook/__init__.py
+++ b/tap_facebook/__init__.py
@@ -98,6 +98,9 @@ def transform_datetime_string(dts):
     return singer.strftime(parsed_dt)
 
 def iter_delivery_info_filter(stream_type):
+    """
+    Prepare filter parameters for ads, adsets, and campaigns
+    """
     filt = {
         "field": stream_type + ".delivery_info",
         "operator": "IN",
@@ -172,6 +175,7 @@ class Stream(object):
     catalog_entry = attr.ib()
 
     def automatic_fields(self):
+        # Prepare the set of fields with automatic inclusion
         fields = set()
         if self.catalog_entry:
             props = metadata.to_map(self.catalog_entry.metadata)
@@ -185,6 +189,7 @@ def automatic_fields(self):
 
 
     def fields(self):
+        # Prepare the set of selected and automatic fields
        fields = set()
         if self.catalog_entry:
             props = metadata.to_map(self.catalog_entry.metadata)
@@ -295,6 +300,7 @@ def _call_get_ads(self, params):
 
     def __iter__(self):
         def do_request():
+            # Set params to fetch ads without deleted objects
            params = {'limit': RESULT_RETURN_LIMIT}
             if self.current_bookmark:
                 params.update({'filtering': [{'field': 'ad.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp}]})
@@ -303,8 +309,10 @@ def do_request():
 
         def do_request_multiple():
             params = {'limit': RESULT_RETURN_LIMIT}
             bookmark_params = []
+            # Set date filter params based on the bookmark
             if self.current_bookmark:
                 bookmark_params.append({'field': 'ad.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp})
+            # Also include deleted objects when include_deleted is set to true
             for del_info_filt in iter_delivery_info_filter('ad'):
                 params.update({'filtering': [del_info_filt] + bookmark_params})
                 filt_ads = self._call_get_ads(params)
@@ -314,6 +322,7 @@ def do_request_multiple():
         def prepare_record(ad):
             return ad.api_get(fields=self.fields()).export_all_data()
 
+        # Call the appropriate request function based on the include_deleted parameter
         if CONFIG.get('include_deleted', 'false').lower() == 'true':
             ads = do_request_multiple()
         else:
             ads = do_request()
@@ -339,6 +348,7 @@ def _call_get_ad_sets(self, params):
 
     def __iter__(self):
         def do_request():
+            # Set params to fetch ad sets without deleted objects
             params = {'limit': RESULT_RETURN_LIMIT}
             if self.current_bookmark:
                 params.update({'filtering': [{'field': 'adset.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp}]})
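For context on the filters these new comments describe: `iter_delivery_info_filter` yields Facebook Marketing API `filtering` entries over `<stream>.delivery_info`. A minimal sketch of the idea — the exact status values and chunk size below are illustrative assumptions, not necessarily the tap's exact list:

```python
# Sketch only: the status values and chunk size are assumed for illustration.
def iter_delivery_info_filter(stream_type):
    field = stream_type + ".delivery_info"   # e.g. "ad.delivery_info"
    filt_values = ["active", "archived", "completed",
                   "inactive", "deleted", "not_delivering"]  # assumed statuses
    sub_list_length = 3
    # Yield the IN filter in chunks so each request keeps its value list short.
    for i in range(0, len(filt_values), sub_list_length):
        yield {"field": field, "operator": "IN",
               "value": filt_values[i:i + sub_list_length]}

for f in iter_delivery_info_filter("ad"):
    print(f)
# {'field': 'ad.delivery_info', 'operator': 'IN', 'value': ['active', 'archived', 'completed']}
# {'field': 'ad.delivery_info', 'operator': 'IN', 'value': ['inactive', 'deleted', 'not_delivering']}
```

Each yielded dict is merged into the request's `filtering` list alongside the bookmark filter, which is why the `do_request_multiple` variants below loop over this generator.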
@@ -347,8 +357,10 @@ def do_request():
 
         def do_request_multiple():
             params = {'limit': RESULT_RETURN_LIMIT}
             bookmark_params = []
+            # Set date filter params based on the bookmark
             if self.current_bookmark:
                 bookmark_params.append({'field': 'adset.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp})
+            # Also include deleted objects when include_deleted is set to true
             for del_info_filt in iter_delivery_info_filter('adset'):
                 params.update({'filtering': [del_info_filt] + bookmark_params})
                 filt_adsets = self._call_get_ad_sets(params)
@@ -358,6 +370,7 @@ def do_request_multiple():
         def prepare_record(ad_set):
             return ad_set.api_get(fields=self.fields()).export_all_data()
 
+        # Call the appropriate request function based on the include_deleted parameter
         if CONFIG.get('include_deleted', 'false').lower() == 'true':
             ad_sets = do_request_multiple()
         else:
             ad_sets = do_request()
@@ -385,6 +398,7 @@ def __iter__(self):
         pull_ads = 'ads' in props
 
         def do_request():
+            # Set params to fetch campaigns without deleted objects
             params = {'limit': RESULT_RETURN_LIMIT}
             if self.current_bookmark:
                 params.update({'filtering': [{'field': 'campaign.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp}]})
@@ -393,8 +407,10 @@ def do_request():
 
         def do_request_multiple():
             params = {'limit': RESULT_RETURN_LIMIT}
             bookmark_params = []
+            # Set date filter params based on the bookmark
             if self.current_bookmark:
                 bookmark_params.append({'field': 'campaign.' + UPDATED_TIME_KEY, 'operator': 'GREATER_THAN', 'value': self.current_bookmark.int_timestamp})
+            # Also include deleted objects when include_deleted is set to true
             for del_info_filt in iter_delivery_info_filter('campaign'):
                 params.update({'filtering': [del_info_filt] + bookmark_params})
                 filt_campaigns = self._call_get_campaigns(params)
@@ -411,6 +427,7 @@ def prepare_record(campaign):
                     campaign_out['ads']['data'].append({'id': ad_id})
             return campaign_out
 
+        # Call the appropriate request function based on the include_deleted parameter
         if CONFIG.get('include_deleted', 'false').lower() == 'true':
             campaigns = do_request_multiple()
         else:
             campaigns = do_request()
@@ -427,6 +444,7 @@ class Leads(Stream):
     key_properties = ['id']
 
     def compare_lead_created_times(self, leadA, leadB):
+        # Compare two leads and return the one with the greater replication key value
         if leadA is None:
             return leadB
         timestampA = pendulum.parse(leadA[self.replication_key])
@@ -471,11 +489,13 @@ def sync_batches(self, stream_objects):
 
     @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
     def get_ads(self):
+        # Set params and fetch ads
         params = {'limit': RESULT_RETURN_LIMIT}
         yield from self.account.get_ads(params=params)
 
     @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
     def get_leads(self, ads, start_time, previous_start_time):
+        # Set params and fetch leads
         start_time = int(start_time.timestamp()) # Get unix timestamp
         params = {'limit': RESULT_RETURN_LIMIT,
                   'filtering': [{'field': 'time_created',
@@ -488,6 +508,7 @@ def get_leads(self, ads, start_time, previous_start_time):
             yield from ad.get_leads(params=params)
 
     def sync(self):
+        # Get the bookmark or start date for leads
         start_time = pendulum.utcnow()
         previous_start_time = self.state.get("bookmarks", {}).get("leads", {}).get(self.replication_key, CONFIG.get('start_date'))
 
@@ -496,6 +517,7 @@ def sync(self):
         leads = self.get_leads(ads, start_time, int(previous_start_time.timestamp()))
         latest_lead_time = self.sync_batches(leads)
 
+        # Write the bookmark and update state if a latest lead time was found in the data
         if not latest_lead_time is None:
             singer.write_bookmark(self.state, 'leads', self.replication_key, latest_lead_time)
             singer.write_state(self.state)
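The Leads bookmark handling above goes through singer-python's standard state helpers. A minimal, self-contained sketch of that round trip — the stream name matches the tap, but the timestamps are invented:

```python
import singer

state = {}

# After a sync batch, record the newest created_time seen.
state = singer.write_bookmark(state, 'leads', 'created_time', '2021-10-07T00:00:00+00:00')
singer.write_state(state)  # emits {"bookmarks": {"leads": {"created_time": ...}}} to stdout

# On the next run, resume from the stored bookmark,
# falling back to the configured start_date when no bookmark exists.
previous = singer.get_bookmark(state, 'leads', 'created_time') or '2021-01-01T00:00:00Z'
print(previous)
```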
@@ -517,6 +539,7 @@ def sync(self):
 ]
 
 def get_start(stream, bookmark_key):
+    # Get the start time from the state bookmark, falling back to the start date
     tap_stream_id = stream.name
     state = stream.state or {}
     current_bookmark = singer.get_bookmark(state, tap_stream_id, bookmark_key)
@@ -586,6 +609,7 @@ def job_params(self):
         if CONFIG.get('insights_buffer_days'):
             buffer_days = int(CONFIG.get('insights_buffer_days'))
 
+        # Adjust the start date using the buffer days and the Facebook retention period
         buffered_start_date = start_date.subtract(days=buffer_days)
         min_start_date = pendulum.today().subtract(months=self.FACEBOOK_INSIGHTS_RETENTION_PERIOD)
         if buffered_start_date < min_start_date:
@@ -723,10 +747,11 @@ def initialize_stream(account, catalog_entry, state):
 
 # pylint: disable=too-many
 def get_streams_to_sync(account, catalog, state):
+    # Initialize and return the streams selected in the catalog
     streams = []
     for stream in STREAMS:
         catalog_entry = next((s for s in catalog.streams if s.tap_stream_id == stream), None)
-        if catalog_entry and catalog_entry.is_selected():
+        if catalog_entry and catalog_entry.is_selected(): # Check if the stream is selected in the catalog
             # TODO: Don't need name and stream_alias since it's on catalog_entry
             name = catalog_entry.stream
             stream_alias = catalog_entry.stream_alias
@@ -734,16 +759,20 @@ def get_streams_to_sync(account, catalog, state):
     return streams
 
 def transform_date_hook(data, typ, schema):
+    # Transform the value into date-time format when the schema declares 'date-time'
     if typ == 'string' and schema.get('format') == 'date-time' and isinstance(data, str):
         transformed = transform_datetime_string(data)
         return transformed
     return data
 
 def do_sync(account, catalog, state):
+    # Get the streams selected in the catalog
     streams_to_sync = get_streams_to_sync(account, catalog, state)
     refs = load_shared_schema_refs()
 
+    # Loop over the selected streams
     for stream in streams_to_sync:
         LOGGER.info('Syncing %s, fields %s', stream.name, stream.fields())
+        # Resolve schema references and write the schema to the output
         schema = singer.resolve_schema_references(load_schema(stream), refs)
         metadata_map = metadata.to_map(stream.catalog_entry.metadata)
         bookmark_key = BOOKMARK_KEYS.get(stream.name)
@@ -758,6 +787,7 @@ def do_sync(account, catalog, state):
         with Transformer(pre_hook=transform_date_hook) as transformer:
             with metrics.record_counter(stream.name) as counter:
                 for message in stream:
+                    # Transform data according to the field selection and write records
                     if 'record' in message:
                         counter.increment()
                         time_extracted = utils.now()
@@ -774,6 +804,7 @@ def get_abs_path(path):
 
 
 def load_schema(stream):
+    # Load the stream's schema from the schemas directory
     path = get_abs_path('schemas/{}.json'.format(stream.name))
     schema = utils.load_json(path)
 
@@ -792,12 +823,14 @@ def discover_schemas():
     streams = initialize_streams_for_discovery()
     for stream in streams:
         LOGGER.info('Loading schema for %s', stream.name)
+        # Resolve references to build the final schema
         schema = singer.resolve_schema_references(load_schema(stream), refs)
 
         mdata = metadata.to_map(metadata.get_standard_metadata(schema,
                                                                key_properties=stream.key_properties))
 
         bookmark_key = BOOKMARK_KEYS.get(stream.name)
+        # Set automatic inclusion for bookmark keys
         if bookmark_key == UPDATED_TIME_KEY or bookmark_key == CREATED_TIME_KEY :
             mdata = metadata.write(mdata, ('properties', bookmark_key),
                                    'inclusion',
                                    'automatic')
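What `load_shared_schema_refs` feeds into: `singer.resolve_schema_references` inlines every `$ref` so the emitted SCHEMA messages are self-contained. A rough illustration — both schemas here are invented; the tap loads the real ones from the `schemas/` directory:

```python
import singer

# Invented shared schema keyed by file name, as load_shared_schema_refs builds it.
shared_refs = {
    "targeting.json": {"type": ["null", "object"],
                       "properties": {"age_min": {"type": ["null", "integer"]}}}
}
stream_schema = {
    "type": "object",
    "properties": {"targeting": {"$ref": "targeting.json"}},
}

# Replaces the $ref node with the referenced schema body.
resolved = singer.resolve_schema_references(stream_schema, shared_refs)
print(resolved["properties"]["targeting"]["properties"]["age_min"])
# {'type': ['null', 'integer']}
```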
@@ -808,6 +841,7 @@ def discover_schemas():
     return result
 
 def load_shared_schema_refs():
+    # Load a dictionary of shared (referenced) schemas
     shared_schemas_path = get_abs_path('schemas/shared')
 
     shared_file_names = [f for f in os.listdir(shared_schemas_path)
@@ -821,6 +855,7 @@ def load_shared_schema_refs():
     return shared_schema_refs
 
 def do_discover():
+    # Discover schemas and dump them to STDOUT
     LOGGER.info('Loading schemas')
     json.dump(discover_schemas(), sys.stdout, indent=4)
 
@@ -836,10 +871,12 @@ def main_impl():
     global RESULT_RETURN_LIMIT
     RESULT_RETURN_LIMIT = CONFIG.get('result_return_limit', RESULT_RETURN_LIMIT)
 
+    # Initialize FacebookAdsApi with the provided access token
     global API
     API = FacebookAdsApi.init(access_token=access_token)
     user = fb_user.User(fbid='me')
 
+    # Get accounts for the provided credentials and validate the account
     accounts = user.get_ad_accounts()
     account = None
     for acc in accounts:
@@ -850,12 +887,12 @@ def main_impl():
     except FacebookError as fb_error:
         raise_from(SingerConfigurationError, fb_error)
 
-    if args.discover:
+    if args.discover: # Discover mode
         try:
             do_discover()
         except FacebookError as fb_error:
             raise_from(SingerDiscoveryError, fb_error)
-    elif args.properties:
+    elif args.properties: # Sync mode
         catalog = Catalog.from_dict(args.properties)
         try:
             do_sync(account, catalog, args.state)

From 5402c465b733e03e743482242d671fe4305ba55e Mon Sep 17 00:00:00 2001
From: Harsh <80324346+harshpatel4crest@users.noreply.github.com>
Date: Fri, 19 Nov 2021 13:24:07 +0530
Subject: [PATCH 2/6] TDL-9728: Stream `ads_insights_age_gender` has
 unexpected datatype for replication key field `date_start` (#172)

* added format as date-time in schema file
* added code coverage
* added check for date format in the bookmark test
* added the check for first sync messages

Co-authored-by: namrata270998
---
 .circleci/config.yml                          |  8 ++++-
 .../schemas/ads_insights_age_and_gender.json  |  6 ++--
 .../ads_insights_hourly_advertiser.json       |  6 ++--
 tests/test_facebook_bookmarks.py              | 29 +++++++++++++++----
 4 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 77a8115a..523b94bc 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -53,7 +53,13 @@ jobs:
           name: 'Run Unit Tests'
           command: |
             source /usr/local/share/virtualenvs/tap-facebook/bin/activate
-            nosetests tests/unittests
+            pip install nose coverage
+            nosetests --with-coverage --cover-erase --cover-package=tap_facebook --cover-html-dir=htmlcov tests/unittests
+            coverage html
+      - store_test_results:
+          path: test_output/report.xml
+      - store_artifacts:
+          path: htmlcov
       - slack/notify-on-failure:
           only_for_branches: master
   run_integration_tests:
diff --git a/tap_facebook/schemas/ads_insights_age_and_gender.json b/tap_facebook/schemas/ads_insights_age_and_gender.json
index f17f3dca..e1d91df6 100644
--- a/tap_facebook/schemas/ads_insights_age_and_gender.json
+++ b/tap_facebook/schemas/ads_insights_age_and_gender.json
@@ -25,7 +25,8 @@
     "type": [
       "null",
       "string"
-    ]
+    ],
+    "format": "date-time"
   },
   "ad_id": {
     "type": [
@@ -283,7 +284,8 @@
     "type": [
       "null",
       "string"
-    ]
+    ],
+    "format": "date-time"
   },
   "objective": {
     "type": [
diff --git a/tap_facebook/schemas/ads_insights_hourly_advertiser.json b/tap_facebook/schemas/ads_insights_hourly_advertiser.json
index 70f8c8e1..27a3e9f5 100644
--- a/tap_facebook/schemas/ads_insights_hourly_advertiser.json
+++ b/tap_facebook/schemas/ads_insights_hourly_advertiser.json
@@ -165,13 +165,15 @@
     "type": [
       "null",
       "string"
-    ]
+    ],
+    "format": "date-time"
   },
   "date_stop": {
     "type": [
       "null",
       "string"
-    ]
+    ],
+    "format": "date-time"
   },
   "engagement_rate_ranking": {
     "type": [
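Why adding `"format": "date-time"` matters: PATCH 1's `Transformer(pre_hook=transform_date_hook)` only normalizes values whose schema declares that format, so `date_start`/`date_stop` now surface as proper RFC 3339 timestamps. A small sketch of the effect — the record value is invented and the hook body is an approximation of the tap's:

```python
import singer
from singer.transform import Transformer

# Pared-down schema mirroring the patched files.
schema = {
    "type": "object",
    "properties": {
        "date_start": {"type": ["null", "string"], "format": "date-time"}
    }
}

def date_hook(data, typ, schema):
    # Same idea as the tap's transform_date_hook.
    if typ == 'string' and schema.get('format') == 'date-time' and isinstance(data, str):
        return singer.utils.strftime(singer.utils.strptime_to_utc(data))
    return data

with Transformer(pre_hook=date_hook) as transformer:
    record = transformer.transform({"date_start": "2021-10-06"}, schema)

print(record["date_start"])  # e.g. '2021-10-06T00:00:00.000000Z'
```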
diff --git a/tests/test_facebook_bookmarks.py b/tests/test_facebook_bookmarks.py
index ada0da17..6efe77ee 100644
--- a/tests/test_facebook_bookmarks.py
+++ b/tests/test_facebook_bookmarks.py
@@ -84,6 +84,16 @@ def calculated_states_by_stream(self, current_state):
 
         return stream_to_calculated_state
 
+    # function for verifying the date format
+    def is_expected_date_format(self, date):
+        try:
+            # parse the date
+            datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ")
+        except ValueError:
+            # return False if the date is not in the expected format
+            return False
+        # return True in case of no error
+        return True
 
     def test_run(self):
         expected_streams = self.expected_streams()
@@ -193,14 +203,16 @@ def bookmarks_test(self, expected_streams):
 
                     for record in second_sync_messages:
+                        # for "ads_insights_age_and_gender" and "ads_insights_hourly_advertiser"
+                        # verify that "date_start" and "date_stop" are in the expected format
+                        if stream in ["ads_insights_age_and_gender", "ads_insights_hourly_advertiser"]:
+                            date_start = record.get("date_start")
+                            self.assertTrue(self.is_expected_date_format(date_start))
+                            date_stop = record.get("date_stop")
+                            self.assertTrue(self.is_expected_date_format(date_stop))
 
                         # Verify the second sync records respect the previous (simulated) bookmark value
                         replication_key_value = record.get(replication_key)
-                        if stream in {'ads_insights_age_and_gender', 'ads_insights_hourly_advertiser'}: # BUG | https://stitchdata.atlassian.net/browse/SRCE-4873
-                            replication_key_value = datetime.datetime.strftime(
-                                dateutil.parser.parse(replication_key_value),
-                                self.BOOKMARK_COMPARISON_FORMAT
-                            )
                         self.assertGreaterEqual(replication_key_value, simulated_bookmark_minus_lookback,
                                                 msg="Second sync records do not respect the previous bookmark.")
 
@@ -211,6 +223,13 @@ def bookmarks_test(self, expected_streams):
                     )
 
                     for record in first_sync_messages:
+                        # for "ads_insights_age_and_gender" and "ads_insights_hourly_advertiser"
+                        # verify that "date_start" and "date_stop" are in the expected format
+                        if stream in ["ads_insights_age_and_gender", "ads_insights_hourly_advertiser"]:
+                            date_start = record.get("date_start")
+                            self.assertTrue(self.is_expected_date_format(date_start))
+                            date_stop = record.get("date_stop")
+                            self.assertTrue(self.is_expected_date_format(date_stop))
 
                         # Verify the first sync bookmark value is the max replication key value for a given stream
                         replication_key_value = record.get(replication_key)

From fc9abc8e4805445d43c9464eab396b42b553ab33 Mon Sep 17 00:00:00 2001
From: savan-chovatiya <80703490+savan-chovatiya@users.noreply.github.com>
Date: Fri, 19 Nov 2021 13:41:28 +0530
Subject: [PATCH 3/6] TDL-9809: `forced-replication-method` missing from
 metadata for some streams and TDL-9872: replication keys are not specified
 as expected in discoverable metadata (#167)

* added valid replication keys in catalog
* modified the code
* TDL-9809: Added replication keys in metadata
* added code coverage
* Resolved review comments

Co-authored-by: harshpatel4_crest
Co-authored-by: namrata270998
---
 tap_facebook/__init__.py         | 11 +++++++--
 tests/test_facebook_discovery.py | 42 +++++++++++---------------------
 2 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py
index 126d46d4..63512dac 100755
--- a/tap_facebook/__init__.py
+++ b/tap_facebook/__init__.py
@@ -170,6 +170,7 @@ class Stream(object):
     account = attr.ib()
     stream_alias = attr.ib()
     catalog_entry = attr.ib()
+    replication_method = 'FULL_TABLE'
 
     def automatic_fields(self):
         fields = set()
@@ -200,6 +201,7 @@ def fields(self):
 
 class IncrementalStream(Stream):
     state = attr.ib()
+    replication_method = 'INCREMENTAL'
 
     def __attrs_post_init__(self):
         self.current_bookmark = get_start(self, UPDATED_TIME_KEY)
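The pattern in these hunks: a plain class attribute on the attrs-decorated base supplies the `FULL_TABLE` default, and bookmarked streams override it, so discovery can read `stream.replication_method` uniformly without stream-specific setup. A compressed, self-contained sketch of that mechanism (class and field names abbreviated):

```python
import attr

@attr.s
class Stream:
    name = attr.ib()
    replication_method = 'FULL_TABLE'  # plain class attribute, not an attr.ib()

@attr.s
class IncrementalStream(Stream):
    replication_method = 'INCREMENTAL'  # overridden for bookmarked streams

print(Stream(name='adcreative').replication_method)       # FULL_TABLE
print(IncrementalStream(name='ads').replication_method)   # INCREMENTAL
```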
@@ -425,6 +427,7 @@ class Leads(Stream):
     replication_key = "created_time"
     key_properties = ['id']
+    replication_method = 'INCREMENTAL'
 
     def compare_lead_created_times(self, leadA, leadB):
         if leadA is None:
             return leadB
@@ -554,6 +557,7 @@ def advance_bookmark(stream, bookmark_key, date):
 @attr.s
 class AdsInsights(Stream):
     base_properties = ['campaign_id', 'adset_id', 'ad_id', 'date_start']
+    replication_method = 'INCREMENTAL'
 
     state = attr.ib()
     options = attr.ib()
@@ -794,10 +798,13 @@ def discover_schemas():
         LOGGER.info('Loading schema for %s', stream.name)
         schema = singer.resolve_schema_references(load_schema(stream), refs)
 
+        bookmark_key = BOOKMARK_KEYS.get(stream.name)
+
         mdata = metadata.to_map(metadata.get_standard_metadata(schema,
-                                                               key_properties=stream.key_properties))
+                                                               key_properties=stream.key_properties,
+                                                               replication_method=stream.replication_method,
+                                                               valid_replication_keys=[bookmark_key] if bookmark_key else None))
 
-        bookmark_key = BOOKMARK_KEYS.get(stream.name)
         if bookmark_key == UPDATED_TIME_KEY or bookmark_key == CREATED_TIME_KEY :
             mdata = metadata.write(mdata, ('properties', bookmark_key),
                                    'inclusion',
                                    'automatic')
diff --git a/tests/test_facebook_discovery.py b/tests/test_facebook_discovery.py
index 46e04fc0..a97947e9 100644
--- a/tests/test_facebook_discovery.py
+++ b/tests/test_facebook_discovery.py
@@ -84,40 +84,26 @@ def test_run(self):
                     msg="There is NOT only one top level breadcrumb for {}".format(stream) + \
                     "\nstream_properties | {}".format(stream_properties))
 
-                # BUG_1 | https://stitchdata.atlassian.net/browse/SRCE-4855
-                failing_with_no_replication_keys = {
-                    'ads_insights_country', 'adsets', 'adcreative', 'ads', 'ads_insights_region',
-                    'campaigns', 'ads_insights_age_and_gender', 'ads_insights_platform_and_device',
-                    'ads_insights_dma', 'ads_insights', 'leads', 'ads_insights_hourly_advertiser'
-                }
-                if stream not in failing_with_no_replication_keys: # BUG_1
-                    # verify replication key(s) match expectations
-                    self.assertSetEqual(
-                        expected_replication_keys, actual_replication_keys
-                    )
+                # verify replication key(s) match expectations
+                self.assertSetEqual(
+                    expected_replication_keys, actual_replication_keys
+                )
 
                 # verify primary key(s) match expectations
                 self.assertSetEqual(
                     expected_primary_keys, actual_primary_keys,
                 )
 
-                # BUG_2 | https://stitchdata.atlassian.net/browse/SRCE-4856
-                failing_with_no_replication_method = {
-                    'ads_insights_country', 'adsets', 'adcreative', 'ads', 'ads_insights_region',
-                    'campaigns', 'ads_insights_age_and_gender', 'ads_insights_platform_and_device',
-                    'ads_insights_dma', 'ads_insights', 'leads', 'ads_insights_hourly_advertiser'
-                }
-                if stream not in failing_with_no_replication_method: # BUG_2
-                    # verify the replication method matches our expectations
-                    self.assertEqual(
-                        expected_replication_method, actual_replication_method
-                    )
-
-                    # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
-                    if actual_replication_keys:
-                        self.assertEqual(self.INCREMENTAL, actual_replication_method)
-                    else:
-                        self.assertEqual(self.FULL_TABLE, actual_replication_method)
+                # verify the replication method matches our expectations
+                self.assertEqual(
+                    expected_replication_method, actual_replication_method
+                )
+
+                # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
+                if actual_replication_keys:
+                    self.assertEqual(self.INCREMENTAL, actual_replication_method)
+                else:
+                    self.assertEqual(self.FULL_TABLE, actual_replication_method)
 
                 # verify that primary keys and replication keys
                 # are given the inclusion of automatic in metadata.
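What the `discover_schemas` change buys: the top-level metadata entry now carries `forced-replication-method` and `valid-replication-keys`, which is exactly what the updated discovery test asserts. A rough illustration with singer-python — the schema is pared down and the stream is invented:

```python
from singer import metadata

schema = {"type": "object",
          "properties": {"id": {"type": "string"},
                         "updated_time": {"type": "string", "format": "date-time"}}}

mdata = metadata.get_standard_metadata(schema,
                                       key_properties=['id'],
                                       replication_method='INCREMENTAL',
                                       valid_replication_keys=['updated_time'])
mdata = metadata.to_map(mdata)

# The empty breadcrumb () is the stream-level (top-level) metadata entry.
print(metadata.get(mdata, (), 'forced-replication-method'))  # INCREMENTAL
print(metadata.get(mdata, (), 'valid-replication-keys'))     # ['updated_time']
```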
From 12bb05d04ef076241fc9a894d6c6fe0c166c4bc5 Mon Sep 17 00:00:00 2001
From: savan-chovatiya <80703490+savan-chovatiya@users.noreply.github.com>
Date: Fri, 19 Nov 2021 14:00:11 +0530
Subject: [PATCH 4/6] TDL-7455: Add tap-tester test to verify replication of
 deleted records (#168)

* TDL-7455: Added archived data integration test
* TDL-7455: Updated integration test
* added code coverage
* Resolved review comment

Co-authored-by: namrata270998
---
 .circleci/config.yml                 |  14 ++++
 tests/test_facebook_archived_data.py | 112 +++++++++++++++++++++++++++
 2 files changed, 126 insertions(+)
 create mode 100644 tests/test_facebook_archived_data.py

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 523b94bc..83f84179 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -160,6 +160,12 @@ workflows:
           test_name: "test_facebook_tests_run.py"
           requires:
             - ensure_env
+      - run_integration_tests:
+          context: circleci-user
+          name: "test_facebook_archived_data.py"
+          test_name: "test_facebook_archived_data.py"
+          requires:
+            - ensure_env
       - build:
           context: circleci-user
           requires:
@@ -171,6 +177,7 @@ workflows:
             - test_facebook_bookmarks.py
             - test_facebook_automatic_fields.py
             - test_facebook_tests_run.py
+            - test_facebook_archived_data.py
       - deploy:
           context: circleci-user
           requires:
@@ -226,6 +233,12 @@ workflows:
           test_name: "test_facebook_tests_run.py"
           requires:
             - ensure_env
+      - run_integration_tests:
+          context: circleci-user
+          name: "test_facebook_archived_data.py"
+          test_name: "test_facebook_archived_data.py"
+          requires:
+            - ensure_env
       - build:
           context: circleci-user
           requires:
@@ -237,6 +250,7 @@ workflows:
             - test_facebook_bookmarks.py
             - test_facebook_automatic_fields.py
             - test_facebook_tests_run.py
+            - test_facebook_archived_data.py
     triggers:
       - schedule:
           cron: "0 6 * * *"
diff --git a/tests/test_facebook_archived_data.py b/tests/test_facebook_archived_data.py
new file mode 100644
index 00000000..c86113b6
--- /dev/null
+++ b/tests/test_facebook_archived_data.py
@@ -0,0 +1,112 @@
+import os
+
+from tap_tester import connections, runner, menagerie
+
+from base import FacebookBaseTest
+
+class FacebookArchivedData(FacebookBaseTest):
+
+    @staticmethod
+    def name():
+        return "tap_tester_facebook_archived_data"
+
+    def get_properties(self, original: bool = True):
+        """Configuration properties required for the tap."""
+        return_value = {
+            'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
+            'start_date' : '2021-10-06T00:00:00Z',
+            'end_date' : '2021-10-07T00:00:00Z',
+            'insights_buffer_days': '1',
+            'include_deleted': 'false'
+        }
+        if original:
+            return return_value
+
+        return_value["include_deleted"] = 'true'
+        return return_value
+
+    def test_run(self):
+        '''
+        Testing the archived data with the 'include_deleted' parameter
+        '''
+        # include_deleted is supported only for the streams below
+        expected_streams = ['ads', 'adsets', 'campaigns']
+
+        ##########################################################################
+        ### First Sync with include_deleted = false
+        ##########################################################################
+
+        # instantiate a connection with include_deleted = false
+        conn_id_1 = connections.ensure_connection(self)
+
+        # run check mode
+        found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1)
+
+        # table and field selection
+        test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1
+                                      if catalog.get('tap_stream_id') in expected_streams]
+        self.perform_and_verify_table_and_field_selection(conn_id_1, test_catalogs_1_all_fields, select_all_fields=True)
+
+        # run the initial sync
+        record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1)
+        synced_records_1 = runner.get_records_from_target_output()
+
+        ##########################################################################
+        ### Second Sync with include_deleted = true
+        ##########################################################################
+
+        # create a new connection with include_deleted = true
+        conn_id_2 = connections.ensure_connection(self, original_properties=False)
+
+        # run check mode
+        found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2)
+
+        # table and field selection
+        test_catalogs_2_all_fields = [catalog for catalog in found_catalogs_2
+                                      if catalog.get('tap_stream_id') in expected_streams]
+        self.perform_and_verify_table_and_field_selection(conn_id_2, test_catalogs_2_all_fields, select_all_fields=True)
+
+        # run sync
+        record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2)
+        synced_records_2 = runner.get_records_from_target_output()
+
+        for stream in expected_streams:
+            with self.subTest(stream=stream):
+
+                # expected primary keys
+                expected_primary_keys = self.expected_primary_keys()[stream]
+
+                # collect information about the count of records
+                record_count_sync_1 = record_count_by_stream_1.get(stream, 0)
+                record_count_sync_2 = record_count_by_stream_2.get(stream, 0)
+
+                # collect lists and sets of primary keys for all the records
+                primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                       for message in synced_records_1.get(stream).get('messages')
+                                       if message.get('action') == 'upsert']
+                primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                       for message in synced_records_2.get(stream).get('messages')
+                                       if message.get('action') == 'upsert']
+                primary_keys_sync_1 = set(primary_keys_list_1)
+                primary_keys_sync_2 = set(primary_keys_list_2)
+
+                # collect lists of effective_status for all the records
+                records_status_sync1 = [message.get('data').get('effective_status')
+                                        for message in synced_records_1.get(stream).get('messages')
+                                        if message.get('action') == 'upsert']
+                records_status_sync2 = [message.get('data').get('effective_status')
+                                        for message in synced_records_2.get(stream).get('messages')
+                                        if message.get('action') == 'upsert']
+
+                # Verify that no ARCHIVED records are returned for sync 1
+                self.assertNotIn('ARCHIVED', records_status_sync1)
+
+                # Verify that ARCHIVED records are returned for sync 2
+                self.assertIn('ARCHIVED', records_status_sync2)
+
+                # Verify the number of records replicated in sync 2 is greater than the number
+                # of records replicated in sync 1
+                self.assertGreater(record_count_sync_2, record_count_sync_1)
+
+                # Verify the records replicated in sync 1 were also replicated in sync 2
+                self.assertTrue(primary_keys_sync_1.issubset(primary_keys_sync_2))

From 923761bda2c6c5476988ac7dfa6380c9f99689a9 Mon Sep 17 00:00:00 2001
From: Harsh <80324346+harshpatel4crest@users.noreply.github.com>
Date: Fri, 19 Nov 2021 17:20:18 +0530
Subject: [PATCH 5/6] TDL-7596: Add tap-tester test for attribution window (#169)

* added tap tester test for attribution window
* updated the code
* added code coverage
* updated the code according to the comments
* updated code to raise an error when the attribution window is not 1, 7, or 28
* test: run invalid attribution window integration test
* updated test case
* test: updated test case code
* test: test invalid attribution window
* test: test invalid attribution window
* test: test invalid attribution window
* test: test invalid attribution window
* test: run invalid attribution window test case
* added integration test for invalid attribution window

Co-authored-by: namrata270998
---
 .circleci/config.yml                          | 28 ++++++
 tap_facebook/__init__.py                      | 13 +--
 tests/test_facebook_attribution_window.py     | 89 +++++++++++++++++++
 ...est_facebook_invalid_attribution_window.py | 40 +++++++++
 tests/unittests/test_attribution_window.py    | 29 ++++++
 5 files changed, 194 insertions(+), 5 deletions(-)
 create mode 100644 tests/test_facebook_attribution_window.py
 create mode 100644 tests/test_facebook_invalid_attribution_window.py
 create mode 100644 tests/unittests/test_attribution_window.py

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 83f84179..8dc4b860 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -166,6 +166,18 @@ workflows:
           test_name: "test_facebook_archived_data.py"
           requires:
             - ensure_env
+      - run_integration_tests:
+          context: circleci-user
+          name: "test_facebook_attribution_window.py"
+          test_name: "test_facebook_attribution_window.py"
+          requires:
+            - ensure_env
+      - run_integration_tests:
+          context: circleci-user
+          name: "test_facebook_invalid_attribution_window.py"
+          test_name: "test_facebook_invalid_attribution_window.py"
+          requires:
+            - ensure_env
       - build:
           context: circleci-user
           requires:
@@ -178,6 +190,8 @@ workflows:
             - test_facebook_automatic_fields.py
             - test_facebook_tests_run.py
             - test_facebook_archived_data.py
+            - test_facebook_attribution_window.py
+            - test_facebook_invalid_attribution_window.py
       - deploy:
           context: circleci-user
           requires:
@@ -239,6 +253,18 @@ workflows:
           test_name: "test_facebook_archived_data.py"
           requires:
             - ensure_env
+      - run_integration_tests:
+          context: circleci-user
+          name: "test_facebook_attribution_window.py"
+          test_name: "test_facebook_attribution_window.py"
+          requires:
+            - ensure_env
+      - run_integration_tests:
+          context: circleci-user
+          name: "test_facebook_invalid_attribution_window.py"
+          test_name: "test_facebook_invalid_attribution_window.py"
+          requires:
+            - ensure_env
       - build:
           context: circleci-user
           requires:
@@ -251,6 +277,8 @@ workflows:
             - test_facebook_automatic_fields.py
             - test_facebook_tests_run.py
             - test_facebook_archived_data.py
+            - test_facebook_attribution_window.py
+            - test_facebook_invalid_attribution_window.py
     triggers:
       - schedule:
          cron: "0 6 * * *"
diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py
index 1471b1ba..0a39cd18 100755
--- a/tap_facebook/__init__.py
+++ b/tap_facebook/__init__.py
@@ -588,14 +588,17 @@ def __attrs_post_init__(self):
         if self.options.get('primary-keys'):
             self.key_properties.extend(self.options['primary-keys'])
 
+        self.buffer_days = 28
+        if CONFIG.get('insights_buffer_days'):
+            self.buffer_days = int(CONFIG.get('insights_buffer_days'))
+        # the attribution window should only be 1, 7, or 28
+        if self.buffer_days not in [1, 7, 28]:
+            raise Exception("The attribution window must be 1, 7 or 28.")
+
     def job_params(self):
         start_date = get_start(self, self.bookmark_key)
 
-        buffer_days = 28
-        if CONFIG.get('insights_buffer_days'):
-            buffer_days = int(CONFIG.get('insights_buffer_days'))
-
-        buffered_start_date = start_date.subtract(days=buffer_days)
+        buffered_start_date = start_date.subtract(days=self.buffer_days)
         min_start_date = pendulum.today().subtract(months=self.FACEBOOK_INSIGHTS_RETENTION_PERIOD)
         if buffered_start_date < min_start_date:
             LOGGER.warning("%s: Start date is earlier than %s months ago, using %s instead. "
                            "
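The net effect of moving the buffer handling into `__attrs_post_init__` is fail-fast validation when the stream is constructed, before any insights job runs. A standalone sketch of the date arithmetic — the retention period value is an assumption here, not taken from the tap:

```python
import pendulum

# Assumed values for illustration; the tap reads these from config and a class constant.
insights_buffer_days = 7        # the attribution window: must be 1, 7, or 28
retention_period_months = 37    # assumption: Facebook Insights retention window

if insights_buffer_days not in [1, 7, 28]:
    raise Exception("The attribution window must be 1, 7 or 28.")

start_date = pendulum.parse('2019-07-24T00:00:00Z')
buffered_start_date = start_date.subtract(days=insights_buffer_days)

# Clamp to the oldest date Facebook will still serve insights for.
min_start_date = pendulum.today().subtract(months=retention_period_months)
if buffered_start_date < min_start_date:
    buffered_start_date = min_start_date

print(buffered_start_date)  # 2019-07-17T00:00:00+00:00 for the values above
```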
" diff --git a/tests/test_facebook_attribution_window.py b/tests/test_facebook_attribution_window.py new file mode 100644 index 00000000..05ac61a7 --- /dev/null +++ b/tests/test_facebook_attribution_window.py @@ -0,0 +1,89 @@ +import os + +from tap_tester import runner, connections + +from base import FacebookBaseTest + +class FacebookAttributionWindow(FacebookBaseTest): + + # set attribution window + ATTRIBUTION_WINDOW = 7 + + @staticmethod + def name(): + return "tap_tester_facebook_attribution_window" + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return_value = { + 'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'), + 'start_date' : '2019-07-24T00:00:00Z', + 'end_date' : '2019-07-26T00:00:00Z', + 'insights_buffer_days': str(self.ATTRIBUTION_WINDOW) + } + if original: + return return_value + + return_value["start_date"] = self.start_date + return return_value + + def test_run(self): + self.run_test(self.ATTRIBUTION_WINDOW) # attribution window: 7 + + self.ATTRIBUTION_WINDOW = 28 + self.run_test(self.ATTRIBUTION_WINDOW) # attribution window: 28 + + def run_test(self, attr_window): + """ + Test to check the attribution window + """ + + conn_id = connections.ensure_connection(self) + + # get start date + start_date = self.get_properties()['start_date'] + # calculate start date with attribution window + start_date_with_attribution_window = self.timedelta_formatted(start_date, days=-attr_window) + + # 'attribution window' is only supported for 'ads_insights' streams + expected_streams = [] + for stream in self.expected_streams(): + if self.is_insight(stream): + expected_streams.append(stream) + + # Run in check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Select only the expected streams tables + catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams] + self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, select_all_fields=True) + + # Run a sync job using orchestrator + self.run_and_verify_sync(conn_id) + sync_records = runner.get_records_from_target_output() + + expected_replication_keys = self.expected_replication_keys() + + for stream in expected_streams: + with self.subTest(stream=stream): + + replication_key = next(iter(expected_replication_keys[stream])) + + # get records + records = [record.get('data') for record in sync_records.get(stream).get('messages')] + + # check for the record is between attribution date and start date + is_between = False + + for record in records: + replication_key_value = record.get(replication_key) + + # Verify the sync records respect the (simulated) start date value + self.assertGreaterEqual(self.parse_date(replication_key_value), self.parse_date(start_date_with_attribution_window), + msg="The record does not respect the attribution window.") + + # verify if the record's bookmark value is between start date and (simulated) start date value + if self.parse_date(start_date_with_attribution_window) <= self.parse_date(replication_key_value) < self.parse_date(start_date): + is_between = True + + self.assertTrue(is_between, msg="No record found between start date and attribution date.") diff --git a/tests/test_facebook_invalid_attribution_window.py b/tests/test_facebook_invalid_attribution_window.py new file mode 100644 index 00000000..3fc72953 --- /dev/null +++ b/tests/test_facebook_invalid_attribution_window.py @@ -0,0 +1,40 @@ +import os + +from tap_tester import runner, connections, menagerie + +from base import 
diff --git a/tests/test_facebook_invalid_attribution_window.py b/tests/test_facebook_invalid_attribution_window.py
new file mode 100644
index 00000000..3fc72953
--- /dev/null
+++ b/tests/test_facebook_invalid_attribution_window.py
@@ -0,0 +1,40 @@
+import os
+
+from tap_tester import runner, connections, menagerie
+
+from base import FacebookBaseTest
+
+class FacebookInvalidAttributionWindow(FacebookBaseTest):
+
+    @staticmethod
+    def name():
+        return "tap_tester_facebook_invalid_attribution_window"
+
+    def get_properties(self, original: bool = True):
+        """Configuration properties required for the tap."""
+        return_value = {
+            'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
+            'start_date' : '2019-07-24T00:00:00Z',
+            'end_date' : '2019-07-26T00:00:00Z',
+            'insights_buffer_days': '10' # set an attribution window other than 1, 7, or 28
+        }
+        if original:
+            return return_value
+
+        return_value["start_date"] = self.start_date
+        return return_value
+
+    def test_run(self):
+        """
+        Test to verify that an error is raised when passing an attribution window other than 1, 7, or 28
+        """
+        # create the connection
+        conn_id = connections.ensure_connection(self)
+        # run check mode
+        check_job_name = runner.run_check_mode(self, conn_id)
+        # get the exit status
+        exit_status = menagerie.get_exit_status(conn_id, check_job_name)
+        # get the discovery error message
+        discovery_error_message = exit_status.get('discovery_error_message')
+        # validate the error message
+        self.assertEquals(discovery_error_message, "The attribution window must be 1, 7 or 28.")
diff --git a/tests/unittests/test_attribution_window.py b/tests/unittests/test_attribution_window.py
new file mode 100644
index 00000000..a3e6cc5c
--- /dev/null
+++ b/tests/unittests/test_attribution_window.py
@@ -0,0 +1,29 @@
+import unittest
+import tap_facebook.__init__ as tap_facebook
+
+class TestAttributionWindow(unittest.TestCase):
+    """
+    Test case to verify that the proper error message is raised
+    when the user enters an attribution window other than 1, 7, and 28
+    """
+
+    def test_invalid_attribution_window(self):
+        error_message = None
+
+        # set the config
+        tap_facebook.CONFIG = {
+            "start_date": "2019-01-01T00:00:00Z",
+            "account_id": "test_account_id",
+            "access_token": "test_access_token",
+            "insights_buffer_days": 30
+        }
+
+        try:
+            # initialize the 'AdsInsights' stream, as the attribution window is only supported in those streams
+            tap_facebook.AdsInsights("test", "test", "test", None, {}, {})
+        except Exception as e:
+            # save the error message for the assertion
+            error_message = str(e)
+
+        # verify the error message was as expected
+        self.assertEquals(error_message, "The attribution window must be 1, 7 or 28.")

From 59d60bd03738b546a123758c625f19b55cdaee2f Mon Sep 17 00:00:00 2001
From: harshpatel4_crest
Date: Fri, 19 Nov 2021 18:21:24 +0530
Subject: [PATCH 6/6] added comment

---
 tap_facebook/__init__.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py
index f6a2b0e2..12eb92f0 100755
--- a/tap_facebook/__init__.py
+++ b/tap_facebook/__init__.py
@@ -621,6 +621,7 @@ def job_params(self):
         start_date = get_start(self, self.bookmark_key)
 
+        # Adjust the start date using the buffer days and the Facebook retention period
         buffered_start_date = start_date.subtract(days=self.buffer_days)
         min_start_date = pendulum.today().subtract(months=self.FACEBOOK_INSIGHTS_RETENTION_PERIOD)
         if buffered_start_date < min_start_date:
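A closing note on the unit test in PATCH 5: the try/except bookkeeping can be expressed more directly with `assertRaises`. An equivalent sketch under the same assumptions (same invented config values, same constructor arguments as the patch):

```python
import unittest
import tap_facebook.__init__ as tap_facebook

class TestAttributionWindowAlt(unittest.TestCase):
    # Same assertion as the patch's test, using assertRaises instead of try/except.
    def test_invalid_attribution_window(self):
        tap_facebook.CONFIG = {
            "start_date": "2019-01-01T00:00:00Z",
            "account_id": "test_account_id",
            "access_token": "test_access_token",
            "insights_buffer_days": 30
        }
        with self.assertRaises(Exception) as ctx:
            tap_facebook.AdsInsights("test", "test", "test", None, {}, {})
        self.assertEqual(str(ctx.exception), "The attribution window must be 1, 7 or 28.")
```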