Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(hubspot): Company document (requested by Bonfire for Fivetran AI) #24

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
1 change: 1 addition & 0 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ vars:
rag_hubspot_engagement_company_identifier: "hubspot_engagement_company"
rag_hubspot_engagement_contact_identifier: "hubspot_engagement_contact"
rag_hubspot_engagement_deal_identifier: "hubspot_engagement_deal"
rag_hubspot_engagement_deal_company: "hubspot_deal_company"
rag_hubspot_company_identifier: "hubspot_company"
rag_hubspot_contact_identifier: "hubspot_contact"
rag_hubspot_owner_identifier: "hubspot_owner"
Expand Down
11 changes: 11 additions & 0 deletions macros/staging/hubspot/get_hubspot_deal_company_columns.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{% macro get_hubspot_deal_company_columns() %}

{% set columns = [
{"name": "_fivetran_synced", "datatype": dbt.type_timestamp()},
{"name": "deal_id", "datatype": dbt.type_int()},
{"name": "company_id", "datatype": dbt.type_int()}
] %}

{{ return(columns) }}

{% endmacro %}
38 changes: 38 additions & 0 deletions macros/utility/create_json.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{% macro create_json(columns) -%}
{% if target.type == 'bigquery' -%}
TO_JSON_STRING(
STRUCT(
{%- for column in columns -%}
{{ column }} AS {{ column }}
{%- if not loop.last -%}, {% endif -%}
{%- endfor -%}
)
)
{% elif target.type == 'snowflake' -%}
CAST(
OBJECT_CONSTRUCT(
{%- for column in columns -%}
'{{ column }}', {{ column }}
{%- if not loop.last -%}, {% endif -%}
{%- endfor -%}
)
AS STRING
)
{% elif target.type == 'redshift' -%}
json_build_object(
{%- for column in columns -%}
'{{ column }}', {{ column }}
{%- if not loop.last -%}, {% endif -%}
{%- endfor -%}
)::VARCHAR
{% elif target.type == 'databricks' -%}
to_json(
named_struct(
{%- for column in columns -%}
'{{ column }}', {{ column }}
{%- if not loop.last -%}, {% endif -%}
{%- endfor -%}
)
)
{% endif -%}
{% endmacro -%}
77 changes: 77 additions & 0 deletions models/intermediate/hubspot/int_rag_hubspot__company_document.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{{ config(enabled=var('rag__using_hubspot', True)) }}

WITH owners AS (
SELECT
*,
COALESCE(
owner_email,
'UNKNOWN'
) AS safe_email,
COALESCE(
first_name,
''
) AS safe_first_name,
COALESCE(
last_name,
''
) AS safe_last_name
FROM
{{ ref('stg_rag_hubspot__owner') }}
),
deals AS (
SELECT
*,
COALESCE({{ cast('closed_date', dbt.type_string()) }}, 'not closed yet') AS safe_close_date
FROM
{{ ref('stg_rag_hubspot__deal') }}
),
company AS (
SELECT
*
FROM
{{ ref('stg_rag_hubspot__company') }}
),
deal_company AS (
SELECT
*
FROM
{{ ref('stg_rag_hubspot__deal_company') }}
),
deal_descriptions AS (
SELECT
DISTINCT deal_id,
source_relation,
safe_close_date AS closed_date,
{{ create_json(['deal_id', 'title', 'safe_close_date']) }} AS deal_description
FROM
deals
),
company_with_deal_description AS (
SELECT
company.company_id AS company_id,
company.source_relation AS source_relation,
{{ dbt.concat([
"'['",
dbt.listagg("dd.deal_description", "','", "order by dd.closed_date"),
"']'"
]) }} AS deal_descriptions
FROM
company
LEFT JOIN deal_company dc
ON dc.company_id = company.company_id
AND dc.source_relation = company.source_relation
LEFT JOIN deal_descriptions dd
ON dd.deal_id = dc.deal_id
AND dc.source_relation = dd.source_relation
GROUP BY
1,
2
)
SELECT
cdd.deal_descriptions AS deals,
company.*
FROM
company
JOIN company_with_deal_description cdd
ON cdd.company_id = company.company_id
AND cdd.source_relation = company.source_relation
55 changes: 43 additions & 12 deletions models/intermediate/hubspot/int_rag_hubspot__deal_document.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,19 @@ contacts as (
),

