
feat: Add person distinct ID overrides squash job (as dagster job) #27710

Merged (45 commits into master, Jan 22, 2025)

Conversation

@tkaemming (Contributor) commented on Jan 20, 2025

Problem

Whenever a person merge occurs, we write the latest person_id that should be associated with the affected distinct_id(s) to the person_distinct_id_overrides table, and use these records to "override" the existing value in the events (or sharded_events) table's person_id column. Another helpful way to think of this overrides table is that it represents a queue of pending person_id updates that haven't yet been applied to the events table.

The number of overrides within this table can accumulate over time to a degree where read performance can be negatively impacted due to the size of the overrides join used to update stale values at query time. This can also lead to a situation where all of a team's queries that use person_id can run into MemoryLimitExceeded errors when using an overrides-based PoE mode if the number of overrides for a team grows large enough.
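As an illustration of the join described above, a simplified, hypothetical version of the query-time override might look like the following. The schema details and the exact ClickHouse expressions are assumptions based on this description, not PostHog's actual query:

```python
# Illustrative only: a simplified sketch of the query-time overrides join.
# Column names and the argMax-by-version pattern are assumptions; the real
# PostHog queries are more involved.
OVERRIDES_JOIN_SQL = """
SELECT
    events.uuid,
    -- prefer the overridden person_id when one exists for this distinct_id
    if(overrides.distinct_id != '', overrides.person_id, events.person_id) AS person_id
FROM events
LEFT JOIN (
    SELECT distinct_id, argMax(person_id, version) AS person_id
    FROM person_distinct_id_overrides
    GROUP BY distinct_id
) AS overrides ON events.distinct_id = overrides.distinct_id
"""
```

As the overrides table grows, the right-hand side of this join grows with it, which is what drives the memory pressure described above.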

Changes

This automates the manual squash process as a Dagster job.

Briefly,

  1. The subset of overrides to be squashed are loaded into a new replicated MergeTree table that is created on all hosts that will require it for executing mutations. Using a purpose-specific replicated table for this task ensures that the overrides state can be synchronized across all hosts without needing to pause replication or use other mechanisms to ensure all hosts are using the same data as a basis for running mutations.
  2. This table is then loaded into a dictionary for use in mutations. Since the dictionary must be built on each host independently, we also checksum the table contents to ensure that all hosts are synchronized before executing the mutations for an additional layer of safety.
  3. This dictionary is used in mutations that update the person_id column for all events with a distinct_id in the overrides dictionary; afterwards, the same dictionary is used to identify the rows that can be safely deleted from the main overrides table, as they are no longer needed to "override" anything (the override rows now match the values in the base table).
  4. Once all mutations have completed, the dictionary and snapshot table are dropped on all hosts where they were created.
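The four steps above could be sketched roughly as follows. All of the names here (the snapshot table suffix, the dictionary name, the mutation statements) are illustrative guesses based on this description, not the PR's actual implementation in dags/person_overrides.py:

```python
# Hedged sketch of the squash workflow described above. Table/dictionary
# naming and DDL details are assumptions, not the PR's exact code.

def snapshot_table_name(run_id: str) -> str:
    # step 1: run-specific replicated table holding the overrides to squash
    return f"person_distinct_id_overrides_snapshot_{run_id}"

def dictionary_name(run_id: str) -> str:
    # step 2: run-specific dictionary built from the snapshot table on each host
    return f"{snapshot_table_name(run_id)}_dict"

def update_events_mutation(run_id: str) -> str:
    # step 3a: rewrite person_id for events whose distinct_id has an override
    d = dictionary_name(run_id)
    return (
        "ALTER TABLE sharded_events "
        f"UPDATE person_id = dictGet('{d}', 'person_id', distinct_id) "
        f"WHERE dictHas('{d}', distinct_id)"
    )

def delete_squashed_overrides_mutation(run_id: str) -> str:
    # step 3b: drop override rows that have now been applied,
    # matching on row version so newer overrides survive
    d = dictionary_name(run_id)
    return (
        "ALTER TABLE person_distinct_id_overrides "
        f"DELETE WHERE dictHas('{d}', distinct_id) "
        f"AND version <= dictGet('{d}', 'version', distinct_id)"
    )
```

Step 4 is then a matter of issuing DROP DICTIONARY and DROP TABLE for the run-specific objects on each host where they were created.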

A few implementation notes:

  1. The table and dictionary are run-specific and include the run ID as part of the table (or dictionary) name. This reduces the possibility of interference if two jobs are run simultaneously, and makes it easier to find mutations in the mutations system table as we can search by the table name associated with the specific run.
  2. Mutations apply to the full dataset and cannot be limited by partition or team ID. This allows deleting from the override table by row version alone without other considerations.
  3. Mutations are targeted directly at relevant hosts, instead of ON CLUSTER with distributed DDL. This allows partial failure modes to be cleanly handled on retries without inadvertently scheduling redundant mutations. Mutations that are killed can be restarted, however, without needing to restart the whole job from the beginning.
  4. ClickHouse client options can be provided as part of the job run configuration, providing more control over query execution without requiring code or configuration deploys.
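Notes 1 and 3 work together: because the run ID is embedded in the table name, a retry can look for an already-scheduled mutation in system.mutations instead of scheduling a duplicate. A rough sketch of that idempotency check, assuming a clickhouse_driver-style client with an `execute` method; the query text is illustrative, not the PR's code:

```python
# Sketch of idempotent per-host mutation scheduling. `client` is assumed to
# expose execute(query) -> list of row tuples (clickhouse_driver style).

def find_existing_mutation_query(table: str, command: str) -> str:
    # system.mutations records every scheduled mutation per table; searching
    # by the run-specific table name narrows this to the current job run
    return (
        "SELECT mutation_id, is_done FROM system.mutations "
        f"WHERE table = '{table}' AND command = '{command}' "
        "AND NOT is_killed"
    )

def schedule_mutation(client, table: str, command: str) -> str:
    # reuse an in-flight or completed mutation rather than scheduling a
    # redundant one on retry
    rows = client.execute(find_existing_mutation_query(table, command))
    if rows:
        return rows[0][0]  # existing mutation_id
    client.execute(f"ALTER TABLE {table} {command}")
    rows = client.execute(find_existing_mutation_query(table, command))
    return rows[0][0]
```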

I also tried to keep a lot of the design and implementation relatively orchestrator-agnostic in case we need to replatform this again.

Does this work well for both Cloud and self-hosted?

Yes.

How did you test this code?

Added tests (we also ran this manually to verify that the queries and the general approach work).

dags/person_overrides.py

def wait(self, client: Client) -> None:
    while not self.is_done(client):
        time.sleep(15.0)
tkaemming (author):
Arbitrarily chosen number, could be more, could be less.
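A hedged guess at what is_done might check (the actual criteria in dags/person_overrides.py may differ): a mutation is finished once no matching not-done row remains in system.mutations. Both the table and its `is_done` column are standard ClickHouse; the matching criteria here are illustrative:

```python
import time

# Sketch of polling a mutation to completion. `client` is assumed to expose
# execute(query) -> list of row tuples (clickhouse_driver style).

class MutationWaiter:
    def __init__(self, table: str, mutation_id: str):
        self.table = table
        self.mutation_id = mutation_id

    def is_done(self, client) -> bool:
        # done when no not-yet-finished row for this mutation remains
        [(count,)] = client.execute(
            "SELECT count() FROM system.mutations "
            f"WHERE table = '{self.table}' "
            f"AND mutation_id = '{self.mutation_id}' AND NOT is_done"
        )
        return count == 0

    def wait(self, client, poll_interval: float = 15.0) -> None:
        # the poll interval is arbitrary, per the author's comment above
        while not self.is_done(client):
            time.sleep(poll_interval)
```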

dags/person_overrides.py
Comment on lines -151 to -156
def get_cluster() -> ClickhouseCluster:
    extra_hosts = []
    for host_config in map(copy, CLICKHOUSE_PER_TEAM_SETTINGS.values()):
        extra_hosts.append(ConnectionInfo(host_config.pop("host")))
        assert len(host_config) == 0, f"unexpected values: {host_config!r}"
    return ClickhouseCluster(default_client(), extra_hosts=extra_hosts)
tkaemming (author):
Moved this to the non-ee package.

@@ -156,8 +157,7 @@ def default_client(host=settings.CLICKHOUSE_HOST):
)


-@cache
-def make_ch_pool(**overrides) -> ChPool:
+def _make_ch_pool(*, client_settings: Mapping[str, str] | None = None, **overrides) -> ChPool:
tkaemming (author):
This rename was necessary to allow bypassing the pool cache: the client settings map isn't hashable and therefore not usable in a cache key.

@@ -7,7 +7,7 @@
from posthog.test.base import PostHogTestCase, run_clickhouse_statement_in_parallel


def create_clickhouse_tables(num_tables: int):
tkaemming (author):
This change (and the other test changes to/around create_clickhouse_tables) wasn't actually necessary: I thought I was going to need more invasive fixture surgery than just the dag_tests/conftest.py edit, and found the coupling to django_db_setup confusing, so I simplified the logic here before realizing I wouldn't need to call this function directly.

@tkaemming tkaemming requested a review from a team January 21, 2025 00:46
@tkaemming tkaemming marked this pull request as ready for review January 21, 2025 00:46
@Daesgar (Contributor) left a comment:
Looking good!

I tried to follow along with the current manual squash process. Love your extremely clean code.

Left some questions to understand the process better, but nothing blocking!

@tkaemming tkaemming merged commit db3e85d into master Jan 22, 2025
92 checks passed
@tkaemming tkaemming deleted the squash-dag branch January 22, 2025 16:59
fuziontech added a commit that referenced this pull request Jan 22, 2025
* master: (103 commits)
  feat(postgres-estimated-rows): pg Estimated Rows on Data Warehouse Sync (#27634)
  fix: revert darkmode class toggle, updated content on fills (#27783)
  chore: upgrade posthog-js (#27790)
  chore(editor-3001): add back join actions (#27740)
  feat: Add person distinct ID overrides squash job (as dagster job) (#27710)
  fix(created-by-sources): Adding `created_by` to sources (#27751)
  Revert "feat(data-warehouse): V2 pipeline release " (#27791)
  fix: typo for feature flags (#27786)
  fix(defer-unmounting): Defer unmounting of react elements (#27742)
  feat(data-warehouse): V2 pipeline release (#27732)
  fix(data-warehouse): Ensure dates are actual datetime formats (#27777)
  fix: enable hot reload for the products dir (#27746)
  fix: assignee selector when null (#27737)
  chore: clarify rrweb imports (#27776)
  chore(deps): Update posthog-js to 1.207.3 (#27779)
  feat(retention): filters on start/return event (#27770)
  fix(experiments): only show supported math functions (#27589)
  feat(web-analytics): Set unique conversions graph when adding conversions goal (#27774)
  chore: color design system part 1: banner and accents (#27756)
  chore(experiments): Add tests for funnel attribution options (#27752)
  ...