Revert "feat(data-warehouse): V2 pipeline release " (#27791)
Gilbert09 authored Jan 22, 2025
1 parent 953a776 commit c0f6a80
Showing 20 changed files with 1,012 additions and 367 deletions.
25 changes: 13 additions & 12 deletions mypy-baseline.txt
@@ -50,11 +50,9 @@ posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argume
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible types in assignment (expression has type "list[str] | None", variable has type "list[str]") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "setup_incremental_object" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "dict[str, Any]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "base_url" to "RESTClient" has incompatible type "str | None"; expected "str" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "convert_types" has incompatible type "dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] | None"; expected "dict[str, dict[str, Any]] | None" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "exclude_keys" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "Mapping[str, Any]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible default for argument "resolved_param" (default has type "ResolvedParam | None", argument has type "ResolvedParam") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "convert_types" has incompatible type "dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] | None"; expected "dict[str, dict[str, Any]] | None" [arg-type]
posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
@@ -599,10 +597,6 @@ posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: d
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: def get(self, Type, Sequence[str], /) -> Sequence[str]
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: def [_T] get(self, Type, _T, /) -> Sequence[str] | _T
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: error: Argument "source_id" to "sync_old_schemas_with_new_schemas" has incompatible type "str"; expected "UUID" [arg-type]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "DataWarehouseCredential | Combinable | None") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "str | int | Combinable") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "dict[str, dict[str, str | bool]] | dict[str, str]", variable has type "dict[str, dict[str, str]]") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Item "None" of "dict[str, str] | None" has no attribute "get" [union-attr]
posthog/taxonomy/property_definition_api.py:0: error: Item "AnonymousUser" of "User | AnonymousUser" has no attribute "organization" [union-attr]
posthog/taxonomy/property_definition_api.py:0: error: Item "None" of "Organization | Any | None" has no attribute "is_feature_available" [union-attr]
posthog/taxonomy/property_definition_api.py:0: error: Item "ForeignObjectRel" of "Field[Any, Any] | ForeignObjectRel | GenericForeignKey" has no attribute "cached_col" [union-attr]
@@ -760,6 +754,13 @@ posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict k
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 20 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 21 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 22 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "FilesystemDestinationClientConfiguration" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "type[FilesystemDestinationClientConfiguration]" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "DataWarehouseCredential | Combinable | None") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "str | int | Combinable") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Right operand of "and" is never evaluated [unreachable]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Statement is unreachable [unreachable]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Name "raw_db_columns" already defined on line 0 [no-redef]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
@@ -789,13 +790,8 @@ posthog/api/plugin_log_entry.py:0: error: Name "timezone.datetime" is not define
posthog/api/plugin_log_entry.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
posthog/api/plugin_log_entry.py:0: error: Name "timezone.datetime" is not defined [name-defined]
posthog/api/plugin_log_entry.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
posthog/temporal/data_imports/external_data_job.py:0: error: Argument "status" to "update_external_job_status" has incompatible type "str"; expected "Status" [arg-type]
posthog/api/sharing.py:0: error: Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable) [union-attr]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/data_imports/test_end_to_end.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/external_data_job.py:0: error: Argument "status" to "update_external_job_status" has incompatible type "str"; expected "Status" [arg-type]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_execute_calls" (hint: "_execute_calls: list[<type>] = ...") [var-annotated]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_execute_async_calls" (hint: "_execute_async_calls: list[<type>] = ...") [var-annotated]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_cursors" (hint: "_cursors: list[<type>] = ...") [var-annotated]
@@ -812,6 +808,11 @@ posthog/api/test/test_capture.py:0: error: Dict entry 0 has incompatible type "s
posthog/api/test/test_capture.py:0: error: Dict entry 0 has incompatible type "str": "float"; expected "str": "int" [dict-item]
posthog/api/test/test_capture.py:0: error: Dict entry 0 has incompatible type "str": "float"; expected "str": "int" [dict-item]
posthog/api/test/test_capture.py:0: error: Dict entry 0 has incompatible type "str": "float"; expected "str": "int" [dict-item]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/data_imports/test_end_to_end.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") [assignment]
posthog/api/test/batch_exports/conftest.py:0: error: Signature of "run" incompatible with supertype "Worker" [override]
posthog/api/test/batch_exports/conftest.py:0: note: Superclass:
1 change: 1 addition & 0 deletions posthog/constants.py
@@ -303,6 +303,7 @@ class FlagRequestType(StrEnum):

ENRICHED_DASHBOARD_INSIGHT_IDENTIFIER = "Feature Viewed"
DATA_WAREHOUSE_TASK_QUEUE = "data-warehouse-task-queue"
DATA_WAREHOUSE_TASK_QUEUE_V2 = "v2-data-warehouse-task-queue"
BATCH_EXPORTS_TASK_QUEUE = "batch-exports-task-queue"
SYNC_BATCH_EXPORTS_TASK_QUEUE = "no-sandbox-python-django"
GENERAL_PURPOSE_TASK_QUEUE = "general-purpose-task-queue"
20 changes: 17 additions & 3 deletions posthog/hogql/database/s3_table.py
@@ -1,12 +1,15 @@
import re
from typing import Optional
from typing import TYPE_CHECKING, Optional

from posthog.clickhouse.client.escape import substitute_params
from posthog.hogql.context import HogQLContext
from posthog.hogql.database.models import FunctionCallTable
from posthog.hogql.errors import ExposedHogQLError
from posthog.hogql.escape_sql import escape_hogql_identifier

if TYPE_CHECKING:
from posthog.warehouse.models import ExternalDataJob


def build_function_call(
url: str,
@@ -15,7 +18,10 @@ def build_function_call(
access_secret: Optional[str] = None,
structure: Optional[str] = None,
context: Optional[HogQLContext] = None,
pipeline_version: Optional["ExternalDataJob.PipelineVersion"] = None,
) -> str:
from posthog.warehouse.models import ExternalDataJob

raw_params: dict[str, str] = {}

def add_param(value: str, is_sensitive: bool = True) -> str:
@@ -36,10 +42,18 @@ def return_expr(expr: str) -> str:

# DeltaS3Wrapper format
if format == "DeltaS3Wrapper":
query_folder = "__query_v2" if pipeline_version == ExternalDataJob.PipelineVersion.V2 else "__query"

if url.endswith("/"):
escaped_url = add_param(f"{url[:-1]}__query/*.parquet")
if pipeline_version == ExternalDataJob.PipelineVersion.V2:
escaped_url = add_param(f"{url[:-5]}{query_folder}/*.parquet")
else:
escaped_url = add_param(f"{url[:-1]}{query_folder}/*.parquet")
else:
escaped_url = add_param(f"{url}__query/*.parquet")
if pipeline_version == ExternalDataJob.PipelineVersion.V2:
escaped_url = add_param(f"{url[:-4]}{query_folder}/*.parquet")
else:
escaped_url = add_param(f"{url}{query_folder}/*.parquet")

if structure:
escaped_structure = add_param(structure, False)
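
Note on the V2 branches above: a later hunk in this commit appends "__v2" to the Delta table URI, which appears to be why the V2 paths strip the trailing "__v2/" (5 characters) or "__v2" (4 characters) from the incoming URL before appending the "__query_v2" folder. A minimal standalone sketch of the folder selection, with a hypothetical enum and bucket paths for illustration (assumptions, not the PostHog API):

from enum import Enum
from typing import Optional


class PipelineVersion(Enum):
    V1 = "v1"
    V2 = "v2"


def delta_wrapper_query_glob(url: str, pipeline_version: Optional[PipelineVersion]) -> str:
    """Return the parquet glob queried for a DeltaS3Wrapper table."""
    if pipeline_version == PipelineVersion.V2:
        # V2 table folders end in "__v2", so drop that suffix (with or without
        # the trailing slash) before pointing at the V2 query folder.
        base = url[:-5] if url.endswith("/") else url[:-4]
        return f"{base}__query_v2/*.parquet"
    base = url[:-1] if url.endswith("/") else url
    return f"{base}__query/*.parquet"


# Hypothetical examples:
# V1: "s3://bucket/team_1/job_2/accounts/"     -> "s3://bucket/team_1/job_2/accounts__query/*.parquet"
# V2: "s3://bucket/team_1/job_2/accounts__v2/" -> "s3://bucket/team_1/job_2/accounts__query_v2/*.parquet"
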
3 changes: 3 additions & 0 deletions posthog/management/commands/start_temporal_worker.py
@@ -11,6 +11,7 @@
from posthog.constants import (
BATCH_EXPORTS_TASK_QUEUE,
DATA_WAREHOUSE_TASK_QUEUE,
DATA_WAREHOUSE_TASK_QUEUE_V2,
GENERAL_PURPOSE_TASK_QUEUE,
SYNC_BATCH_EXPORTS_TASK_QUEUE,
)
@@ -31,12 +32,14 @@
SYNC_BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_WORKFLOWS,
BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_WORKFLOWS,
DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS,
DATA_WAREHOUSE_TASK_QUEUE_V2: DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS,
GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_WORKFLOWS + DELETE_PERSONS_WORKFLOWS,
}
ACTIVITIES_DICT = {
SYNC_BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_ACTIVITIES,
BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_ACTIVITIES,
DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES,
DATA_WAREHOUSE_TASK_QUEUE_V2: DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES,
GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_ACTIVITIES + DELETE_PERSONS_ACTIVITIES,
}

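
The V2 queue is registered with exactly the same workflow and activity sets as the V1 data warehouse queue; only the queue name differs. A minimal sketch of how these dicts would typically drive worker registration (an assumption for illustration; the command's actual Worker construction is outside this hunk):

from temporalio.client import Client
from temporalio.worker import Worker


async def start_worker(client: Client, task_queue: str) -> None:
    # Look up the workflows/activities for the queue this process should serve,
    # e.g. task_queue = DATA_WAREHOUSE_TASK_QUEUE_V2.
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=WORKFLOWS_DICT[task_queue],
        activities=ACTIVITIES_DICT[task_queue],
    )
    await worker.run()
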
2 changes: 2 additions & 0 deletions posthog/temporal/data_imports/__init__.py
@@ -6,6 +6,7 @@
update_external_data_job_model,
check_billing_limits_activity,
sync_new_schemas_activity,
trigger_pipeline_v2,
)

WORKFLOWS = [ExternalDataJobWorkflow]
@@ -17,4 +18,5 @@
create_source_templates,
check_billing_limits_activity,
sync_new_schemas_activity,
trigger_pipeline_v2,
]
60 changes: 60 additions & 0 deletions posthog/temporal/data_imports/external_data_job.py
@@ -1,16 +1,26 @@
import asyncio
import dataclasses
import datetime as dt
import json
import re
import threading
import time

from django.conf import settings
from django.db import close_old_connections
import posthoganalytics
import psutil
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import WorkflowAlreadyStartedError


from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2

# TODO: remove dependency
from posthog.settings.base_variables import TEST
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.common.client import sync_connect
from posthog.temporal.data_imports.workflow_activities.check_billing_limits import (
CheckBillingLimitsActivityInputs,
check_billing_limits_activity,
@@ -140,6 +150,32 @@ def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInputs) ->
)


@activity.defn
def trigger_pipeline_v2(inputs: ExternalDataWorkflowInputs):
logger = bind_temporal_worker_logger_sync(team_id=inputs.team_id)
logger.debug("Triggering V2 pipeline")

temporal = sync_connect()
try:
asyncio.run(
temporal.start_workflow(
workflow="external-data-job",
arg=dataclasses.asdict(inputs),
id=f"{inputs.external_data_schema_id}-V2",
task_queue=str(DATA_WAREHOUSE_TASK_QUEUE_V2),
retry_policy=RetryPolicy(
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=1,
non_retryable_error_types=["NondeterminismError"],
),
)
)
except WorkflowAlreadyStartedError:
pass

logger.debug("V2 pipeline triggered")


@dataclasses.dataclass
class CreateSourceTemplateInputs:
team_id: int
@@ -151,6 +187,22 @@ def create_source_templates(inputs: CreateSourceTemplateInputs) -> None:
create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id)


def log_memory_usage():
process = psutil.Process()
logger = bind_temporal_worker_logger_sync(team_id=0)

while True:
memory_info = process.memory_info()
logger.info(f"Memory Usage: RSS = {memory_info.rss / (1024 * 1024):.2f} MB")

time.sleep(10) # Log every 10 seconds


if settings.TEMPORAL_TASK_QUEUE == DATA_WAREHOUSE_TASK_QUEUE_V2:
thread = threading.Thread(target=log_memory_usage, daemon=True)
thread.start()


# TODO: update retry policies
@workflow.defn(name="external-data-job")
class ExternalDataJobWorkflow(PostHogWorkflow):
@@ -163,6 +215,14 @@ def parse_inputs(inputs: list[str]) -> ExternalDataWorkflowInputs:
async def run(self, inputs: ExternalDataWorkflowInputs):
assert inputs.external_data_schema_id is not None

if settings.TEMPORAL_TASK_QUEUE != DATA_WAREHOUSE_TASK_QUEUE_V2 and not TEST:
await workflow.execute_activity(
trigger_pipeline_v2,
inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(maximum_attempts=1),
)

update_inputs = UpdateExternalDataJobStatusInputs(
job_id=None,
status=ExternalDataJob.Status.COMPLETED,
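
In effect this is a shadow-run rollout: while a run executes on the V1 queue (and outside of tests), it fires trigger_pipeline_v2, which starts the same "external-data-job" workflow on the V2 queue under the fixed ID "<schema_id>-V2" with a retry policy of maximum_attempts=1, and swallows WorkflowAlreadyStartedError so that re-triggering a schema whose V2 run is still in flight is a no-op (with Temporal's default ID-reuse policy, a fresh V2 run can start once the previous one has finished). The module also starts a daemon thread that logs RSS memory every 10 seconds, but only on workers serving the V2 queue. Assuming sync_connect returns a standard temporalio Client, a schema's shadow run can be inspected via its deterministic workflow ID, for example:

import asyncio

from posthog.temporal.common.client import sync_connect

# Hypothetical schema ID purely for illustration.
SCHEMA_ID = "0194a7cd-0000-0000-0000-000000000000"

temporal = sync_connect()
handle = temporal.get_workflow_handle(f"{SCHEMA_ID}-V2")
description = asyncio.run(handle.describe())
print(description.status)  # e.g. WorkflowExecutionStatus.RUNNING
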
(next changed file; name not shown)
@@ -46,7 +46,8 @@ def _get_credentials(self):

def _get_delta_table_uri(self) -> str:
normalized_resource_name = NamingConvention().normalize_identifier(self._resource_name)
return f"{settings.BUCKET_URL}/{self._job.folder_path()}/{normalized_resource_name}"
# Append __v2 to the end of the URL so that the V2 pipeline's data is kept separate from the V1 data
return f"{settings.BUCKET_URL}/{self._job.folder_path()}/{normalized_resource_name}__v2"

def _evolve_delta_schema(self, schema: pa.Schema) -> deltalake.DeltaTable:
delta_table = self.get_delta_table()
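
Together with the "__query_v2" folder in s3_table.py above, this "__v2" suffix keeps everything the V2 pipeline writes (both the Delta table and the prepared query files) in folders separate from the V1 output for the same resource. With hypothetical example values, the resulting layout looks like:

# Hypothetical values purely for illustration (not real settings):
BUCKET_URL = "s3://posthog-dw"
folder_path = "team_1/job_2"
resource = "accounts"

v1_table_uri = f"{BUCKET_URL}/{folder_path}/{resource}"       # .../accounts
v2_table_uri = f"{BUCKET_URL}/{folder_path}/{resource}__v2"   # .../accounts__v2

# Query folders prepared for the HogQL S3 table:
#   V1 -> s3://posthog-dw/team_1/job_2/accounts__query/*.parquet
#   V2 -> s3://posthog-dw/team_1/job_2/accounts__query_v2/*.parquet
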
8 changes: 1 addition & 7 deletions posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
@@ -8,13 +8,11 @@
import deltalake as deltalake
from posthog.temporal.common.logger import FilteringBoundLogger
from posthog.temporal.data_imports.pipelines.pipeline.utils import (
_handle_null_columns_with_definitions,
_update_incremental_state,
_get_primary_keys,
_evolve_pyarrow_schema,
_append_debug_column_to_pyarrows_table,
_update_job_row_count,
_update_last_synced_at_sync,
table_from_py_list,
)
from posthog.temporal.data_imports.pipelines.pipeline.delta_table_helper import DeltaTableHelper
@@ -135,7 +133,6 @@ def _process_pa_table(self, pa_table: pa.Table, index: int):

pa_table = _append_debug_column_to_pyarrows_table(pa_table, self._load_id)
pa_table = _evolve_pyarrow_schema(pa_table, delta_table.schema() if delta_table is not None else None)
pa_table = _handle_null_columns_with_definitions(pa_table, self._resource)

table_primary_keys = _get_primary_keys(self._resource)
delta_table = self._delta_table_helper.write_to_deltalake(
@@ -176,14 +173,11 @@ def _post_run_operations(self, row_count: int):
process.kill()

file_uris = delta_table.file_uris()
self._logger.debug(f"Preparing S3 files - total parquet files: {len(file_uris)}")
self._logger.info(f"Preparing S3 files - total parquet files: {len(file_uris)}")
prepare_s3_files_for_querying(
self._job.folder_path(), self._resource_name, file_uris, ExternalDataJob.PipelineVersion.V2
)

self._logger.debug("Updating last synced at timestamp on schema")
_update_last_synced_at_sync(self._schema, self._job)

self._logger.debug("Validating schema and updating table")

validate_schema_and_update_table_sync(
(remaining changed files not shown)