companies as (

select *
select
*,
{{ create_json(['company_id', 'company_name']) }} AS company_desc
from {{ ref('stg_rag_hubspot__company') }}
),

deal_company AS (
SELECT
*
FROM
{{ ref('stg_rag_hubspot__deal_company') }}
),

engagements as (
select *
from {{ ref('stg_rag_hubspot__engagement') }}
Expand All @@ -41,6 +49,11 @@ engagement_deals as (
from {{ ref('stg_rag_hubspot__engagement_deal') }}
),

owners AS (
select *
from {{ ref('stg_rag_hubspot__owner') }}
),

engagement_detail_prep as (

select
Expand All @@ -52,7 +65,8 @@ engagement_detail_prep as (
{{ unified_rag.coalesce_cast(["contacts.contact_name", "'UNKNOWN'"], dbt.type_string()) }} as contact_name,
{{ unified_rag.coalesce_cast(["contacts.email", "'UNKNOWN'"], dbt.type_string()) }} as created_by,
{{ unified_rag.coalesce_cast(["companies.company_name", "'UNKNOWN'"], dbt.type_string()) }} as company_name,
{{ unified_rag.coalesce_cast(["deals.created_date", "'1970-01-01 00:00:00'"], dbt.type_timestamp()) }} AS created_on
{{ unified_rag.coalesce_cast(["deals.created_date", "'1970-01-01 00:00:00'"], dbt.type_timestamp()) }} AS created_on,
{{ dbt.concat(["coalesce(owners.first_name, '')", "' '", "coalesce(owners.last_name, '')", "' ('", "coalesce(owners.owner_email, '')", "')'"]) }} AS owner_details
from deals
left join engagement_deals
on deals.deal_id = engagement_deals.deal_id
Expand All @@ -72,6 +86,9 @@ engagement_detail_prep as (
left join companies
on engagement_companies.company_id = companies.company_id
and engagement_companies.source_relation = companies.source_relation
left join owners
on deals.owner_id = owners.owner_id
and deals.source_relation = owners.source_relation
),

engagement_details as (
Expand All @@ -84,26 +101,40 @@ engagement_details as (
{{ fivetran_utils.string_agg(field_to_agg="distinct engagement_type", delimiter="', '") }} as engagement_type,
{{ fivetran_utils.string_agg(field_to_agg="distinct contact_name", delimiter="', '") }} as contact_name,
{{ fivetran_utils.string_agg(field_to_agg="distinct created_by", delimiter="', '") }} as created_by,
{{ fivetran_utils.string_agg(field_to_agg="distinct company_name", delimiter="', '") }} as company_name
{{ fivetran_utils.string_agg(field_to_agg="distinct company_name", delimiter="', '") }} as company_name,
{{ fivetran_utils.string_agg(field_to_agg="distinct owner_details", delimiter="', '") }} as owner_details
from engagement_detail_prep
group by 1,2,3,4,5
),

engagement_markdown as (

select
deal_id,
title,
source_relation,
url_reference,
ed.deal_id,
ed.title,
ed.source_relation,
ed.url_reference,
cast( {{ dbt.concat([
"'Deal Name : '", "title", "'\\n\\n'",
"'Created By : '", "contact_name", "' ('", "created_by", "')\\n'",
"'Created On : '", "created_on", "'\\n'",
"'Company Name: '", "company_name", "'\\n'",
"'Engagement Type: '", "engagement_type", "'\\n'"
]) }} as {{ dbt.type_string() }}) as comment_markdown
from engagement_details
"'Company Name: '", "ed.company_name", "'\\n'",
"'Engagement Type: '", "engagement_type", "'\\n'",
"'Deal Owner: '", "owner_details", "'\\n'"
]) }} as {{ dbt.type_string() }}) as comment_markdown,
{{ dbt.concat([
"'['",
dbt.listagg("cc.company_desc", "','"),
"']'"
]) }} AS companies
from engagement_details ed
left join deal_company dc
on dc.deal_id = ed.deal_id
and dc.source_relation = ed.source_relation
left join companies cc
on dc.company_id = cc.company_id
and dc.source_relation = cc.source_relation
group by 1,2,3,4,5
),

