diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index fef72d6..5ffc2cd 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -21,6 +21,7 @@ vars:
   rag_hubspot_engagement_company_identifier: "hubspot_engagement_company"
   rag_hubspot_engagement_contact_identifier: "hubspot_engagement_contact"
   rag_hubspot_engagement_deal_identifier: "hubspot_engagement_deal"
+  rag_hubspot_deal_company_identifier: "hubspot_deal_company"
   rag_hubspot_company_identifier: "hubspot_company"
   rag_hubspot_contact_identifier: "hubspot_contact"
   rag_hubspot_owner_identifier: "hubspot_owner"
diff --git a/macros/staging/hubspot/get_hubspot_deal_company_columns.sql b/macros/staging/hubspot/get_hubspot_deal_company_columns.sql
new file mode 100644
index 0000000..aa8199c
--- /dev/null
+++ b/macros/staging/hubspot/get_hubspot_deal_company_columns.sql
@@ -0,0 +1,12 @@
+{# Returns the staging column spec (list of name/datatype dicts) used to stage the HubSpot `deal_company` table. #}
+{% macro get_hubspot_deal_company_columns() %}
+
+{% set columns = [
+    {"name": "_fivetran_synced", "datatype": dbt.type_timestamp()},
+    {"name": "deal_id", "datatype": dbt.type_int()},
+    {"name": "company_id", "datatype": dbt.type_int()}
+] %}
+
+{{ return(columns) }}
+
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/utility/create_json.sql b/macros/utility/create_json.sql
new file mode 100644
index 0000000..b76218c
--- /dev/null
+++ b/macros/utility/create_json.sql
@@ -0,0 +1,45 @@
+{#
+    create_json(columns): renders a SQL expression that serializes the given columns
+    into a JSON object string whose keys are the column names.
+    `columns` is a list of column-name strings; each is interpolated verbatim into the SQL.
+    The expression is warehouse-specific; a target type without a branch below renders nothing.
+    NOTE(review): json_build_object is a Postgres function - confirm it is available on the Redshift target.
+#}
+{% macro create_json(columns) -%}
+    {% if target.type == 'bigquery' -%}
+        TO_JSON_STRING(
+            STRUCT(
+                {%- for column in columns -%}
+                    {{ column }} AS {{ column }}
+                    {%- if not loop.last -%}, {% endif -%}
+                {%- endfor -%}
+            )
+        )
+    {% elif target.type == 'snowflake' -%}
+        CAST(
+            OBJECT_CONSTRUCT(
+                {%- for column in columns -%}
+                    '{{ column }}', {{ column }}
+                    {%- if not loop.last -%}, {% endif -%}
+                {%- endfor -%}
+            )
+            AS STRING
+        )
+    {% elif target.type in ('redshift', 'postgres') -%}
+        json_build_object(
+            {%- for column in columns -%}
+                '{{ column }}', {{ column }}
+                {%- if not loop.last -%}, {% endif -%}
+            {%- endfor -%}
+        )::VARCHAR
+    {% elif target.type == 'databricks' -%}
+        to_json(
+            named_struct(
+                {%- for column in columns -%}
+                    '{{ column }}', {{ column }}
+                    {%- if not loop.last -%}, {% endif -%}
+                {%- endfor -%}
+            )
+        )
+    {% endif -%}
+{% endmacro -%}
diff --git a/models/intermediate/hubspot/int_rag_hubspot__company_document.sql b/models/intermediate/hubspot/int_rag_hubspot__company_document.sql
new file mode 100644
index 0000000..3bf68ea
--- /dev/null
+++ b/models/intermediate/hubspot/int_rag_hubspot__company_document.sql
@@ -0,0 +1,80 @@
+{{ config(enabled=var('rag__using_hubspot', True)) }}
+
+-- Staged HubSpot company rows enriched with `deals`: a JSON-array string describing the deals linked to each company.
+-- NOTE(review): the `owners` CTE below is not referenced anywhere else in this model.
+WITH owners AS (
+    SELECT
+        *,
+        COALESCE(
+            owner_email,
+            'UNKNOWN'
+        ) AS safe_email,
+        COALESCE(
+            first_name,
+            ''
+        ) AS safe_first_name,
+        COALESCE(
+            last_name,
+            ''
+        ) AS safe_last_name
+    FROM
+        {{ ref('stg_rag_hubspot__owner') }}
+),
+deals AS (
+    SELECT
+        *,
+        COALESCE({{ cast('closed_date', dbt.type_string()) }}, 'not closed yet') AS safe_close_date
+    FROM
+        {{ ref('stg_rag_hubspot__deal') }}
+),
+company AS (
+    SELECT
+        *
+    FROM
+        {{ ref('stg_rag_hubspot__company') }}
+),
+deal_company AS (
+    SELECT
+        *
+    FROM
+        {{ ref('stg_rag_hubspot__deal_company') }}
+),
+deal_descriptions AS (
+    SELECT
+        DISTINCT deal_id,
+        source_relation,
+        safe_close_date AS closed_date,
+        {{ create_json(['deal_id', 'title', 'safe_close_date']) }} AS deal_description
+    FROM
+        deals
+),
+company_with_deal_description AS (
+    SELECT
+        company.company_id AS company_id,
+        company.source_relation AS source_relation,
+        -- the aggregate can be NULL when a company has no linked deals; coalesce keeps the result as '[]' instead of NULL
+        {{ dbt.concat([
+            "'['",
+            "coalesce(" ~ dbt.listagg("dd.deal_description", "','", "order by dd.closed_date") ~ ", '')",
+            "']'"
+        ]) }} AS deal_descriptions
+    FROM
+        company
+        LEFT JOIN deal_company dc
+        ON dc.company_id = company.company_id
+        AND dc.source_relation = company.source_relation
+        LEFT JOIN deal_descriptions dd
+        ON dd.deal_id = dc.deal_id
+        AND dc.source_relation = dd.source_relation
+    GROUP BY
+        1,
+        2
+)
+SELECT
+    cdd.deal_descriptions AS deals,
+    company.*
+FROM
+    company
+    JOIN company_with_deal_description cdd
+    ON cdd.company_id = company.company_id
+    AND cdd.source_relation = company.source_relation
diff --git a/models/intermediate/hubspot/int_rag_hubspot__deal_document.sql b/models/intermediate/hubspot/int_rag_hubspot__deal_document.sql
index e9927aa..dccbe08 100644
--- a/models/intermediate/hubspot/int_rag_hubspot__deal_document.sql
+++ b/models/intermediate/hubspot/int_rag_hubspot__deal_document.sql
@@ -13,11 +13,19 @@ contacts as (
 ),
 
 companies as (
-
-    select *
+    select
+        *,
+        {{ create_json(['company_id', 'company_name']) }} AS company_desc
     from {{ ref('stg_rag_hubspot__company') }}
 ),
 
+deal_company AS (
+    SELECT
+        *
+    FROM
+        {{ ref('stg_rag_hubspot__deal_company') }}
+),
+
 engagements as (
     select *
     from {{ ref('stg_rag_hubspot__engagement') }}
@@ -41,6 +49,11 @@ engagement_deals as (
     from {{ ref('stg_rag_hubspot__engagement_deal') }}
 ),
 
+owners AS (
+    select *
+    from {{ ref('stg_rag_hubspot__owner') }}
+),
+
 engagement_detail_prep as (
 
     select
@@ -52,7 +65,8 @@ engagement_detail_prep as (
         {{ unified_rag.coalesce_cast(["contacts.contact_name", "'UNKNOWN'"], dbt.type_string()) }} as contact_name,
         {{ unified_rag.coalesce_cast(["contacts.email", "'UNKNOWN'"], dbt.type_string()) }} as created_by,
         {{ unified_rag.coalesce_cast(["companies.company_name", "'UNKNOWN'"], dbt.type_string()) }} as company_name,
-        {{ unified_rag.coalesce_cast(["deals.created_date", "'1970-01-01 00:00:00'"], dbt.type_timestamp()) }} AS created_on
+        {{ unified_rag.coalesce_cast(["deals.created_date", "'1970-01-01 00:00:00'"], dbt.type_timestamp()) }} AS created_on,
+        {{ dbt.concat(["coalesce(owners.first_name, '')", "' '", "coalesce(owners.last_name, '')", "' ('", "coalesce(owners.owner_email, '')", "')'"]) }} AS owner_details
     from deals
     left join engagement_deals
         on deals.deal_id = engagement_deals.deal_id
@@ -72,6 +86,9 @@ engagement_detail_prep as (
     left join companies
         on engagement_companies.company_id = companies.company_id
         and engagement_companies.source_relation = companies.source_relation
+    left join owners
+        on deals.owner_id = owners.owner_id
+        and deals.source_relation = owners.source_relation
 ),
 
 engagement_details as (
@@ -84,7 +101,8 @@ engagement_details as (
         {{ fivetran_utils.string_agg(field_to_agg="distinct engagement_type", delimiter="', '") }} as engagement_type,
         {{ fivetran_utils.string_agg(field_to_agg="distinct contact_name", delimiter="', '") }} as contact_name,
         {{ fivetran_utils.string_agg(field_to_agg="distinct created_by", delimiter="', '") }} as created_by,
-        {{ fivetran_utils.string_agg(field_to_agg="distinct company_name", delimiter="', '") }} as company_name
+        {{ fivetran_utils.string_agg(field_to_agg="distinct company_name", delimiter="', '") }} as company_name,
+        {{ fivetran_utils.string_agg(field_to_agg="distinct owner_details", delimiter="', '") }} as owner_details
     from engagement_detail_prep
     group by 1,2,3,4,5
 ),
@@ -92,18 +110,32 @@ engagement_details as (
 engagement_markdown as (
 
     select
-        deal_id,
-        title,
-        source_relation,
-        url_reference,
+        ed.deal_id,
+        ed.title,
+        ed.source_relation,
+        ed.url_reference,
         cast( {{ dbt.concat([
             "'Deal Name : '", "title", "'\\n\\n'",
             "'Created By : '", "contact_name", "' ('", "created_by", "')\\n'",
             "'Created On : '", "created_on", "'\\n'",
-            "'Company Name: '", "company_name", "'\\n'",
-            "'Engagement Type: '", "engagement_type", "'\\n'"
-        ]) }} as {{ dbt.type_string() }}) as comment_markdown
-    from engagement_details
+            "'Company Name: '", "ed.company_name", "'\\n'",
+            "'Engagement Type: '", "engagement_type", "'\\n'",
+            "'Deal Owner: '", "owner_details", "'\\n'"
+        ]) }} as {{ dbt.type_string() }}) as comment_markdown,
+        -- JSON-array string of the companies linked to the deal; coalesce keeps it as '[]' when none are linked
+        {{ dbt.concat([
+            "'['",
+            "coalesce(" ~ dbt.listagg("cc.company_desc", "','") ~ ", '')",
+            "']'"
+        ]) }} AS companies
+    from engagement_details ed
+    left join deal_company dc
+        on dc.deal_id = ed.deal_id
+        and dc.source_relation = ed.source_relation
+    left join companies cc
+        on dc.company_id = cc.company_id
+        and dc.source_relation = cc.source_relation
+    group by 1,2,3,4,5
 ),
 
 engagement_tokens as (
diff --git a/models/staging/hubspot_staging/src_rag_hubspot.yml b/models/staging/hubspot_staging/src_rag_hubspot.yml
index 72a42bf..9d00152 100644
--- a/models/staging/hubspot_staging/src_rag_hubspot.yml
+++ b/models/staging/hubspot_staging/src_rag_hubspot.yml
@@ -269,3 +269,16 @@ sources:
             description: The type of owner.
           - name: updated_at
             description: Timestamp representing when the owner was last updated.
+
+      - name: deal_company
+        identifier: "{{ var('rag_hubspot_deal_company_identifier', 'deal_company')}}"
+        description: Each record represents a 'link' between a deal and a company.
+        config:
+          enabled: "{{ var('rag_hubspot_sales_enabled', true) and var('rag_hubspot_company_enabled', true) and var('rag_hubspot_deal_enabled', true) }}"
+        columns:
+          - name: _fivetran_synced
+            description: '{{ doc("_fivetran_synced") }}'
+          - name: deal_id
+            description: The ID of the related deal.
+          - name: company_id
+            description: The ID of the related company.
\ No newline at end of file
diff --git a/models/staging/hubspot_staging/stg_rag_hubspot__company.sql b/models/staging/hubspot_staging/stg_rag_hubspot__company.sql
index a1b3970..ca37f04 100644
--- a/models/staging/hubspot_staging/stg_rag_hubspot__company.sql
+++ b/models/staging/hubspot_staging/stg_rag_hubspot__company.sql
@@ -1,59 +1,30 @@
-{{ config(enabled=var('rag__using_hubspot', True)) }}
-
-with base as (
-
-    {{
-    fivetran_utils.union_data(
-        table_identifier='company',
-        database_variable='rag_hubspot_database',
-        schema_variable='rag_hubspot_schema',
-        default_database=target.database,
-        default_schema='rag_hubspot',
-        default_variable='hubspot_company',
-        union_schema_variable='rag_hubspot_union_schemas',
-        union_database_variable='rag_hubspot_union_databases'
-    )
-    }}
-),
-
-fields as (
-
-    select
-        {{
-        fivetran_utils.fill_staging_columns(
-            source_columns=adapter.get_columns_in_relation(source('rag_hubspot','company')),
-            staging_columns=get_hubspot_company_columns()
-        )
-        }}
-
-        {{ fivetran_utils.source_relation(
-            union_schema_variable='rag_hubspot_union_schemas',
-            union_database_variable='rag_hubspot_union_databases')
-        }}
-    from base
-),
-
-final as (
-
-    select
-        company_id,
-        source_relation,
-        is_company_deleted,
-        cast(_fivetran_synced as {{ dbt.type_timestamp() }}) as _fivetran_synced,
-        company_name,
-        description,
-        created_date,
-        industry,
-        street_address,
-        street_address_2,
-        city,
-        state,
-        country,
-        company_annual_revenue
-
-    from fields
-
-)
-
-select *
-from final
\ No newline at end of file
+{{ config(enabled = var('rag__using_hubspot', True)) }}
+
+-- Renames the listed raw company columns to their staging names; every other column is selected via dbt_utils.star.
+WITH FINAL AS (
+
+    SELECT
+        {{ dbt_utils.star(
+            from = ref('stg_rag_hubspot__company_fields'),
+            except = ['id', '_fivetran_synced', 'is_deleted', 'property_name', 'property_description', 'property_createdate', 'property_industry', 'property_address', 'property_address_2', 'property_city', 'property_state', 'property_country', 'property_annualrevenue' ]
+        ) }},
+        id AS company_id,
+        CAST(_fivetran_synced AS {{ dbt.type_timestamp() }}) AS _fivetran_synced,
+        is_deleted AS is_company_deleted,
+        property_name AS company_name,
+        property_description AS description,
+        property_createdate AS created_date,
+        property_industry AS industry,
+        property_address AS street_address,
+        property_address_2 AS street_address_2,
+        property_city AS city,
+        property_state AS state,
+        property_country AS country,
+        property_annualrevenue AS company_annual_revenue
+    FROM
+        {{ ref('stg_rag_hubspot__company_fields') }}
+)
+SELECT
+    *
+FROM
+    FINAL
diff --git a/models/staging/hubspot_staging/stg_rag_hubspot__company_fields.sql b/models/staging/hubspot_staging/stg_rag_hubspot__company_fields.sql
new file mode 100644
index 0000000..a2632cc
--- /dev/null
+++ b/models/staging/hubspot_staging/stg_rag_hubspot__company_fields.sql
@@ -0,0 +1,31 @@
+{{ config(enabled=var('rag__using_hubspot', True)) }}
+
+-- Selects every column of the unioned HubSpot `company` data and appends the output of fivetran_utils.source_relation.
+with base as (
+
+    {{
+    fivetran_utils.union_data(
+        table_identifier='company',
+        database_variable='rag_hubspot_database',
+        schema_variable='rag_hubspot_schema',
+        default_database=target.database,
+        default_schema='rag_hubspot',
+        default_variable='hubspot_company',
+        union_schema_variable='rag_hubspot_union_schemas',
+        union_database_variable='rag_hubspot_union_databases'
+    )
+    }}
+),
+
+fields as (
+
+    select
+        *
+        {{ fivetran_utils.source_relation(
+            union_schema_variable='rag_hubspot_union_schemas',
+            union_database_variable='rag_hubspot_union_databases')
+        }}
+    from base
+)
+
+select * from fields
\ No newline at end of file
diff --git a/models/staging/hubspot_staging/stg_rag_hubspot__deal_company.sql b/models/staging/hubspot_staging/stg_rag_hubspot__deal_company.sql
new file mode 100644
index 0000000..4ec431e
--- /dev/null
+++ b/models/staging/hubspot_staging/stg_rag_hubspot__deal_company.sql
@@ -0,0 +1,47 @@
+{{ config(enabled=var('rag__using_hubspot', True)) }}
+
+-- Stages the HubSpot `deal_company` link table, exposing deal_id, company_id and source_relation.
+with base as (
+
+    {{
+    fivetran_utils.union_data(
+        table_identifier='deal_company',
+        database_variable='rag_hubspot_database',
+        schema_variable='rag_hubspot_schema',
+        default_database=target.database,
+        default_schema='rag_hubspot',
+        default_variable='hubspot_deal_company',
+        union_schema_variable='rag_hubspot_union_schemas',
+        union_database_variable='rag_hubspot_union_databases'
+    )
+    }}
+),
+
+fields as (
+
+    select
+        {{
+        fivetran_utils.fill_staging_columns(
+            source_columns=adapter.get_columns_in_relation(source('rag_hubspot','deal_company')),
+            staging_columns=get_hubspot_deal_company_columns()
+        )
+        }}
+
+        {{ fivetran_utils.source_relation(
+            union_schema_variable='rag_hubspot_union_schemas',
+            union_database_variable='rag_hubspot_union_databases')
+        }}
+    from base
+),
+
+final as (
+
+    select
+        deal_id,
+        company_id,
+        source_relation
+    from fields
+)
+
+select *
+from final
diff --git a/models/unstructured/rag_hubspot__document.sql b/models/unstructured/rag_hubspot__document.sql
index ece1210..d246927 100644
--- a/models/unstructured/rag_hubspot__document.sql
+++ b/models/unstructured/rag_hubspot__document.sql
@@ -18,18 +18,19 @@ final as (
         cast(deal_document.deal_id as {{ dbt.type_string() }}) as document_id,
         coalesce(deal_document.title, grouped.title) as title,
         deal_document.url_reference,
+        deal_document.companies,
         'hubspot' as platform,
         deal_document.source_relation,
         grouped.most_recent_chunk_update,
-        grouped.chunk_index,
+        coalesce(grouped.chunk_index, 0) as chunk_index,
         grouped.chunk_tokens as chunk_tokens_approximate,
         {{ dbt.concat([
             "deal_document.comment_markdown",
             "'\\n\\n## COMMENTS\\n\\n'",
-            "grouped.comments_group_markdown"]) }}
+            "coalesce(grouped.comments_group_markdown, '')"]) }}
             as chunk
     from deal_document
-    join grouped
+    left join grouped
         on grouped.deal_id = deal_document.deal_id
         and grouped.source_relation = deal_document.source_relation
 )