
feart(streaming): query cache #2392

Open · wants to merge 14 commits into main

Conversation

@hekike (Contributor) commented on Mar 8, 2025

Query cache that merges cached data with fresh data:

  1. Look up cached rows (old usage, > 24h)
  2. Query new cacheable data, if any (usage old enough but not cached yet: cached < x < 24h)
  3. Store the new cacheable data for future queries
  4. Query fresh usage from the events table (< 24h)
  5. Merge cached and fresh data (aggregating in memory if needed)

Caching is only enabled when Cachable is set to true; for HTTP requests it is set to true automatically when a clientId is present (UI queries).
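
In Go, the flow reads roughly as below. This is a minimal sketch, not the PR's actual code: Row and the helpers lookupCached, queryRange, storeCache, and merge are hypothetical stand-ins for the functions in cache.go. Imports: context, time.

// Minimal sketch of the five steps above; all names are hypothetical.
type Row struct {
	WindowEnd time.Time
	Value     float64
}

func queryWithCache(
	ctx context.Context,
	hash string, // deterministic hash of the query parameters
	from, to time.Time,
	lookupCached func(context.Context, string, time.Time, time.Time) ([]Row, error),
	queryRange func(context.Context, time.Time, time.Time) ([]Row, error),
	storeCache func(context.Context, string, []Row) error,
	merge func(cached, fresh []Row) []Row,
) ([]Row, error) {
	// Usage younger than 24h is never cached; truncate to whole days.
	cutoff := time.Now().UTC().Add(-24 * time.Hour).Truncate(24 * time.Hour)

	// 1. Look up cached rows (old usage, > 24h).
	rows, err := lookupCached(ctx, hash, from, cutoff)
	if err != nil {
		return nil, err
	}

	// 2. Query old-enough usage that is not cached yet (cached < x < 24h),
	// assuming cached rows are ordered by window.
	gapFrom := from
	if n := len(rows); n > 0 {
		gapFrom = rows[n-1].WindowEnd
	}
	if gapFrom.Before(cutoff) {
		newRows, err := queryRange(ctx, gapFrom, cutoff)
		if err != nil {
			return nil, err
		}
		// 3. Store the newly cacheable rows; a failure is logged, not fatal.
		_ = storeCache(ctx, hash, newRows)
		rows = append(rows, newRows...)
	}

	// 4. Query fresh usage (< 24h) from the events table.
	fresh, err := queryRange(ctx, cutoff, to)
	if err != nil {
		return nil, err
	}

	// 5. Merge cached and fresh rows, aggregating in memory if needed.
	return merge(rows, fresh), nil
}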

Summary by CodeRabbit

  • New Features
    • Enhanced caching for meter queries, delivering faster and more reliable responses when a client ID is provided.
    • Improved merging of cached and fresh data for consistent query performance.
    • Introduced deterministic parameter processing for reliable caching behavior.
  • Tests
    • Expanded test coverage for caching, data merging, and error handling to ensure robust and dependable performance.


coderabbitai bot commented Mar 8, 2025

Walkthrough

The pull request introduces caching capabilities and enhancements for meter queries. It adds conditional caching logic in the query handler when a ClientID is provided and extends the query parameters with a deterministic hash function. New functions and SQL generation methods are implemented for caching and merging meter query rows for ClickHouse. Additionally, mock implementations for ProgressManager and ClickHouse interactions are added, and the Connector’s query handling is refactored to include progress tracking and improved error handling. Comprehensive unit tests have been introduced to validate these new functionalities.

Changes

  • openmeter/meter/httphandler/query.go: Added a conditional check in QueryMeter to enable caching when ClientID is provided.
  • openmeter/progressmanager/adapter/mock.go: Introduced a new mock for the ProgressManager service with MockProgressManager and its methods (GetProgress, DeleteProgressByRuntimeID, UpsertProgress, and a constructor).
  • openmeter/streaming/clickhouse/mock.go: Added mocks for ClickHouse: MockClickHouse and MockRows with methods to simulate database operations (e.g., Query, QueryRow, Select).
  • openmeter/.../raw_events/cache.go, cachemerge.go, cachequery.go, connector.go, meter_query.go: Implemented caching, query merging, and SQL generation for meter queries. Refactored the Connector to split query functions (adding queryMeterWithProgress and an updated queryMeter), create cache tables, and improve row-scanning error handling.
  • openmeter/.../raw_events/cache_test.go, cachemerge_test.go, cachequery_test.go, connector_test.go: Added comprehensive unit tests covering caching logic, merging of cached and fresh rows, SQL generation, and Connector query behavior including error scenarios.
  • openmeter/streaming/query_params.go: Extended QueryParams with a new Hash() method and a Cachable field to generate a deterministic hash based on time, filtering, grouping, and window parameters.
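
The deterministic hash in query_params.go is the cache key. A minimal sketch of how such a hash can be computed (illustrative only; the PR's actual Hash() may differ). Imports: hash/fnv, sort, strconv, strings, time.

// Illustrative parameter hash: sort GroupBy so equal queries always hash
// equally, then fold the time range, grouping, and window size into FNV-64a.
func hashParams(from, to *time.Time, groupBy []string, windowSize string) string {
	keys := append([]string(nil), groupBy...)
	sort.Strings(keys) // deterministic ordering regardless of caller input

	fmtTime := func(t *time.Time) string {
		if t == nil {
			return ""
		}
		return t.UTC().Format(time.RFC3339)
	}

	h := fnv.New64a()
	for _, part := range []string{fmtTime(from), fmtTime(to), strings.Join(keys, ","), windowSize} {
		_, _ = h.Write([]byte(part))
		_, _ = h.Write([]byte{0}) // separator avoids ambiguous concatenations
	}
	return strconv.FormatUint(h.Sum64(), 16)
}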


@hekike hekike changed the title from Feat/query cache to feart(streaming): query cache on Mar 8, 2025
@hekike hekike added the area/processor and release-note/feature (Release note: Exciting New Features) labels on Mar 8, 2025
@hekike hekike marked this pull request as ready for review March 9, 2025 02:33
@hekike hekike requested a review from a team as a code owner March 9, 2025 02:33

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 15

🔭 Outside diff range comments (3)
pkg/gosundheit/logger.go (1)

29-40: 🧹 Nitpick (assertive)

Consider using a log level check instead of commenting out code.

Rather than commenting out the log statements, consider using a conditional check based on log level. This approach is more maintainable as log levels can be adjusted at runtime.

func (c checkListener) OnCheckStarted(name string) {
-	// c.logger.Debug("starting health check", slog.String("check", name))
+	if c.logger.Enabled(context.Background(), slog.LevelDebug) {
+		c.logger.Debug("starting health check", slog.String("check", name))
+	}
}

func (c checkListener) OnCheckCompleted(name string, result health.Result) {
	if result.Error != nil {
		c.logger.Warn("health check failed", slog.String("check", name), slog.Any("error", result.Error))

		return
	}

-	// c.logger.Debug("health check completed", slog.String("check", name))
+	if c.logger.Enabled(context.Background(), slog.LevelDebug) {
+		c.logger.Debug("health check completed", slog.String("check", name))
+	}
}
openmeter/streaming/clickhouse/raw_events/cachemerge_test.go (1)

232-327: 🧹 Nitpick (assertive)

Comprehensive aggregation tests.

The tests for aggregateMeterQueryRows cover the main aggregation types:

  • sum
  • count
  • min
  • max

And they verify both the aggregated values and the preservation of metadata like subject, window range, and group-by values.

Consider adding test cases for the remaining aggregation types:

  • avg (Average)
  • unique_count (Unique Count)

These might have different behaviors worth testing.

openmeter/streaming/clickhouse/raw_events/cachequery_test.go (1)

181-229: 🧹 Nitpick (assertive)

Row scanning test is effective; extend to check multiple rows if possible.
Current approach confirms handling of group-by keys with empty strings. Adding a multi-row fixture would better validate iterative scanning behavior.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c73a71a and 3e6c834.

📒 Files selected for processing (15)
  • etc/seed/seed.yaml (1 hunks)
  • openmeter/meter/httphandler/query.go (1 hunks)
  • openmeter/progressmanager/adapter/mock.go (1 hunks)
  • openmeter/streaming/clickhouse/mock.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachemerge.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachemerge_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachequery.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cachequery_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/connector.go (5 hunks)
  • openmeter/streaming/clickhouse/raw_events/connector_test.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/meter_query.go (3 hunks)
  • openmeter/streaming/query_params.go (2 hunks)
  • pkg/gosundheit/logger.go (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: CI
  • GitHub Check: Analyze (go)
🔇 Additional comments (34)
openmeter/progressmanager/adapter/mock.go (5)

12-12: Good practice using compile-time interface check.

Using a variable declaration with the blank identifier to verify interface implementation at compile-time is an excellent practice. This ensures the mock properly implements the expected interface.
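
For reference, the idiom is a single line (one common spelling; the file may use a struct literal instead of a nil pointer):

var _ progressmanager.Service = (*MockProgressManager)(nil)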


14-16: Constructor implementation is clean.

This follows Go's standard constructor pattern for creating a new instance of the mock object.


18-21: Proper mock structure.

The MockProgressManager struct correctly embeds mock.Mock from the testify package, which is the standard approach for creating testify mocks.


28-31: Method implementation looks good.

The DeleteProgressByRuntimeID method correctly registers the call and returns the expected error.


33-36: Method implementation looks good.

The UpsertProgress method correctly registers the call and returns the expected error.

openmeter/streaming/clickhouse/mock.go (3)

1-12: Well-structured mock implementation with proper imports.

The mock implementation follows good practices by importing the necessary dependencies and implementing the required interfaces.


11-20: Good practice using interface assertion for compile-time verification.

The var _ clickhouse.Conn = &MockClickHouse{} line ensures that the mock properly implements the interface at compile time.


77-101: Good implementation of MockRows with proper interface assertion.

The implementation of the MockRows struct and its methods follows good practices and correctly asserts interface compliance.

pkg/gosundheit/logger.go (2)

40-40: Reduced log verbosity by commenting out debug statements.

Commenting out the debug log for health check completions will reduce log noise, especially for successful checks.


29-30:

✅ Verification successful

Reduced log verbosity by commenting out debug statements.

Commenting out the debug log for health check starts will reduce log noise, which is appropriate for high-frequency health checks.


🏁 Script executed:

#!/bin/bash
# Check how frequently health checks are performed
rg -A 5 "RegisterCheck|AddCheck" --type go

Length of output: 327


Approval: Removal of Debug Logging Verified

The debug log in pkg/gosundheit/logger.go (lines 29–30) has been commented out to reduce log noise for high-frequency health check events. The frequency observed in health check registration (in app/common/termination.go) confirms that this change appropriately minimizes unnecessary logging.

  • The change in OnCheckStarted reduces verbosity during frequent health checks.
  • The registration frequency in app/common/termination.go supports this adjustment.
etc/seed/seed.yaml (1)

12-13: Enhanced timestamp generation for better testing.

The new timestamp generation creates random times within the last 30 days, which is beneficial for testing caching functionality that depends on time-based patterns.

openmeter/meter/httphandler/query.go (2)

113-116: Added client-based caching enablement.

The code now enables caching when a client ID is provided, which aligns with the PR objective of implementing query caching for UI queries.
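
The change plausibly reduces to a guard of this shape (a hypothetical sketch; the actual field names in query.go may differ):

// Enable the query cache only for UI-originated requests that carry a client ID.
if params.ClientID != nil && *params.ClientID != "" {
	queryParams.Cachable = true
}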


113-116:

❓ Verification inconclusive

Verify the impact of enabling caching on query performance.

The automatic enabling of caching for client ID-based requests is a significant change that could impact query performance and resource usage.


🌐 Web query:

What are the performance implications of query caching in ClickHouse?

💡 Result:

ClickHouse's query cache can significantly enhance performance for read-intensive workloads by reducing query execution time and server load, but its effectiveness depends on proper configuration and understanding of its design limitations. Here's a detailed analysis:

Key Performance Benefits

  • Latency Reduction: Cached query results can reduce response times from seconds to milliseconds for repeated queries, as seen in benchmarks where a query dropped from 8.7 seconds to 0.002 seconds after caching[3][4].
  • Resource Efficiency: Avoids redundant computation for frequent or expensive queries, lowering CPU and memory usage[1][12].
  • Scalability: Transactionally inconsistent design prevents invalidation bottlenecks, making it suitable for high-throughput OLAP workloads[1][4].

Configuration Considerations

  • Entry Eligibility: Queries must meet criteria like minimum execution time (use_query_cache_min_query_duration) or frequency (use_query_cache_min_query_runs) to be cached[1][4].
  • Size Limits: Default 1 MiB entry size may exclude moderately large results; adjust via max_entry_size in server config[1][3].
  • TTL Management: Default 60-second cache lifetime balances freshness with reuse; modify using query_cache_ttl[3][4].

Potential Limitations

  • Stale Results: Cache doesn't auto-invalidate on data changes, risking outdated results until entries expire. Suitable for slowly changing data but problematic for real-time use cases[4][6].
  • Security Isolation: By default, entries are user-specific to prevent cross-user data leaks. While configurable via query_cache_share_between_users, sharing reduces security[1][6].
  • Non-Deterministic Functions: Queries using now() or rand() are excluded by default; override with query_cache_store_results_of_queries_with_nondeterministic_functions[1][4].

Operational Best Practices

  • Selective Enablement: Activate caching per-query with SETTINGS use_query_cache=1 rather than globally to avoid unnecessary overhead[3][4].
  • Monitoring: Use system.query_cache for entries, system.events for hit/miss rates, and system.asynchronous_metrics for memory usage[4][7].
  • Clear Strategies: Manually invalidate stale entries with SYSTEM DROP QUERY CACHE or tag-based removal (SYSTEM DROP QUERY CACHE TAG)[4][7].

Performance Tradeoffs

  • Write Overhead: Caching adds minor latency (~5-10%) during initial query execution due to result storage[1][4].
  • Memory Pressure: Large caches may compete with other processes; monitor via QueryCacheBytes metric[4][7].

In summary, ClickHouse's query cache offers substantial performance gains for repetitive analytical queries but requires careful tuning of expiration policies, security settings, and eligibility rules. It complements other optimizations like proper indexing and projections but isn't a substitute for them[8][12].



Action Required: Validate Query Caching Impact on Performance

After reviewing the detailed performance implications of ClickHouse's query caching, note that while enabling caching—especially for client ID-based requests—can significantly reduce query latency and lower server load under the right conditions, it also introduces several configuration-dependent tradeoffs. In particular:

  • Eligibility & Configuration: Verify that queries meet the necessary criteria (such as minimum execution time and frequency) to be effective candidates for caching.
  • Resource Overhead: Monitor the initial overhead when storing query results and potential memory pressure, especially if many queries are cached simultaneously.
  • Staleness & Security: Ensure that the benefits of lower latency don’t come at the cost of stale data or compromised isolation between users.
  • Benchmark Testing: Conduct targeted performance benchmarks to confirm that auto-enabling caching for client ID-based requests yields the expected performance improvements without adverse side effects.

Please validate these points in your current setup to confirm that the caching behavior is correctly tuned for your workload.

openmeter/streaming/query_params.go (1)

17-17: Good addition of the Cachable property.

Adding a Cachable flag to the QueryParams struct is a clean approach for controlling the caching behavior at the query parameter level.

openmeter/streaming/clickhouse/raw_events/meter_query.go (2)

141-141: Return nil instead of empty slice for invalid aggregation type.

This change maintains better error semantics by returning nil instead of an empty slice for the args parameter when there's an error. This improves consistency with error handling patterns in Go.


245-306: Well-structured row scanning implementation.

The scanRows method efficiently handles scanning database rows into the application data structure. It properly:

  • Initializes the GroupBy map
  • Handles nil values for measurements
  • Processes group-by fields correctly
  • Skips empty rows

Two minor concerns to address:

  1. There's a TODO comment about using decimal that should be resolved.
  2. The error handling could be more specific about which part of the scanning failed.

Regarding the TODO comment on line 272, do you plan to implement decimal support for more precise calculations? Float64 can have precision issues with financial or billing data.
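
The float64 concern is easy to demonstrate with a runnable example:

package main

import "fmt"

func main() {
	var a, b float64 = 0.1, 0.2
	fmt.Println(a + b)      // 0.30000000000000004: binary floats cannot represent 0.1 exactly
	fmt.Println(a+b == 0.3) // false: summed usage can drift from the exact decimal total
}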

openmeter/streaming/clickhouse/raw_events/connector_test.go (2)

20-102: Good test coverage for successful query execution.

This test effectively validates the query execution flow and result processing by:

  • Setting up a proper mock ClickHouse connection
  • Validating that the correct query parameters are passed
  • Confirming that rows are properly scanned
  • Verifying that the results match expected values

The test structure is clean and well-organized.


103-137: Comprehensive error handling test cases.

The test covers the key error scenarios:

  1. Basic query errors
  2. Meter not found errors (which should be wrapped appropriately)
  3. Row scanning errors

This provides good coverage of the error paths in the code.

openmeter/streaming/clickhouse/raw_events/cachemerge_test.go (2)

15-181: Good test coverage for merging cached and fresh rows.

The tests for mergeMeterQueryRows effectively cover various scenarios:

  • Empty cached rows
  • Window size handling
  • Aggregation with and without window size
  • Different subjects
  • Group by values

This provides a solid foundation for ensuring the merging functionality works correctly.


183-230: Well-designed key generation tests.

The tests for getMeterQueryRowKey verify that keys are correctly generated for different combinations of:

  • Subject-only grouping
  • Multiple group-by fields
  • Missing group-by fields

This ensures the cache lookup mechanism will work correctly with various grouping configurations.

openmeter/streaming/clickhouse/raw_events/cachemerge.go (5)

16-21: No concerns found with the fallback approach for missing cache.
The function returning freshRows when cachedRows is empty is a straightforward and sensible fallback.


22-32: Conditional return for windowed queries looks correct.
Concatenating and sorting when WindowSize is specified preserves expected query semantics without extra overhead.


50-60: Straightforward approach to store and reduce grouped results.
Returning the aggregated slice after processing each group keeps the logic easy to follow.


62-87: Key generation logic is robust and consistent.
Sorting params.GroupBy ensures deterministic ordering in the composite key. This approach avoids collisions and maintains clarity when merging rows.
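
A sketch of a composite key built in that spirit (assumed shapes; the real getMeterQueryRowKey may use different separators or fields). Imports: sort, strings.

// rowKey joins the subject with sorted group-by pairs into one lookup key.
func rowKey(subject string, groupBy map[string]*string, keys []string) string {
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted) // deterministic ordering regardless of input order

	parts := []string{subject}
	for _, k := range sorted {
		v := ""
		if groupBy[k] != nil {
			v = *groupBy[k] // missing group-by values collapse to ""
		}
		parts = append(parts, k+"="+v)
	}
	return strings.Join(parts, ";")
}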


89-134: Aggregation for Sum, Min, and Max is clear; check "Count" assumption.
Currently, "Count" is lumped in with "Sum," which implies that each row’s Value is assumed to represent exactly 1, or the total is meant to be the sum of those values. If “Count” should reflect the number of rows, then iterating over rows and adding 1 each time may be more accurate unless you guarantee each row’s Value is 1.

Would you like to confirm that each row’s Value is indeed 1 for “Count” scenarios? If not, we can revise the aggregator accordingly.
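
The two readings the question contrasts, side by side (assuming rows is a []MeterQueryRow with a float64 Value):

sum := 0.0
for _, r := range rows {
	sum += r.Value // count-as-sum: correct if each cached row already holds a partial count
}
n := float64(len(rows)) // count-as-rows: correct only if every row represents exactly one event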

openmeter/streaming/clickhouse/raw_events/cachequery_test.go (1)

15-39: Solid validation of CREATE TABLE SQL.
Making sure column definitions and engine specs exist covers the key functionality. This test effectively prevents accidental regressions in table creation SQL.

openmeter/streaming/clickhouse/raw_events/connector.go (5)

80-82: Creation of meter query cache table on initialization is good practice.
Ensuring the cache table is set up upfront avoids runtime failures when querying or merging cached results.


115-122: Parameter validation is well-placed.
Checking the namespace and query inputs at the beginning of QueryMeter clarifies error handling early on.


124-129: Deterministic ordering of group-by keys aids testability and correctness.
Sorting params.GroupBy before building queries helps ensure consistent SQL generation and results.


163-179: Progress tracking for client-identified queries is beneficial.
This implementation cleanly separates the logic for progress-enabled queries from standard execution, making the flow easier to maintain.


457-478: Delegating progress tracking logic into a dedicated method is clean.
Separating counting logic via countSQL from the main query ensures better readability and maintainability.

openmeter/streaming/clickhouse/raw_events/cache.go (1)

21-42: Potential extension for aggregator coverage.
Currently, isQueryCachable returns true for Sum, Count, Min, and Max aggregations. If additional aggregations (e.g., Average) are introduced in the future, kindly verify whether they should be included here as well.

Would you like to confirm if aggregator types like Average or other custom metrics should also be cacheable via a broader aggregator check across the codebase?

openmeter/streaming/clickhouse/raw_events/cachequery.go (1)

106-112:

✅ Verification successful

Confirm type compatibility for UNIX timestamps.
Here, the code filters rows by doing comparisons against d.From.Unix() and d.To.Unix() when the column type is DateTime. Some ClickHouse drivers allow implicit casting, but it’s safer to verify or explicitly cast.

Consider confirming correctness by reviewing how the driver and schema handle UNIX integer timestamps for DateTime columns, for example:


🏁 Script executed:

#!/bin/bash
# Check if implicit casting from Int64 to DateTime works reliably
rg -A 10 "WHERE.*window_start" 

Length of output: 5543


Confirmed: Implicit Casting is Working Correctly

The integration tests in cachequery_test.go and cache_test.go confirm that the SQL queries are built with the expected UNIX timestamp values (using d.From.Unix() and d.To.Unix()). The tests verify that these integer values are correctly used in comparisons (e.g., "window_start >= ?" and "window_end <= ?"), indicating that the ClickHouse driver implicitly casts these Int64 values to DateTime without issue. No changes are necessary.

openmeter/streaming/clickhouse/raw_events/cache_test.go (1)

533-535: Truncation logic looks good.
Verifying hours, minutes, and seconds are zeroed correctly ensures day-level caching accuracy. This is a clean approach.
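
For reference, Go's time.Truncate rounds down from the zero time (midnight UTC), so for UTC timestamps a 24-hour truncation lands on midnight:

package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.Date(2025, 3, 8, 14, 30, 45, 0, time.UTC)
	// Day-sized truncation zeroes the clock portion, which is exactly
	// what day-level cache windows rely on.
	fmt.Println(t.Truncate(24 * time.Hour)) // 2025-03-08 00:00:00 +0000 UTC
}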


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3e6c834 and adb225b.

📒 Files selected for processing (3)
  • openmeter/progressmanager/adapter/mock.go (1 hunks)
  • openmeter/streaming/clickhouse/mock.go (1 hunks)
  • openmeter/streaming/clickhouse/raw_events/cache.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: E2E
  • GitHub Check: CI
  • GitHub Check: Analyze (go)
🔇 Additional comments (10)
openmeter/progressmanager/adapter/mock.go (4)

1-12: Well-structured imports and interface validation.

The code has well-organized imports (standard library, third-party, internal) and includes compile-time verification that MockProgressManager implements the progressmanager.Service interface, which is a best practice.


14-16: Simple and effective constructor implementation.

The constructor follows Go's conventional naming pattern and correctly initializes the mock object.


18-21: Correct mock structure setup.

The MockProgressManager struct properly embeds mock.Mock to leverage the testify mocking capabilities.


23-32: Safe type assertion implementation.

The GetProgress method safely handles nil values by checking before performing type assertion, preventing potential panics during testing.

openmeter/streaming/clickhouse/raw_events/cache.go (3)

44-56: Ensure boundary conditions are handled in the new QueryMeter.
remainingQueryMeterFactory appropriately updates From to match the cachedQueryMeter.To. However, confirm whether From is inclusive or exclusive, and whether partial overlap with the cached period could occur. If the last cached window is inclusive, there is a risk of double counting or accidental gaps if it is exclusive. Documenting or verifying the intended behavior would prevent subtle off-by-one errors.


157-183: Ensure parameterized queries to prevent possible injection.
lookupCachedMeterRows uses a struct to build SQL statements and arguments. Confirm that toSQL() securely parameterizes inputs like hp.Database or hp.Namespace. Doing so helps prevent accidental SQL injection if any user-controlled strings are ever passed into these fields.


203-216: Confirm table creation method is idempotent.
createMeterQueryCacheTable calls Exec on a query to create the cache table if it does not exist, returning an error if creation fails. Verify that the underlying SQL includes an “IF NOT EXISTS” clause or equivalent so running this method multiple times won’t fail once the table is established.
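
For illustration, an idempotent DDL carries the guard in the statement itself; a sketch with hypothetical column names (not the PR's actual schema):

// createCacheTableSQL sketches an idempotent cache-table DDL; rerunning it is
// a no-op once the table exists. Columns and engine are illustrative.
const createCacheTableSQL = `
CREATE TABLE IF NOT EXISTS openmeter.meter_query_cache (
    hash         String,
    window_start DateTime,
    window_end   DateTime,
    subject      String,
    group_by     Map(String, String),
    value        Float64
)
ENGINE = MergeTree
ORDER BY (hash, window_start)
`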

openmeter/streaming/clickhouse/mock.go (3)

52-55: LGTM! Fixed the duplicate parameter issue.

The PrepareBatch method now correctly passes options only once to m.Called(), fixing the issue raised in a previous review.


62-65: LGTM! Consistent parameter handling.

The Exec method now uses the same pattern for handling parameters as other methods in the file, addressing the consistency concern from a previous review.


11-126: Add test coverage for the mock implementation.

While the mock implementation looks solid, there's currently no test coverage specifically for this mock implementation. To ensure the mocks behave as expected and prevent regressions, consider adding tests.

#!/bin/bash
# Check if there are any test files for the mock implementation
fd --extension go --exec grep -l "Test.*MockClickHouse\|Test.*MockRows" {} \;

Comment on lines +15 to +18
const (
minCachableDuration = 3 * 24 * time.Hour
minCacheableToAge = 24 * time.Hour
)

🧹 Nitpick (assertive)

Consider making cache duration constants configurable.
While minCachableDuration set to 3 days and minCacheableToAge set to 24 hours may suffice for now, making these thresholds adjustable via configuration (e.g., from environment variables or a config file) can accommodate changing requirements without a code deployment.
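
One possible shape for that configuration, with hypothetical field names:

// CacheConfig lifts the thresholds out of the code; values could come from
// environment variables or a config file.
type CacheConfig struct {
	MinCachableDuration time.Duration // minimum queried span to cache; currently 3 * 24 * time.Hour
	MinCacheableToAge   time.Duration // minimum age of cacheable data; currently 24 * time.Hour
}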

Comment on lines +20 to +42
// isQueryCachable returns true if the query params are cachable
func isQueryCachable(m meter.Meter, p streaming.QueryParams) bool {
// We only cache queries where cachable is set to true
if !p.Cachable {
return false
}

// We can only cache queries that have a from time
if p.From == nil {
return false
}

from := *p.From
to := lo.FromPtrOr(p.To, time.Now().UTC())
duration := to.Sub(from)

// It must be at least 3 days of usage to be cachable
if duration < minCachableDuration {
return false
}

return m.Aggregation == meter.MeterAggregationSum || m.Aggregation == meter.MeterAggregationCount || m.Aggregation == meter.MeterAggregationMin || m.Aggregation == meter.MeterAggregationMax
}

🧹 Nitpick (assertive)

Clarify or log why a query is not cacheable.
The checks ensure queries must be explicitly marked as cachable, must have p.From specified, must span at least 3 days, and must use one of the supported aggregations (SUM, COUNT, MIN, MAX). If the query fails any of these checks, the function simply returns false.

  • Consider logging or returning a diagnostic reason for debugging when a query is deemed not cacheable.
  • If future plans include new aggregations (e.g., AVERAGE), consider centralizing the supported aggregator checks in a table or map for easier expansion.
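
A sketch of that centralization, reusing the identifiers from the snippet above:

// cachableAggregations is the single place to extend when a new aggregation
// becomes safe to cache.
var cachableAggregations = map[meter.MeterAggregation]bool{
	meter.MeterAggregationSum:   true,
	meter.MeterAggregationCount: true,
	meter.MeterAggregationMin:   true,
	meter.MeterAggregationMax:   true,
}

// The final check in isQueryCachable then becomes:
//   return cachableAggregations[m.Aggregation]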

Comment on lines +58 to +121
// queryMeterCached queries the meter view and manages the cache, as:
// 1. Look up cached rows
// 2. Query new rows for the uncached time period
// 3. Cache the new results
// It returns the cached rows and the updated query meter.
func (c *Connector) queryMeterCached(ctx context.Context, hash string, originalQueryMeter queryMeter) (queryMeter, []meterpkg.MeterQueryRow, error) {
var values []meterpkg.MeterQueryRow

createReaminingQueryMeter := c.remainingQueryMeterFactory(originalQueryMeter)

// Calculate the period to query from the cache
queryMeterCached, err := c.getQueryMeterForCachedPeriod(originalQueryMeter)
if err != nil {
return originalQueryMeter, values, err
}

// Step 1: Look up cached rows
cachedValues, err := c.lookupCachedMeterRows(ctx, hash, queryMeterCached)
if err != nil {
return originalQueryMeter, values, fmt.Errorf("failed to lookup cached meter rows: %w", err)
}

// If we have cached values, add them to the results
// Also, update the query range to query uncached periods
if len(cachedValues) > 0 {
c.config.Logger.Debug("cached rows found", "from", queryMeterCached.From, "to", queryMeterCached.To, "count", len(cachedValues))

values = append(values, cachedValues...)

// We use the last cached window as the start of the new query period
lastCachedWindow := cachedValues[len(cachedValues)-1].WindowEnd
queryMeterCached.From = &lastCachedWindow

// If we've covered the entire range with cached data, return early
if lastCachedWindow.Equal(*queryMeterCached.To) {
c.config.Logger.Debug("no new rows to query for cache period, returning cached data", "count", len(values))

return createReaminingQueryMeter(queryMeterCached), values, nil
}
}

// Step 2: Query new rows for the uncached time period
newRows, err := c.queryMeter(ctx, queryMeterCached)
if err != nil {
return originalQueryMeter, values, fmt.Errorf("query new meter rows: %w", err)
}

values = append(values, newRows...)

// Step 3: Cache the new results
if len(newRows) > 0 {
if err := c.insertRowsToCache(ctx, hash, queryMeterCached, newRows); err != nil {
// Log the error but don't fail the query
c.config.Logger.Error("failed to store new rows in cache", "error", err, "from", queryMeterCached.From, "to", queryMeterCached.To, "count", len(newRows))
} else {
c.config.Logger.Debug("new rows stored in cache", "from", queryMeterCached.From, "to", queryMeterCached.To, "count", len(newRows))
}
}

// Result
c.config.Logger.Debug("returning cached and new rows", "from", queryMeterCached.From, "to", queryMeterCached.To, "count", len(values))

return createReaminingQueryMeter(queryMeterCached), values, nil
}

🧹 Nitpick (assertive)

Handle partial cache insert errors more explicitly.
queryMeterCached logs cache insertion errors but continues execution successfully. This design is often acceptable; however, if the cache’s consistency is critical, consider a more robust error-handling approach (e.g., retry logic or fallback). Otherwise, a partial failure could degrade subsequent query performance without being surfaced to the caller. Also, confirm the user-facing implications if caching fails silently.

Comment on lines +123 to +155
// adjustQueryTimeRange prepares the time range for a query meter operation
func (c *Connector) getQueryMeterForCachedPeriod(originalQueryMeter queryMeter) (queryMeter, error) {
cachedQueryMeter := originalQueryMeter
now := time.Now().UTC()

if originalQueryMeter.From == nil {
return cachedQueryMeter, fmt.Errorf("from is required for cached queries")
}

// Set the end time to now if not provided
if cachedQueryMeter.To == nil {
cachedQueryMeter.To = &now
}

// We do not cache data that is less than 24 hours old
toFresness := now.Sub(*cachedQueryMeter.To)

if toFresness < minCacheableToAge {
delta := minCacheableToAge - toFresness

cachedQueryMeter.To = lo.ToPtr(cachedQueryMeter.To.Add(-delta))
}

// We truncate to complete days to avoid partial days in the cache
cachedQueryMeter.To = lo.ToPtr(cachedQueryMeter.To.Truncate(time.Hour * 24))

// This is the window size that the cache will use if no window size is provided
if cachedQueryMeter.WindowSize == nil {
cachedQueryMeter.WindowSize = lo.ToPtr(meter.WindowSizeDay)
}

return cachedQueryMeter, nil
}

🧹 Nitpick (assertive)

Truncating the query range might be surprising for users.
getQueryMeterForCachedPeriod reduces the requested To timestamp to ensure data under 24 hours old is never cached, then truncates to whole days. These rules are clear but may be unexpected if the user explicitly requests a range that includes recent data or non-midnight timestamps. Consider adding an informational log or comment clarifying this enforced boundary.

func (c *Connector) queryMeterCached(ctx context.Context, hash string, originalQueryMeter queryMeter) (queryMeter, []meterpkg.MeterQueryRow, error) {
var values []meterpkg.MeterQueryRow

createReaminingQueryMeter := c.remainingQueryMeterFactory(originalQueryMeter)

🧹 Nitpick (assertive)

Fix typographical error in the local variable name.
The variable createReaminingQueryMeter is misspelled and inconsistent with the function name remainingQueryMeterFactory.

-	createReaminingQueryMeter := c.remainingQueryMeterFactory(originalQueryMeter)
+	createRemainingQueryMeter := c.remainingQueryMeterFactory(originalQueryMeter)

Comment on lines +88 to +91
func (m *MockRows) Next() bool {
args := m.Called()
return args.Bool(0)
}

🧹 Nitpick (assertive)

Consider adding a method for pre-configuring sequence-based responses.

For methods like Next() that are typically called in a sequence during iteration, it would be useful to have a helper method to set up a series of return values.

// Example helper method to add to MockRows
func (m *MockRows) SetupNextSequence(sequence ...bool) *MockRows {
    for _, val := range sequence {
        m.On("Next").Return(val).Once()
    }
    return m
}

This would make tests more readable when setting up mock behavior for row iteration.
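
Usage in a test would then read, for example:

rows := new(MockRows)
rows.SetupNextSequence(true, true, false) // two rows, then iteration ends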

Comment on lines +22 to +25
func (m *MockClickHouse) Query(ctx context.Context, query string, queryArgs ...interface{}) (driver.Rows, error) {
callArgs := m.Called(ctx, query, queryArgs)
return callArgs.Get(0).(driver.Rows), callArgs.Error(1)
}

🧹 Nitpick (assertive)

Improve handling of variadic arguments for precise test expectations.

In methods accepting variadic arguments (like this one), passing queryArgs directly to m.Called() can make setting up precise test expectations difficult. When a slice is passed as the last argument to m.Called(), the slice is treated as a single argument rather than expanding its elements.

For more precise control when setting up test expectations, consider using a more explicit approach:

func (m *MockClickHouse) Query(ctx context.Context, query string, queryArgs ...interface{}) (driver.Rows, error) {
-	callArgs := m.Called(ctx, query, queryArgs)
+	args := []interface{}{ctx, query}
+	for _, arg := range queryArgs {
+		args = append(args, arg)
+	}
+	callArgs := m.Called(args...)
	return callArgs.Get(0).(driver.Rows), callArgs.Error(1)
}

This change would apply to all methods with variadic parameters (Query, QueryRow, Select, AsyncInsert, Exec).

