broker & server api for realtime table freshness #13249

Open
wants to merge 1 commit into
base: master

priyen
Contributor

@priyen priyen commented May 28, 2024

One of the potential solutions to #12477 was a broker api that returns table freshness info.

This PR:

  • adds a new broker API, /debug/tableFreshness/{table}?timeoutMs=1000, that returns the minimum ingestion lag timestamp reported by servers across all consuming segments. Under the hood, the broker calls a new server API, /tables/{table}/consumingSegmentsFreshnessInfo, which takes a list of segments and returns the ingestion lag timestamp for each consuming segment among them
  • the broker was chosen because it is designed to be highly available and fast, and the information needed is already there: the routing map
  • the API routes to one of the replicas based on whatever config is set, just as it does for any normal query
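
The reduction described in the first bullet can be sketched as follows. This is a minimal illustration, not the code in this PR: the class and method names are hypothetical, and it assumes the server response is a map from segment name to an epoch-millisecond timestamp, as in the server example in this description.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, not part of Pinot: reduces per-segment timestamps to one table-level value.
final class TableFreshnessSketch {
  private TableFreshnessSketch() {
  }

  /**
   * Returns the smallest (oldest) timestamp across all reported consuming segments,
   * or an empty OptionalLong when no segment reported a value.
   */
  static OptionalLong minIngestionTimestampMs(Map<String, Long> perSegmentTimestampMs) {
    return perSegmentTimestampMs.values().stream().mapToLong(Long::longValue).min();
  }

  public static void main(String[] args) {
    // Values copied from the server example in this PR description.
    Map<String, Long> serverResponse = Map.of(
        "airlineStats__4__0__20240528T1641Z", 1716915549395L,
        "airlineStats__0__0__20240528T1641Z", 1716915552629L);
    System.out.println(minIngestionTimestampMs(serverResponse)); // OptionalLong[1716915549395]
  }
}
```

Returning an empty value rather than 0 keeps "no consuming segments reported" distinguishable from "data is very stale"; how the real endpoint should report that case is left open here.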

Example usage from broker:

❯ curl -i 'http://localhost:8000/debug/tableFreshness/airlineStats_REALTIME?timeoutMs=1000'
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 30

{"timestamp-ms":1716915412399}
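
A client could turn this response into a lag figure roughly as sketched below. The sketch assumes that "timestamp-ms" is an epoch-millisecond value and that lag is simply "now minus timestamp"; both are interpretations of the example above, and the class name is hypothetical. A regex is used only to keep the example dependency-free; a real client would use a JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical client-side helper, not part of Pinot.
final class FreshnessLagSketch {
  private static final Pattern TIMESTAMP_FIELD = Pattern.compile("\"timestamp-ms\"\\s*:\\s*(\\d+)");

  private FreshnessLagSketch() {
  }

  /** Extracts the numeric "timestamp-ms" field from the broker response body. */
  static long parseTimestampMs(String responseBody) {
    Matcher matcher = TIMESTAMP_FIELD.matcher(responseBody);
    if (!matcher.find()) {
      throw new IllegalArgumentException("No timestamp-ms field in: " + responseBody);
    }
    return Long.parseLong(matcher.group(1));
  }

  /** Assumed definition of lag: wall-clock time minus the reported timestamp, floored at zero. */
  static long lagMs(long freshnessTimestampMs, long nowMs) {
    return Math.max(0L, nowMs - freshnessTimestampMs);
  }

  public static void main(String[] args) {
    long timestampMs = parseTimestampMs("{\"timestamp-ms\":1716915412399}");
    System.out.println("lag ms: " + lagMs(timestampMs, System.currentTimeMillis()));
  }
}
```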

Example server usage:

❯ curl -X POST -H "Content-Type: application/json" \
-d '["airlineStats__0__0__20240528T1641Z", "airlineStats__4__0__20240528T1641Z"]' \
http://localhost:7500/tables/airlineStats/consumingSegmentsFreshnessInfo

{"airlineStats__4__0__20240528T1641Z":1716915549395,"airlineStats__0__0__20240528T1641Z":1716915552629}

Possible improvements in future PRs:

  • collect this data periodically via a "FreshnessManager" of sorts

cc @Jackie-Jiang

@codecov-commenter

codecov-commenter commented May 28, 2024

Codecov Report

Attention: Patch coverage is 3.37079%, with 86 lines in your changes missing coverage. Please review.

Project coverage is 62.12%. Comparing base (59551e4) to head (dbf1aef).
Report is 500 commits behind head on master.

| Files | Patch % | Lines |
|---|---|---|
| ...che/pinot/broker/routing/BrokerRoutingManager.java | 6.81% | 41 Missing ⚠️ |
| ...che/pinot/server/api/resources/TablesResource.java | 0.00% | 26 Missing ⚠️ |
| ...e/pinot/broker/api/resources/PinotBrokerDebug.java | 0.00% | 19 Missing ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13249      +/-   ##
============================================
+ Coverage     61.75%   62.12%   +0.37%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2534      +98     
  Lines        133233   139101    +5868     
  Branches      20636    21549     +913     
============================================
+ Hits          82274    86417    +4143     
- Misses        44911    46227    +1316     
- Partials       6048     6457     +409     
| Flag | Coverage Δ | |
|---|---|---|
| custom-integration1 | <0.01% <0.00%> (-0.01%) | ⬇️ |
| integration | <0.01% <0.00%> (-0.01%) | ⬇️ |
| integration1 | <0.01% <0.00%> (-0.01%) | ⬇️ |
| integration2 | 0.00% <0.00%> (ø) | |
| java-11 | 62.09% <3.37%> (+0.38%) | ⬆️ |
| java-21 | 61.99% <3.37%> (+0.36%) | ⬆️ |
| skip-bytebuffers-false | 62.11% <3.37%> (+0.36%) | ⬆️ |
| skip-bytebuffers-true | 61.97% <3.37%> (+34.24%) | ⬆️ |
| temurin | 62.12% <3.37%> (+0.37%) | ⬆️ |
| unittests | 62.12% <3.37%> (+0.37%) | ⬆️ |
| unittests1 | 46.68% <ø> (-0.21%) | ⬇️ |
| unittests2 | 27.79% <3.37%> (+0.06%) | ⬆️ |

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

At a high level, I'd suggest adding the API on the controller side and following these steps:

  1. Look at the IdealState (IS) of the table and find all consuming segments
  2. Ask the servers hosting the consuming segments about the freshness of the target table
  3. Collect the results and respond

Having partition-level freshness could also be useful for debugging purposes
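
The three steps above could be sketched like this. It is an illustration under stated assumptions, not controller code: the IdealState is modeled as a plain map of segment name to (server instance to state), "CONSUMING" is taken as the state marking a consuming segment, ServerClient is a hypothetical stand-in for the HTTP call to a server, and keeping the oldest value when several replicas report the same segment is one possible policy.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the controller-side flow; none of these types are Pinot classes.
final class ControllerFreshnessSketch {
  /** Stand-in for an HTTP call asking one server about the given segments of a table. */
  interface ServerClient {
    Map<String, Long> fetchFreshness(String server, String table, Set<String> segments);
  }

  private ControllerFreshnessSketch() {
  }

  /** Step 1: find consuming segments in the (simplified) IdealState, grouped by hosting server. */
  static Map<String, Set<String>> consumingSegmentsByServer(Map<String, Map<String, String>> idealState) {
    Map<String, Set<String>> byServer = new TreeMap<>();
    idealState.forEach((segment, instanceStates) ->
        instanceStates.forEach((server, state) -> {
          if ("CONSUMING".equals(state)) {
            byServer.computeIfAbsent(server, s -> new TreeSet<>()).add(segment);
          }
        }));
    return byServer;
  }

  /** Steps 2 and 3: ask each server, then merge replies, keeping the oldest value per segment. */
  static Map<String, Long> collectFreshness(String table, Map<String, Map<String, String>> idealState,
      ServerClient client) {
    Map<String, Long> result = new TreeMap<>();
    consumingSegmentsByServer(idealState).forEach((server, segments) ->
        client.fetchFreshness(server, table, segments)
            .forEach((segment, timestampMs) -> result.merge(segment, timestampMs, Math::min)));
    return result;
  }

  public static void main(String[] args) {
    // Server names and the ONLINE segment below are made up for the example.
    Map<String, Map<String, String>> idealState = Map.of(
        "airlineStats__0__0__20240528T1641Z", Map.of("Server_a", "CONSUMING", "Server_b", "CONSUMING"),
        "airlineStats__0__0__20240527T1641Z", Map.of("Server_a", "ONLINE"));
    ServerClient fakeClient = (server, table, segments) -> {
      Map<String, Long> reply = new TreeMap<>();
      segments.forEach(segment -> reply.put(segment, 1716915552629L));
      return reply;
    };
    System.out.println(collectFreshness("airlineStats", idealState, fakeClient));
  }
}
```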

@ApiResponse(code = 404, message = "Table not found"),
@ApiResponse(code = 500, message = "Internal server error")
})
public Map<String, Long> getSegmentsFreshnessInfo(
Contributor

I think we want to get the table-level freshness value (the minimum freshness value across all streaming partitions) and return it as a single long value. Do you see a need to return per-segment freshness? Ideally it should be pulled directly from RealtimeTableDataManager

Contributor Author

You're right that the minimum freshness can be computed here, which avoids sending the map of segments as part of the response.

This does go against the idea of having partition-level freshness info for debugging purposes, though. What do you suggest?

Contributor

I guess we can provide 2 APIs (or combine the 2 functionalities into one API): one returning the per-partition info, and one returning the table-level info. The API should take only the table name as input
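
As a rough sketch of the per-partition view: assuming consuming segment names follow the table__partition__sequence__creationTime shape seen in the server example in this PR, the per-segment map can be folded into a per-partition map, and the table-level value is then the minimum of its values. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Pinot: derives per-partition freshness from per-segment values.
final class PartitionFreshnessSketch {
  private PartitionFreshnessSketch() {
  }

  /** Extracts the partition id, assuming a name shaped like table__partition__sequence__creationTime. */
  static int partitionIdOf(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      throw new IllegalArgumentException("Unexpected segment name: " + segmentName);
    }
    return Integer.parseInt(parts[1]);
  }

  /** Groups timestamps by partition id, keeping the oldest value if a partition appears more than once. */
  static Map<Integer, Long> perPartitionTimestampMs(Map<String, Long> perSegmentTimestampMs) {
    Map<Integer, Long> result = new TreeMap<>();
    perSegmentTimestampMs.forEach(
        (segment, timestampMs) -> result.merge(partitionIdOf(segment), timestampMs, Math::min));
    return result;
  }

  public static void main(String[] args) {
    Map<String, Long> perSegment = Map.of(
        "airlineStats__4__0__20240528T1641Z", 1716915549395L,
        "airlineStats__0__0__20240528T1641Z", 1716915552629L);
    System.out.println(perPartitionTimestampMs(perSegment)); // {0=1716915552629, 4=1716915549395}
  }
}
```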

@priyen
Contributor Author

priyen commented May 29, 2024

@Jackie-Jiang regarding broker vs controller: I was relying on the broker being aware of which segments can and cannot be queried (i.e., it knows when some servers hosting segments are not ready because they have not yet caught up). Additionally, the broker routing map means we can cycle through the replicas based on the table's query strategy to determine the freshness

We have some FUD about adding this to the controller for the above reasons. Also, the controller is not generally a high-QPS or highly reliable component, whereas the broker is expected to be

@ApiParam(value = "Name of the table", example = "myTable | myTable_REALTIME",
required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Timeout in milliseconds") @QueryParam("timeoutMs") @DefaultValue("10000") long timeoutMs) {
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
Contributor

If you call this API with an offline table, this line will just append _REALTIME to the table name, e.g. XXX_OFFLINE -> XXX_REALTIME.
Then it will pass the offline check on line 134.
I think we should check whether it's an offline table first.
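
One way to order the checks is sketched below. It is a standalone illustration that uses plain suffix checks instead of Pinot's TableNameBuilder, so the class, method, and constants are hypothetical; in the resource itself, the rejection would more likely be mapped to an HTTP error response than to an exception.

```java
// Hypothetical standalone sketch; the real resource would use Pinot's table-name utilities.
final class RealtimeTableNameSketch {
  private static final String OFFLINE_SUFFIX = "_OFFLINE";
  private static final String REALTIME_SUFFIX = "_REALTIME";

  private RealtimeTableNameSketch() {
  }

  /** Rejects offline table names first, and only then resolves the realtime table name. */
  static String toRealtimeTableName(String tableName) {
    if (tableName.endsWith(OFFLINE_SUFFIX)) {
      throw new IllegalArgumentException("Freshness is only defined for realtime tables: " + tableName);
    }
    return tableName.endsWith(REALTIME_SUFFIX) ? tableName : tableName + REALTIME_SUFFIX;
  }

  public static void main(String[] args) {
    System.out.println(toRealtimeTableName("myTable"));          // myTable_REALTIME
    System.out.println(toRealtimeTableName("myTable_REALTIME")); // myTable_REALTIME
  }
}
```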