engagement_tokens as (
Expand Down
13 changes: 13 additions & 0 deletions models/staging/hubspot_staging/src_rag_hubspot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,16 @@ sources:
description: The type of owner.
- name: updated_at
description: Timestamp representing when the owner was last updated.

- name: deal_company
identifier: "{{ var('rag_hubspot_deal_company_identifier', 'deal_company')}}"
description: Each record represents a 'link' between a deal and a company.
config:
enabled: "{{ var('rag_hubspot_sales_enabled', true) and var('rag_hubspot_company_enabled', true) and var('rag_hubspot_deal_enabled', true) }}"
columns:
- name: _fivetran_synced
description: '{{ doc("_fivetran_synced") }}'
- name: deal_id
description: The ID of the related contact.
- name: company
description: The ID of the related company.
88 changes: 29 additions & 59 deletions models/staging/hubspot_staging/stg_rag_hubspot__company.sql
Original file line number Diff line number Diff line change
@@ -1,59 +1,29 @@
{{ config(enabled=var('rag__using_hubspot', True)) }}

with base as (

{{
fivetran_utils.union_data(
table_identifier='company',
database_variable='rag_hubspot_database',
schema_variable='rag_hubspot_schema',
default_database=target.database,
default_schema='rag_hubspot',
default_variable='hubspot_company',
union_schema_variable='rag_hubspot_union_schemas',
union_database_variable='rag_hubspot_union_databases'
)
}}
),

fields as (

select
{{
fivetran_utils.fill_staging_columns(
source_columns=adapter.get_columns_in_relation(source('rag_hubspot','company')),
staging_columns=get_hubspot_company_columns()
)
}}

{{ fivetran_utils.source_relation(
union_schema_variable='rag_hubspot_union_schemas',
union_database_variable='rag_hubspot_union_databases')
}}
from base
),

final as (

select
company_id,
source_relation,
is_company_deleted,
cast(_fivetran_synced as {{ dbt.type_timestamp() }}) as _fivetran_synced,
company_name,
description,
created_date,
industry,
street_address,
street_address_2,
city,
state,
country,
company_annual_revenue

from fields

)

select *
from final
{{ config(enabled = var('rag__using_hubspot', True)) }}

WITH FINAL AS (

SELECT
{{ dbt_utils.star(
from = ref('stg_rag_hubspot__company_fields'),
except = ['id', '_fivetran_synced', 'is_deleted', 'property_name', 'property_description', 'property_createdate', 'property_industry', 'property_address', 'property_address_2', 'property_city', 'property_state', 'property_country', 'property_annualrevenue' ]
) }},
id AS company_id,
CAST(_fivetran_synced AS {{ dbt.type_timestamp() }}) AS _fivetran_synced,
is_deleted AS is_company_deleted,
property_name AS company_name,
property_description AS description,
property_createdate AS created_date,
property_industry AS industry,
property_address AS street_address,
property_address_2 AS street_address_2,
property_city AS city,
property_state AS state,
property_country AS country,
property_annualrevenue AS company_annual_revenue
FROM
{{ ref('stg_rag_hubspot__company_fields') }}
)
SELECT
*
FROM
FINAL
30 changes: 30 additions & 0 deletions models/staging/hubspot_staging/stg_rag_hubspot__company_fields.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{{ config(enabled=var('rag__using_hubspot', True)) }}

with base as (

{{
fivetran_utils.union_data(
table_identifier='company',
database_variable='rag_hubspot_database',
schema_variable='rag_hubspot_schema',
default_database=target.database,
default_schema='rag_hubspot',
default_variable='hubspot_company',
union_schema_variable='rag_hubspot_union_schemas',
union_database_variable='rag_hubspot_union_databases'
)
}}
),

fields as (

select
*
{{ fivetran_utils.source_relation(
union_schema_variable='rag_hubspot_union_schemas',
union_database_variable='rag_hubspot_union_databases')
}}
from base
)

select * from fields
Loading