Skip to content

Commit

Permalink
feat(experiments): Support all internal and user filters for dw (#27603)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Marius Andra <[email protected]>
  • Loading branch information
3 people authored Jan 29, 2025
1 parent 376f261 commit 3397f68
Show file tree
Hide file tree
Showing 5 changed files with 710 additions and 60 deletions.
21 changes: 18 additions & 3 deletions posthog/hogql/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def _use_person_id_from_person_overrides(database: Database) -> None:
"if(not(empty(override.distinct_id)), override.person_id, event_person_id)",
start=None,
),
isolate_scope=True,
)


Expand Down Expand Up @@ -407,10 +408,24 @@ def define_mappings(warehouse: dict[str, Table], get_table: Callable):
)

if "person_id" not in warehouse[warehouse_modifier.table_name].fields.keys():
warehouse[warehouse_modifier.table_name].fields["person_id"] = ExpressionField(
name="person_id",
expr=parse_expr(warehouse_modifier.distinct_id_field),
events_join = (
DataWarehouseJoin.objects.filter(
team_id=team.pk,
source_table_name=warehouse_modifier.table_name,
joining_table_name="events",
)
.exclude(deleted=True)
.first()
)
if events_join:
warehouse[warehouse_modifier.table_name].fields["person_id"] = FieldTraverser(
chain=[events_join.field_name, "person_id"]
)
else:
warehouse[warehouse_modifier.table_name].fields["person_id"] = ExpressionField(
name="person_id",
expr=parse_expr(warehouse_modifier.distinct_id_field),
)

return warehouse

Expand Down
53 changes: 52 additions & 1 deletion posthog/hogql/database/test/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from posthog.models.group_type_mapping import GroupTypeMapping
from posthog.models.organization import Organization
from posthog.models.team.team import Team
from posthog.schema import DatabaseSchemaDataWarehouseTable, HogQLQueryModifiers, PersonsOnEventsMode
from posthog.schema import (
DataWarehouseEventsModifier,
DatabaseSchemaDataWarehouseTable,
HogQLQueryModifiers,
PersonsOnEventsMode,
)
from posthog.test.base import BaseTest, QueryMatchingTest, FuzzyInt
from posthog.warehouse.models import DataWarehouseTable, DataWarehouseCredential, DataWarehouseSavedQuery
from posthog.hogql.query import execute_hogql_query
Expand Down Expand Up @@ -656,3 +661,49 @@ def test_database_warehouse_joins_persons_poe_old_properties(self):
assert "some_field" in person_on_event_table.join_table.fields.keys() # type: ignore

print_ast(parse_select("select person.some_field.key from events"), context, dialect="clickhouse")

def test_database_warehouse_person_id_field_with_events_join(self):
credentials = DataWarehouseCredential.objects.create(
access_key="test_key", access_secret="test_secret", team=self.team
)
DataWarehouseTable.objects.create(
name="warehouse_table",
format="Parquet",
team=self.team,
credential=credentials,
url_pattern="s3://test/*",
columns={"id": "String", "user_id": "String", "timestamp": "DateTime64(3, 'UTC')"},
)
DataWarehouseJoin.objects.create(
team=self.team,
source_table_name="warehouse_table",
source_table_key="user_id",
joining_table_name="events",
joining_table_key="distinct_id",
field_name="events_data",
)
modifiers = HogQLQueryModifiers(
dataWarehouseEventsModifiers=[
DataWarehouseEventsModifier(
table_name="warehouse_table",
id_field="id",
timestamp_field="timestamp",
distinct_id_field="user_id",
)
]
)
db = create_hogql_database(team_id=self.team.pk, modifiers=modifiers)

context = HogQLContext(
team_id=self.team.pk,
enable_select_queries=True,
database=db,
)

actual_table = db.get_table("warehouse_table")
person_id_field = actual_table.fields.get("person_id")

assert isinstance(person_id_field, FieldTraverser)
assert person_id_field.chain == ["events_data", "person_id"]

print_ast(parse_select("SELECT person_id FROM warehouse_table"), context, dialect="clickhouse")
5 changes: 4 additions & 1 deletion posthog/hogql/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,10 @@ def visit_field(self, node: ast.Field):
new_node: ast.Expr = ast.Alias(alias=node.type.name, expr=new_expr, hidden=True)

if node.type.isolate_scope:
self.scopes.append(ast.SelectQueryType(tables={node.type.name: node.type.table_type}))
table_type = node.type.table_type
while isinstance(table_type, ast.VirtualTableType):
table_type = table_type.table_type
self.scopes.append(ast.SelectQueryType(tables={node.type.name: table_type}))

new_node = self.visit(new_node)

Expand Down
Loading

0 comments on commit 3397f68

Please sign in to comment.