fix(wren-ai-service): Optimize SQL Data Preprocessing with Progressive Data Reduction #1331
Conversation
Walkthrough

The changes introduce a new inner function, reduce_data_size, within the preprocess function that progressively reduces the SQL data until its token count falls below the limit.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Pre as preprocess()
    participant RD as reduce_data_size()
    participant Log as Logger
    Pre->>RD: Call reduce_data_size(data, reduction_step)
    loop while token count > 100,000
        RD->>Pre: Remove reduction_step items from data
        Pre->>Log: Log updated token count
    end
    Pre-->>Pre: Return sql_data, num_rows_used_in_llm, tokens
```
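Read together with the diagram, here is a minimal sketch of what this flow might look like in Python. The preprocess signature, the returned dict shape, and the idea that the caller supplies a tiktoken encoding are illustrative assumptions, not the file's actual API; the 100,000-token threshold, 50-row step, and 1,000-iteration guard come from the diffs discussed below.

```python
import logging

import tiktoken

logger = logging.getLogger(__name__)


def preprocess(sql_data: dict, encoding: tiktoken.Encoding) -> dict:
    # Inner helper kept local to preprocess(), as described in the walkthrough.
    def reduce_data_size(data: list, reduction_step: int = 50) -> list:
        """Drop reduction_step rows from the end and return the shortened list."""
        return data[: max(0, len(data) - reduction_step)]

    _token_count = len(encoding.encode(str(sql_data)))
    num_rows_used_in_llm = len(sql_data.get("data", []))
    iteration = 0

    while _token_count > 100_000:
        if iteration > 1000:
            # Safety valve: stop rather than loop forever on pathological input.
            break
        iteration += 1
        sql_data["data"] = reduce_data_size(sql_data.get("data", []))
        num_rows_used_in_llm = len(sql_data.get("data", []))
        _token_count = len(encoding.encode(str(sql_data)))
        logger.info(f"Token count: {_token_count}")

    return {
        "sql_data": sql_data,
        "num_rows_used_in_llm": num_rows_used_in_llm,
        "tokens": _token_count,
    }
```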
Actionable comments posted: 2
🧹 Nitpick comments (1)
wren-ai-service/src/pipelines/retrieval/preprocess_sql_data.py (1)
16-43: Consider architectural improvements for better flexibility and performance.

The current implementation could benefit from several architectural improvements:
- Consider caching token counts to avoid recalculation
- Make the reduction strategy configurable
- Add metrics for monitoring the reduction process
Here are some suggestions:
- Add configuration parameters to the pipeline class:
```python
class PreprocessSqlData(BasicPipeline):
    def __init__(
        self,
        llm_provider: LLMProvider,
        max_token_count: int = 100_000,
        reduction_strategy: str = "progressive",
        reduction_step: int = 50,
        **kwargs,
    ):
```
- Consider implementing a token count cache:
```python
from functools import lru_cache

@lru_cache(maxsize=1000)
def calculate_token_count(data_str: str, encoding: tiktoken.Encoding) -> int:
    return len(encoding.encode(data_str))
```
- Add metrics for monitoring:
```python
from prometheus_client import Counter, Histogram

data_reduction_iterations = Counter(
    'sql_data_reduction_iterations_total',
    'Number of iterations needed to reduce data size'
)
token_reduction_time = Histogram(
    'sql_data_reduction_seconds',
    'Time spent reducing data size'
)
```

Would you like me to create an issue to track these architectural improvements?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- wren-ai-service/src/pipelines/retrieval/preprocess_sql_data.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
Force-pushed from d6effa3 to 215a4f6.
Actionable comments posted: 0
🧹 Nitpick comments (2)
wren-ai-service/src/pipelines/retrieval/preprocess_sql_data.py (2)
22-46: Consider returning a copy to prevent unintended modifications.

While the function has been improved with input validation, documentation, and edge case handling, it could be further enhanced by:
- Returning a copy to prevent unintended modifications of the original data
- Being more specific in the docstring about the return type being a shallow copy
Apply this diff to implement these improvements:
```diff
 def reduce_data_size(data: list, reduction_step: int = 50) -> list:
     """Reduce the size of data by removing elements from the end.

     Args:
         data: The input list to reduce
         reduction_step: Number of elements to remove (must be positive)

     Returns:
-        list: A list with reduced size
+        list: A shallow copy of the input list with reduced size

     Raises:
         ValueError: If reduction_step is not positive
     """
     if reduction_step <= 0:
         raise ValueError("reduction_step must be positive")

     elements_to_keep = max(0, len(data) - reduction_step)
-    returned_data = data[:elements_to_keep]
+    returned_data = data[:elements_to_keep].copy()

     logger.info(
         f"Reducing data size by {reduction_step} rows. "
         f"Original size: {len(data)}, New size: {len(returned_data)}"
     )

     return returned_data
```
52-66: ❓ Verification inconclusive
Enhance the data reduction loop implementation.
While the loop has been improved with infinite loop protection, consider these additional enhancements:
- Define the token limit as a constant
- Add more detailed logging including token reduction progress
- Consider caching or optimizing token count calculation
Apply this diff to implement these improvements:
```diff
+MAX_TOKEN_COUNT = 100_000
+
-while _token_count > 100_000:
+while _token_count > MAX_TOKEN_COUNT:
     if iteration > 1000:
-        """
-        Avoid infinite loop
-        If the token count is still too high after 1000 iterations, break
-        """
+        logger.error(
+            f"Failed to reduce token count below {MAX_TOKEN_COUNT} "
+            f"after 1000 iterations. Current count: {_token_count}"
+        )
         break

     iteration += 1
     data = sql_data.get("data", [])
+    previous_token_count = _token_count
     sql_data["data"] = reduce_data_size(data)
     num_rows_used_in_llm = len(sql_data.get("data", []))
     _token_count = len(encoding.encode(str(sql_data)))
-    logger.info(f"Token count: {_token_count}")
+    token_reduction = previous_token_count - _token_count
+    logger.info(
+        f"Iteration {iteration}: Token count reduced by {token_reduction} "
+        f"({previous_token_count} -> {_token_count})"
+    )
```

Run this script to check if the token count reduction is effective:
🏁 Script executed:
```shell
#!/bin/bash
# Description: Check if token count reduction is effective
# Look for patterns where token count doesn't decrease significantly
rg "Token count" | grep -E "Token count.*->.*" | \
  awk -F'[()]' '{
    split($2, counts, "->")
    before = counts[1]
    after = counts[2]
    reduction = before - after
    if (reduction < 1000) {
      print "Small reduction detected:", $0
    }
  }'
```

Length of output: 253
Enhance the data reduction loop in wren-ai-service/src/pipelines/retrieval/preprocess_sql_data.py.

The improvements for infinite loop protection and logging are a good step forward. As further enhancements, please:
- Define the token limit as a constant (e.g., MAX_TOKEN_COUNT) to improve readability and maintainability.
- Enrich the logging to clearly show the progress of token reduction between iterations.
- Consider ways to optimize or cache the token count calculation if it becomes a performance concern.
The shell script provided to verify token count reduction did not produce output, so I recommend manual verification to ensure that each iteration is indeed making significant progress in reducing the token count.
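For that manual check, one lightweight option is to simulate the reduction loop on a synthetic payload and watch the per-iteration token drop. The row contents and the cl100k_base encoding below are illustrative assumptions; the 50-row step, 100,000-token limit, and 1,000-iteration cap come from the code under review.

```python
import tiktoken

# Assumption: the real pipeline obtains its encoding from the LLM provider.
encoding = tiktoken.get_encoding("cl100k_base")

# Synthetic result set, roughly mimicking a wide SQL response.
sql_data = {"columns": ["id", "name"], "data": [[i, f"name_{i}" * 20] for i in range(5_000)]}
token_count = len(encoding.encode(str(sql_data)))

iteration = 0
while token_count > 100_000 and iteration <= 1000:
    iteration += 1
    # Mirrors reduce_data_size: drop 50 rows from the end each pass.
    sql_data["data"] = sql_data["data"][: max(0, len(sql_data["data"]) - 50)]
    new_count = len(encoding.encode(str(sql_data)))
    print(f"iteration {iteration}: {token_count} -> {new_count} (-{token_count - new_count})")
    token_count = new_count
```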
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- wren-ai-service/src/pipelines/retrieval/preprocess_sql_data.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
Force-pushed from 215a4f6 to 9825d2f.
Actionable comments posted: 0
🧹 Nitpick comments (2)
wren-ai-service/src/pipelines/retrieval/preprocess_sql_data.py (2)
22-46: Consider additional improvements to the helper function.

While the function has been improved with input validation and documentation, consider these additional enhancements:
- Return a copy of the data to prevent unintended modifications
- Add type hints for better IDE support and static type checking
Apply this diff to implement these improvements:
```diff
-def reduce_data_size(data: list, reduction_step: int = 50) -> list:
+def reduce_data_size(data: list[any], reduction_step: int = 50) -> list[any]:
     """Reduce the size of data by removing elements from the end.

     Args:
         data: The input list to reduce
         reduction_step: Number of elements to remove (must be positive)

     Returns:
-        list: A list with reduced size
+        list[any]: A new list with reduced size, preserving the original

     Raises:
         ValueError: If reduction_step is not positive
     """
     if reduction_step <= 0:
         raise ValueError("reduction_step must be positive")

     elements_to_keep = max(0, len(data) - reduction_step)
-    returned_data = data[:elements_to_keep]
+    returned_data = data[:elements_to_keep].copy()

     logger.info(
         f"Reducing data size by {reduction_step} rows. "
         f"Original size: {len(data)}, New size: {len(returned_data)}"
     )

     return returned_data
```
52-66: Enhance the token reduction loop with constants and better logging.

The loop implementation could benefit from these improvements:
- Define constants for magic numbers
- Enhance logging to show token count reduction
- Improve the break condition message
Apply this diff to implement these improvements:
```diff
+MAX_TOKEN_COUNT = 100_000
+MAX_ITERATIONS = 1000
+
+logger.info(f"Initial token count: {_token_count}, rows: {num_rows_used_in_llm}")
-while _token_count > 100_000:
-    if iteration > 1000:
+while _token_count > MAX_TOKEN_COUNT:
+    if iteration > MAX_ITERATIONS:
         """
         Avoid infinite loop
-        If the token count is still too high after 1000 iterations, break
+        If the token count is still too high after MAX_ITERATIONS iterations, break
         """
+        logger.error(
+            f"Failed to reduce token count below {MAX_TOKEN_COUNT} "
+            f"after {MAX_ITERATIONS} iterations. "
+            f"Final token count: {_token_count}, rows: {num_rows_used_in_llm}"
+        )
         break

     iteration += 1
     data = sql_data.get("data", [])
+    previous_token_count = _token_count
     sql_data["data"] = reduce_data_size(data)
     num_rows_used_in_llm = len(sql_data.get("data", []))
     _token_count = len(encoding.encode(str(sql_data)))
-    logger.info(f"Token count: {_token_count}")
+    token_reduction = previous_token_count - _token_count
+    logger.info(
+        f"Iteration {iteration}: Token count reduced by {token_reduction} "
+        f"({previous_token_count} -> {_token_count})"
+    )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- wren-ai-service/src/pipelines/retrieval/preprocess_sql_data.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: pytest
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
🔇 Additional comments (1)
wren-ai-service/src/pipelines/retrieval/preprocess_sql_data.py (1)
18-72: LGTM! The integration is well-structured.

The overall integration of the progressive data reduction logic within the preprocess function is clean and maintains backward compatibility:
- Helper function is properly scoped
- Token count tracking is maintained
- Function signature and return values remain unchanged
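As a quick illustration of that backward compatibility, a caller written against the previous version keeps working unchanged. This reuses the hypothetical preprocess signature and result keys from the sketch after the sequence diagram above, so treat it as a shape check rather than the service's real API.

```python
import tiktoken

# Assumed encoding for the example; paste alongside the earlier preprocess sketch.
encoding = tiktoken.get_encoding("cl100k_base")

# Small enough that no reduction kicks in; the point is that the call shape
# and the result keys are the same as before the change.
sql_data = {"columns": ["id"], "data": [[i] for i in range(1_000)]}

result = preprocess(sql_data, encoding)
print(result["num_rows_used_in_llm"], result["tokens"])
```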
This PR improves the SQL data preprocessing pipeline by implementing a progressive data reduction approach for handling large datasets.

Key Updates:
- reduce_data_size helper function that gradually reduces data size in steps

Technical Details:

Benefits:

Testing:

Please verify:

Summary by CodeRabbit
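Stepping back from the summary headers above, here is a back-of-the-envelope illustration of why the step-wise reduction converges well within the safety cap. The per-row token cost and row count are made-up numbers for the example; the 50-row step, 100,000-token limit, and 1,000-iteration cap are the values used in this PR.

```python
# Rough sizing under assumed numbers (not measured from this PR).
tokens_per_row = 40          # assumption
rows = 5_000                 # assumption
reduction_step = 50          # from the PR
limit = 100_000              # from the PR

total = tokens_per_row * rows                                  # ~200,000 tokens to start
excess = total - limit                                         # ~100,000 tokens over budget
iterations = -(-excess // (tokens_per_row * reduction_step))   # ceiling division
print(iterations)  # 50 iterations, well under the 1,000-iteration safety cap
```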