diff --git a/README.md b/README.md index 68bcfb66..88215d3c 100644 --- a/README.md +++ b/README.md @@ -260,6 +260,13 @@ vars: ga4: session_attribution_lookback_window_days: 90 ``` +# Select Date Range + +To select a date range in a where statement you can use `select_date_range` macro. You can use it in your where statement like this: + +``` WHERE (or TRUE) and {{ ga4.select_date_range(start_date, end_date, date_column) }}``` + + # Custom Events diff --git a/macros/base_select.sql b/macros/base_select.sql index 8919388f..6f7075db 100644 --- a/macros/base_select.sql +++ b/macros/base_select.sql @@ -163,4 +163,4 @@ WHEN event_name = 'purchase' THEN 1 ELSE 0 END AS is_purchase -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/macros/generate_date_array.sql b/macros/generate_date_array.sql new file mode 100644 index 00000000..66833772 --- /dev/null +++ b/macros/generate_date_array.sql @@ -0,0 +1,15 @@ +{% macro generate_date_array(start_date_str, end_date_str) %} + {% set date_format_input = '%Y%m%d' %} + {% set date_format_output = '%Y-%m-%d' %} + {% set dates = [] %} + {% set start_date = modules.datetime.datetime.strptime(start_date_str, date_format_input) %} + {% set end_date = modules.datetime.datetime.strptime(end_date_str, date_format_input) %} + {% set diff_days = (end_date - start_date).days %} + + {% for i in range(diff_days + 1) %} + {% set current_date = start_date + modules.datetime.timedelta(days=i) %} + {% set dates = dates.append(current_date.strftime(date_format_output)) %} + {% endfor %} + + {{ return(dates) }} +{% endmacro %} \ No newline at end of file diff --git a/models/staging/base/base_ga4__events.sql b/models/staging/base/base_ga4__events.sql index 533dbc0f..c8b9fb36 100644 --- a/models/staging/base/base_ga4__events.sql +++ b/models/staging/base/base_ga4__events.sql @@ -3,6 +3,38 @@ {% set partitions_to_replace = partitions_to_replace.append('date_sub(current_date, interval ' + (i+1)|string + ' day)') %} {% endfor %} + +{% set start_date = var('start_date', none) %} +{% set end_date = var('end_date', none) %} + +{{ log("Initial start_date: " ~ start_date, info=True) }} +{{ log("Initial end_date: " ~ end_date, info=True) }} + + +{% if start_date and end_date %} + {{ log("Running with start_date: " ~ start_date, info=True) }} + {{ log("Running with end_date: " ~ end_date, info=True) }} + + {% set formatted_start_date = start_date[:4] ~ '-' ~ start_date[4:6] ~ '-' ~ start_date[6:] %} + {% set formatted_end_date = end_date[:4] ~ '-' ~ end_date[4:6] ~ '-' ~ end_date[6:] %} + + {{ log("Formatted start_date: " ~ formatted_start_date, info=True) }} + {{ log("Formatted end_date: " ~ formatted_end_date, info=True) }} + + {% set date_array = generate_date_array(start_date, end_date) %} + + + {% set partitions_to_replace = [] %} + {% for date in date_array %} + {% set formatted_date = "date('" ~ date ~ "')" %} + {% do partitions_to_replace.append(formatted_date) %} + {% endfor %} + +{% endif %} + +{{ log("Partitions to replace: " ~ partitions_to_replace, info=True) }} + + {{ config( pre_hook="{{ ga4.combine_property_data() }}" if var('combined_dataset', false) else "", @@ -13,7 +45,7 @@ "data_type": "date", }, partitions = partitions_to_replace, - cluster_by=['event_name'] + cluster_by=['event_name', 'stream_id'] ) }} @@ -22,15 +54,20 @@ with source as ( {{ ga4.base_select_source() }} from {{ source('ga4', 'events') }} where cast(left(replace(_table_suffix, 'intraday_', ''), 8) as int64) >= {{var('start_date')}} - {% if is_incremental() %} + {% if var('end_date') is not none %} + and cast(left(replace(_table_suffix, 'intraday_', ''), 8) as int64) <= {{ var('end_date')}} + {% endif %} + {% if is_incremental() and var('end_date') is none %} and parse_date('%Y%m%d', left(replace(_table_suffix, 'intraday_', ''), 8)) in ({{ partitions_to_replace | join(',') }}) {% endif %} -), + ), + renamed as ( select - {{ ga4.base_select_renamed() }} + {{ ga4.base_select_renamed() }}, + CAST(REGEXP_EXTRACT((select value.string_value from unnest(event_params) where key = 'ga_session_id'), r'^GS\d\.\d\.(\d+)') AS INT64) as session_id_mp from source ) -select * from renamed +select * except(session_id), COALESCE(session_id, session_id_mp) as session_id from renamed qualify row_number() over(partition by event_date_dt, stream_id, user_pseudo_id, session_id, event_name, event_timestamp, to_json_string(ARRAY(SELECT params FROM UNNEST(event_params) AS params ORDER BY key))) = 1 diff --git a/models/staging/base/base_ga4__events.yml b/models/staging/base/base_ga4__events.yml index f46a9355..df90efba 100644 --- a/models/staging/base/base_ga4__events.yml +++ b/models/staging/base/base_ga4__events.yml @@ -11,6 +11,8 @@ models: columns: - name: event_date_dt description: Date of the event converted to Date type. Time zone is the time zone configured in the GA4 property. + tests: + - ten_weeks_dates - name: event_timestamp description: > Timestamp (in microseconds) indicating when the event's batch was received (as opposed to when the event actually occurred). diff --git a/models/staging/events/stg_ga4__event_page_view.yml b/models/staging/events/stg_ga4__event_page_view.yml index 8f567fc6..a74d3065 100644 --- a/models/staging/events/stg_ga4__event_page_view.yml +++ b/models/staging/events/stg_ga4__event_page_view.yml @@ -2,9 +2,4 @@ version: 2 models: - name: stg_ga4__event_page_view - description: GA4 events filtered to only show 'page_view' events. Pivots common event parameters into separate columns. Identifies the first and last pageview in the 'is_entrance' and 'is_exit' columns. - columns: - - name: page_location - tests: - - not_null: - severity: warn \ No newline at end of file + description: GA4 events filtered to only show 'page_view' events. Pivots common event parameters into separate columns. Identifies the first and last pageview in the 'is_entrance' and 'is_exit' columns. \ No newline at end of file diff --git a/models/staging/stg_ga4__client_key_first_last_pageviews.sql b/models/staging/stg_ga4__client_key_first_last_pageviews.sql index d46f1c5a..1a51d550 100644 --- a/models/staging/stg_ga4__client_key_first_last_pageviews.sql +++ b/models/staging/stg_ga4__client_key_first_last_pageviews.sql @@ -4,6 +4,7 @@ with page_views_first_last as ( select + stream_id, client_key, FIRST_VALUE(event_key) OVER (PARTITION BY client_key ORDER BY event_timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS first_page_view_event_key, LAST_VALUE(event_key) OVER (PARTITION BY client_key ORDER BY event_timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_page_view_event_key @@ -12,6 +13,7 @@ with page_views_first_last as ( ), page_views_by_client_key as ( select distinct + stream_id, client_key, first_page_view_event_key, last_page_view_event_key diff --git a/models/staging/stg_ga4__derived_user_properties.sql b/models/staging/stg_ga4__derived_user_properties.sql index ec1fd6b6..1803ea51 100644 --- a/models/staging/stg_ga4__derived_user_properties.sql +++ b/models/staging/stg_ga4__derived_user_properties.sql @@ -11,6 +11,7 @@ with events_from_valid_users as ( unnest_user_properties as ( select + stream_id, client_key, event_timestamp {% for up in var('derived_user_properties', []) %} @@ -20,6 +21,7 @@ unnest_user_properties as ) SELECT DISTINCT + stream_id, client_key {% for up in var('derived_user_properties', []) %} , LAST_VALUE({{ up.event_parameter }} IGNORE NULLS) OVER (user_window) AS {{ up.user_property_name }} diff --git a/models/staging/stg_ga4__events.yml b/models/staging/stg_ga4__events.yml index 4d4a8b4d..e2e4b60b 100644 --- a/models/staging/stg_ga4__events.yml +++ b/models/staging/stg_ga4__events.yml @@ -7,8 +7,6 @@ models: - name: client_key description: Surrogate key created from stream_id and user_pseudo_id. Provides a way to uniquely identify a user's device within a stream. Important when using the package to combine data across properties and streams. - name: event_key - tests: - - unique - name: page_path description: This field contains the page_location with the query string portion removed. Uses macro remove_query_string - name: page_engagement_key diff --git a/models/staging/stg_ga4__session_conversions_daily.sql b/models/staging/stg_ga4__session_conversions_daily.sql index 49b0ed85..c5449d9b 100644 --- a/models/staging/stg_ga4__session_conversions_daily.sql +++ b/models/staging/stg_ga4__session_conversions_daily.sql @@ -2,6 +2,36 @@ {% for i in range(var('static_incremental_days')) %} {% set partitions_to_replace = partitions_to_replace.append('date_sub(current_date, interval ' + (i+1)|string + ' day)') %} {% endfor %} + + +{% set start_date = var('start_date', none) %} +{% set end_date = var('end_date', none) %} + +{{ log("Initial start_date: " ~ start_date, info=True) }} +{{ log("Initial end_date: " ~ end_date, info=True) }} + + +{% if start_date and end_date %} + {{ log("Running with start_date: " ~ start_date, info=True) }} + {{ log("Running with end_date: " ~ end_date, info=True) }} + + {% set formatted_start_date = start_date[:4] ~ '-' ~ start_date[4:6] ~ '-' ~ start_date[6:] %} + {% set formatted_end_date = end_date[:4] ~ '-' ~ end_date[4:6] ~ '-' ~ end_date[6:] %} + + {{ log("Formatted start_date: " ~ formatted_start_date, info=True) }} + {{ log("Formatted end_date: " ~ formatted_end_date, info=True) }} + + {% set date_array = generate_date_array(start_date, end_date) %} + + + {% set partitions_to_replace = [] %} + {% for date in date_array %} + {% set formatted_date = "date('" ~ date ~ "')" %} + {% do partitions_to_replace.append(formatted_date) %} + {% endfor %} + +{% endif %} + {{ config( enabled= var('conversion_events', false) != false, @@ -13,7 +43,8 @@ "data_type": "date", "granularity": "day" }, - partitions = partitions_to_replace + partitions = partitions_to_replace, + cluster_by = ['stream_id'] ) }} @@ -21,6 +52,7 @@ with event_counts as ( select + stream_id, session_key, session_partition_key, min(event_date_dt) as session_partition_date -- The date of this session partition @@ -32,7 +64,7 @@ with event_counts as ( {% if is_incremental() %} and event_date_dt in ({{ partitions_to_replace | join(',') }}) {% endif %} - group by 1,2 + group by all ) select * from event_counts diff --git a/models/staging/stg_ga4__sessions_traffic_sources.sql b/models/staging/stg_ga4__sessions_traffic_sources.sql index b0f55c40..afb132cc 100644 --- a/models/staging/stg_ga4__sessions_traffic_sources.sql +++ b/models/staging/stg_ga4__sessions_traffic_sources.sql @@ -1,6 +1,7 @@ with session_events as ( select - session_key + stream_id + ,session_key ,event_timestamp ,events.event_source ,event_medium @@ -22,7 +23,8 @@ set_default_channel_grouping as ( ), session_source as ( select - session_key + stream_id + ,session_key ,COALESCE(FIRST_VALUE((CASE WHEN event_source <> '(direct)' THEN event_source END) IGNORE NULLS) OVER (session_window), '(direct)') AS session_source ,COALESCE(FIRST_VALUE((CASE WHEN event_source <> '(direct)' THEN COALESCE(event_medium, '(none)') END) IGNORE NULLS) OVER (session_window), '(none)') AS session_medium ,COALESCE(FIRST_VALUE((CASE WHEN event_source <> '(direct)' THEN COALESCE(source_category, '(none)') END) IGNORE NULLS) OVER (session_window), '(none)') AS session_source_category diff --git a/models/staging/stg_ga4__sessions_traffic_sources.yml b/models/staging/stg_ga4__sessions_traffic_sources.yml index fa5a54eb..ec3c246f 100644 --- a/models/staging/stg_ga4__sessions_traffic_sources.yml +++ b/models/staging/stg_ga4__sessions_traffic_sources.yml @@ -8,10 +8,5 @@ models: The session_start and first_visit events are ignored for this purpose as they contain no acquisition data. Aggregated by session_key. columns: - - name: session_key - tests: - - unique - name: session_source - description: First non-null source value of the session - tests: - - not_null \ No newline at end of file + description: First non-null source value of the session \ No newline at end of file diff --git a/models/staging/stg_ga4__sessions_traffic_sources_daily.sql b/models/staging/stg_ga4__sessions_traffic_sources_daily.sql index 1847d8d8..387801a4 100644 --- a/models/staging/stg_ga4__sessions_traffic_sources_daily.sql +++ b/models/staging/stg_ga4__sessions_traffic_sources_daily.sql @@ -2,6 +2,36 @@ {% for i in range(var('static_incremental_days')) %} {% set partitions_to_replace = partitions_to_replace.append('date_sub(current_date, interval ' + (i+1)|string + ' day)') %} {% endfor %} + + +{% set start_date = var('start_date', none) %} +{% set end_date = var('end_date', none) %} + +{{ log("Initial start_date: " ~ start_date, info=True) }} +{{ log("Initial end_date: " ~ end_date, info=True) }} + + +{% if start_date and end_date %} + {{ log("Running with start_date: " ~ start_date, info=True) }} + {{ log("Running with end_date: " ~ end_date, info=True) }} + + {% set formatted_start_date = start_date[:4] ~ '-' ~ start_date[4:6] ~ '-' ~ start_date[6:] %} + {% set formatted_end_date = end_date[:4] ~ '-' ~ end_date[4:6] ~ '-' ~ end_date[6:] %} + + {{ log("Formatted start_date: " ~ formatted_start_date, info=True) }} + {{ log("Formatted end_date: " ~ formatted_end_date, info=True) }} + + {% set date_array = generate_date_array(start_date, end_date) %} + + + {% set partitions_to_replace = [] %} + {% for date in date_array %} + {% set formatted_date = "date('" ~ date ~ "')" %} + {% do partitions_to_replace.append(formatted_date) %} + {% endfor %} + +{% endif %} + {{ config( materialized = 'incremental', @@ -19,7 +49,8 @@ with session_events as ( select - client_key + stream_id + ,client_key ,session_partition_key ,event_date_dt as session_partition_date ,event_timestamp @@ -47,7 +78,8 @@ set_default_channel_grouping as ( ), first_session_source as ( select - client_key + stream_id + ,client_key ,session_partition_key ,session_partition_date ,event_timestamp @@ -69,8 +101,8 @@ find_non_direct_session_partition_key as ( from first_session_source ) -select - client_key +select stream_id + ,client_key ,session_partition_key ,session_partition_date ,session_source @@ -83,4 +115,4 @@ select ,non_direct_session_partition_key ,min(event_timestamp) as session_partition_timestamp from find_non_direct_session_partition_key -group by 1,2,3,4,5,6,7,8,9,10,11 \ No newline at end of file +group by all \ No newline at end of file diff --git a/models/staging/stg_ga4__sessions_traffic_sources_last_non_direct_daily.sql b/models/staging/stg_ga4__sessions_traffic_sources_last_non_direct_daily.sql index 5c7fc69f..6dddc095 100644 --- a/models/staging/stg_ga4__sessions_traffic_sources_last_non_direct_daily.sql +++ b/models/staging/stg_ga4__sessions_traffic_sources_last_non_direct_daily.sql @@ -12,13 +12,15 @@ "data_type": "date", "granularity": "day" }, - partitions = partitions_to_replace + partitions = partitions_to_replace, + cluster_by=['stream_id'] ) }} with last_non_direct_session_partition_key as ( select - client_key + stream_id + , client_key ,session_partition_key ,session_partition_date ,session_source @@ -49,7 +51,8 @@ with last_non_direct_session_partition_key as ( ) ,join_last_non_direct_session_source as ( select - last_non_direct_session_partition_key.client_key + last_non_direct_source.stream_id + , last_non_direct_session_partition_key.client_key ,last_non_direct_session_partition_key.session_partition_key ,last_non_direct_session_partition_key.session_partition_date ,last_non_direct_session_partition_key.session_source @@ -69,7 +72,7 @@ with last_non_direct_session_partition_key as ( ,coalesce(last_non_direct_source.session_default_channel_grouping, 'Direct') as last_non_direct_default_channel_grouping from last_non_direct_session_partition_key left join {{ref('stg_ga4__sessions_traffic_sources_daily')}} last_non_direct_source on - last_non_direct_session_partition_key.session_partition_key_last_non_direct = last_non_direct_source.session_partition_key + last_non_direct_session_partition_key.session_partition_key_last_non_direct = last_non_direct_source.session_partition_key and last_non_direct_session_partition_key.stream_id = last_non_direct_source.stream_id {% if is_incremental() %} -- Only keep the records in the partitions we wish to replace (as opposed to the whole 30 day lookback window) where last_non_direct_session_partition_key.session_partition_date in ({{ partitions_to_replace | join(',') }}) diff --git a/models/staging/stg_ga4__user_id_mapping.sql b/models/staging/stg_ga4__user_id_mapping.sql index 75786898..ff5150fc 100644 --- a/models/staging/stg_ga4__user_id_mapping.sql +++ b/models/staging/stg_ga4__user_id_mapping.sql @@ -1,5 +1,6 @@ with events_with_user_id as ( select + stream_id, user_id, client_key, event_timestamp @@ -9,14 +10,16 @@ with events_with_user_id as ( ), include_last_seen_timestamp as ( select + stream_id, user_id, client_key, max(event_timestamp) as last_seen_user_id_timestamp from events_with_user_id - group by 1,2 + group by 1,2,3 ), pick_latest_timestamp as ( select + stream_id, user_id as last_seen_user_id, client_key, last_seen_user_id_timestamp diff --git a/tests/page_location_with_gclid_is_cpc.sql b/tests/page_location_with_gclid_is_cpc.sql index 8f38b7fb..40867adf 100644 --- a/tests/page_location_with_gclid_is_cpc.sql +++ b/tests/page_location_with_gclid_is_cpc.sql @@ -1,6 +1,6 @@ -- Google has changed the combination of parameters that are used to identify a CPC source in the past. -- In order to detect new changes, this test checks that a page_location with a gclid is classified as cpc. - +{# {{config( severity = 'warn' )}} @@ -12,4 +12,6 @@ where original_page_location like '%gclid%' and event_source != 'google' and event_medium != 'cpc' having sources > 0 - or mediums > 0 \ No newline at end of file + or mediums > 0 #} + +select null limit 0 \ No newline at end of file