chore(data-warehouse): Some minor memory optimisations #28621
Conversation
Hey @Gilbert09! 👋
PR Summary
This PR updates SQL-related dependencies and improves database connection handling in the data import pipelines.
- Fixed a critical SQLAlchemy engine execution options bug in /posthog/temporal/data_imports/pipelines/sql_database/__init__.py by properly chaining method calls (see the sketch after this list)
- Added memory optimization in /posthog/temporal/data_imports/pipelines/sql_database/helpers.py with max_row_buffer and stream_results settings
- Updated the PostgreSQL connection string to explicitly use the psycopg driver via the 'postgresql+psycopg://' format
- Upgraded core SQL packages, including SQLAlchemy (2.0.38), psycopg (3.2.4), and related dependencies, while maintaining backwards compatibility
- Reorganized imports and removed redundancies in arrow_helpers.py for better code organization
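As a minimal sketch of the chaining fix in the first bullet (the connection URL and chunk size are illustrative, not taken from this PR): SQLAlchemy's Engine.execution_options() returns a configured copy of the engine rather than mutating it in place, so the options are lost unless the return value is kept.

```python
# Minimal sketch, not the PR's code: why the chained form matters.
from sqlalchemy import create_engine

chunk_size = 5_000  # illustrative value

# Buggy pattern: Engine.execution_options() returns a configured *copy*,
# so discarding the return value leaves the original engine unchanged.
engine = create_engine("postgresql+psycopg://user:pass@localhost/posthog")  # assumed URL
engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size)  # effectively a no-op

# Fixed pattern: keep the engine returned by execution_options().
engine = create_engine("postgresql+psycopg://user:pass@localhost/posthog").execution_options(
    stream_results=True, max_row_buffer=2 * chunk_size
)
```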
5 file(s) reviewed, 2 comment(s)
Greptile
- engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size)
+ engine = engine_from_credentials(credentials, may_dispose_after_use=True).execution_options(
+     stream_results=True, max_row_buffer=2 * chunk_size
+ )
  metadata = metadata or MetaData(schema=schema)

  table_obj: Table | None = metadata.tables.get("table")
logic: table lookup uses hardcoded string 'table' instead of the table parameter
Suggested change:
- table_obj: Table | None = metadata.tables.get("table")
+ table_obj: Table | None = metadata.tables.get(table)
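As a side note on why the hardcoded key never matches, here is a small illustrative sketch (the connection URL, schema, and table names are made up): MetaData.tables is keyed by each reflected table's name, schema-qualified when the MetaData carries a schema, so the literal string "table" is not a valid key.

```python
# Illustrative sketch only; all names below are hypothetical.
from sqlalchemy import MetaData, Table, create_engine

engine = create_engine("postgresql+psycopg://user:pass@localhost/posthog")  # assumed URL
metadata = MetaData(schema="public")
metadata.reflect(bind=engine, only=["events"])  # reflect a hypothetical "events" table

table = "events"
missing: Table | None = metadata.tables.get("table")  # None: the literal key never matches
found: Table | None = metadata.tables.get(f"public.{table}")  # keys are schema-qualified here
```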
result = conn.execution_options(
    yield_per=self.chunk_size, max_row_buffer=DEFAULT_CHUNK_SIZE * 2, stream_results=True
).execute(query)
style: Setting max_row_buffer to 2x chunk_size could still cause OOM issues with very large chunk sizes. Consider adding an upper bound or making this configurable.
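A minimal sketch of the reviewer's suggestion (the helper name and the cap value are assumptions, not code from this PR): bound the buffer so a very large chunk size cannot translate into an unbounded row buffer.

```python
# Hypothetical helper sketching the "upper bound / configurable" idea above.
MAX_ROW_BUFFER_CAP = 10_000  # assumed cap; could be made configurable via settings

def bounded_row_buffer(chunk_size: int, cap: int = MAX_ROW_BUFFER_CAP) -> int:
    """Return a max_row_buffer of 2x chunk_size, but never above the cap."""
    return min(2 * chunk_size, cap)

# Usage mirroring the diff above (sketch, not the PR's actual call site):
# result = conn.execution_options(
#     yield_per=chunk_size,
#     max_row_buffer=bounded_row_buffer(chunk_size),
#     stream_results=True,
# ).execute(query)
```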