Allows multiple requests per server per request ID #13742
base: master
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #13742 +/- ##
============================================
+ Coverage 61.75% 63.79% +2.04%
- Complexity 207 1611 +1404
============================================
Files 2436 2710 +274
Lines 133233 151272 +18039
Branches 20636 23348 +2712
============================================
+ Hits 82274 96500 +14226
- Misses 44911 47539 +2628
- Partials 6048 7233 +1185
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
…posed to map entries
…tting QueryOptionKey.SERVER_RETURN_FINAL_RESULT
@siddharthteotia @jackjlli @vvivekiyer I'd love to get some guidance on the regression suites. I believe I understand why the failures exist: depending on which components get upgraded first, brokers and servers may be incompatible across versions with this change as currently proposed. If servers were guaranteed to be upgraded first, upgrades should be smooth. However, if brokers were upgraded first, then server responses of data tables would be lacking the expected metadata field. What's the preferred approach for this kind of change? It may be possible to create intentional throw-away code that would provide a compatibility bridge, but this would imply that getting to the final state would require multiple upgrades (possibly spanning multiple minor versions, which feels very drawn out).
@@ -771,15 +756,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
      // - Compile time function invocation
      // - Literal only queries
      // - Any rewrites
-     if (pinotQuery.isExplain()) {
+     // if (pinotQuery.isExplain()) {
nit: why is it commented out?
@@ -49,12 +50,12 @@ enum Status {
    /**
     * Returns the current server responses without blocking.
     */
-   Map<ServerRoutingInstance, ServerResponse> getCurrentResponses();
+   Map<ServerRoutingInstance, List<ServerResponse>> getCurrentResponses();
I don't think the code change in this PR is backward compatible. We definitely need to keep the existing signatures as is. Can we introduce new signatures and mark the existing ones Deprecated, so that the old signatures can be cleaned up in the next official release?
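A minimal sketch of that suggestion, assuming the QueryResponse interface shown in the diff above; the new method name getGroupedServerResponses is hypothetical, not from this PR:

```java
import java.util.List;
import java.util.Map;

public interface QueryResponse {

  /**
   * Returns the current server responses without blocking.
   *
   * @deprecated kept only for backward compatibility; clean up in the next official release.
   */
  @Deprecated
  Map<ServerRoutingInstance, ServerResponse> getCurrentResponses();

  /**
   * Hypothetical new signature: all responses received from the same server,
   * grouped into a list.
   */
  Map<ServerRoutingInstance, List<ServerResponse>> getGroupedServerResponses();
}
```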
I fully recognize that I may be missing some important details here, but can you help me understand the implications of changing this interface? I see that the interface QueryResponse and its only implementation AsyncQueryResponse are defined in the org.apache.pinot.core.transport package, but all usages are isolated to org.apache.pinot.broker.
I agree that the change needs to be backward compatible with the data sent from Pinot servers to the brokers so that mismatched versions of code running on brokers vs. servers can interoperate during an upgrade. But I'm not yet certain that this interface needs to remain unchanged in order to accomplish that.
@jackjlli I believe I've found a way to maintain compatibility such that brokers with this version could roll out and communicate with servers on a prior version, while still gathering the information required. This is done by borrowing a single digit of the request ID (the least significant digit, so that it does not increase the probability of a broker ID hash collision) and using that digit to identify whether a query pertained to a REALTIME or OFFLINE table.
Because in this version servers will only receive 1 or 2 queries, always targeting the same table "base" (i.e. foo of foo_OFFLINE), we can infer from the table type (REALTIME or OFFLINE) which request the server response pertains to. This system will not work in the future, when a server will return responses for many different tables, but as soon as we have server versions deployed which always return the table name in the dataTable metadata, we can fully dismantle this requestId hijacking. Using the request ID to convey the table type only needs to exist for the moments (or hours, depending on the size of the cluster) where brokers expect the table name in dataTable metadata, but servers are not yet running the version which provides it.
I feel like this provides a good bridge to be able to move forward with subsequent Logical Table implementation.
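As a concrete illustration of that inference, here's a minimal sketch assuming a last-digit encoding (1 = OFFLINE, 2 = REALTIME, which is my illustrative choice, not necessarily the PR's exact scheme), using Pinot's TableNameBuilder:

```java
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;

public final class TableTypeInference {
  private TableTypeInference() {
  }

  // Recover the physical table name a server response pertains to, given the
  // request id sent to the server and the raw table name from the broker query.
  public static String physicalTableName(long serverRequestId, String rawTableName) {
    // Assumed encoding: last decimal digit 2 => REALTIME, otherwise OFFLINE.
    TableType type = serverRequestId % 10 == 2 ? TableType.REALTIME : TableType.OFFLINE;
    // e.g. ("foo", OFFLINE) -> "foo_OFFLINE"
    return TableNameBuilder.forType(type).tableNameWithType(rawTableName);
  }
}
```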
These changes specifically can be viewed like so (a subset of overall commits): https://github.com/apache/pinot/pull/13742/files/2ffd65d8f869f5f707af708f1680cbb45fb3a2ea..734189bc7da58705f6090102abbec76758ab84ee
Also wanted to add that the way the server responds is not changing; servers do not start returning a list, so brokers don't need to change what they expect on the wire. It's only that all responses from the same server are grouped into a list upon receipt within the brokers.
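In other words, the grouping happens entirely broker-side on receipt; a rough sketch of the idea (class and method names are hypothetical, not the PR's actual code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class ResponseGrouper {
  // One entry per server; every response from that server accumulates in a list.
  private final Map<ServerRoutingInstance, List<ServerResponse>> _responses =
      new ConcurrentHashMap<>();

  public void onResponseReceived(ServerRoutingInstance server, ServerResponse response) {
    _responses.computeIfAbsent(server, k -> Collections.synchronizedList(new ArrayList<>()))
        .add(response);
  }

  public Map<ServerRoutingInstance, List<ServerResponse>> getCurrentResponses() {
    return _responses;
  }
}
```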
   */
  @ThreadSafe
  public final class ServerRoutingInstance {
    private static final String SHORT_OFFLINE_SUFFIX = "_O";
I think with this change we kind of lose the way of knowing how many servers get queried/respond for each table type. I understand that the number of queries sent to the same server can be reduced, but is it possible to include this table type information in the request and response, so that we don't lose this stat?
Linking comments: https://github.com/apache/pinot/pull/13742/files#r1716084206
    // instance within the same query
-   return 31 * 31 * _hostname.hashCode() + 31 * Integer.hashCode(_port) + _tableType.hashCode();
+   return 31 * _hostname.hashCode() + Integer.hashCode(_port);
This will make the hashcode inconsistent between two different versions of broker and servers. Can you double check if this has any side effects?
This will have one major intentional implication, which is that we would no longer open 2 channels from each broker to each server (1 for REALTIME, 1 for OFFLINE). In turn, that would cause the peak network traffic on a channel to at worst double, assuming all queries were to hybrid tables. I'm not certain whether that presents an issue.
Probably worth stating that the motivation for doing so is to allow a single broker request to fan out into an arbitrary number of requests to any single server involved in query processing. This unblocks a major hurdle in the ability to support Logical Tables [1].
[1] #10712
-   return _responseMap.get(serverRoutingInstance).getResponseDelayMs();
+   // TODO(egalpin): How to get query hash here?
+   return -1L;
+   // return _responseMap.get(serverRoutingInstance).getResponseDelayMs();
This needs to be uncommented, right?
    // TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables
    if (realtimeBrokerRequest != null && (!pinotQuery.isExplain() || offlineBrokerRequest != null)) {
      // Don't send explain queries to realtime for OFFLINE or HYBRID tables
@jackjlli I'll remove lines 759-766 as they're handled here now instead.
…server always sends tableName in meta
This reverts commit 23a980f.
@siddharthteotia @jackjlli @vvivekiyer I recognize that this PR makes changes on a critical path, so would it be mutually beneficial to schedule a walk-through of this review over video conference? I'd be happy to do so.
Sorry for the late review. The overall direction looks good.
I want to see what the final desired way to handle multiple requests per request id is. Do you plan to use a single request id on the broker side and let the server split it on a per-table basis? I think that works, and it actually opens up the opportunity to do a first-level merge on the server side before sending the response to the broker.
-  * hostname, port, and table type it serves.
-  * <p>Different table types on same host and port are counted as different instances. Therefore, one single Pinot Server
-  * might be treated as two different routing target instances based on the types of table it serves.
+  * hostname and port.
    */
   @ThreadSafe
   public final class ServerRoutingInstance {
Since the table type info is no longer maintained here, we should be able to remove this class and directly use ServerInstance as the key of the map.
-     partitionId: Int,
-     pinotSplit: PinotSplit,
-     dataSourceOptions: PinotDataSourceReadOptions)
+   partitionId: Int,
(format) The format doesn't seem correct. Can you change it to follow the old format?
@@ -94,6 +94,7 @@ public class DataTableImplV4 implements DataTable {
    protected ByteBuffer _fixedSizeData;
    protected byte[] _variableSizeDataBytes;
    protected ByteBuffer _variableSizeData;
+   // TODO(egalpin): add query hash to metadata, alongside requestId
Does this still apply?
@@ -52,4 +52,5 @@ struct InstanceRequest {
    4: optional bool enableTrace;
    5: optional string brokerId;
    6: optional list<string> optionalSegments;
+   // 7: required i64 queryHash;
Don't add a commented-out field here, as it usually indicates that the field is deprecated, and we should not reuse the same id.
Ah thanks, this is from an earlier concept of using a query hash for unique identification, but the physical table name is much more human-readable and manageable, with the same benefit. I'll remove.
@@ -57,6 +57,8 @@ public InstanceResponseBlock execute()
    InstanceResponseBlock instanceResponseBlock = instanceResponseOperator.nextBlock();
    long endTime2 = System.currentTimeMillis();
    LOGGER.debug("InstanceResponseOperator.nextBlock() took: {}ms", endTime2 - endTime1);
+   // instanceResponseBlock.addMetadata(DataTable.MetadataKey.TABLE.getName(),
This seems unrelated. Could you go over the changes again and clean up the unrelated changes and TODOs that don't apply?
  // TODO(egalpin): Remove the concept of truncated request ID in next major release after all servers are expected
  // to send back tableName in DataTable metadata
  private final long _canonicalRequestId;
Is the overall plan to use request id + table name as the unique identifier of the query? Can you share how you plan to manage request ids? E.g. only have one request id on the broker side, and the server splits it into multiple requests? I'm also considering whether to enhance InstanceRequest to have a list of <query + searchSegments + optionalSegments>.
What is query in this case? Is it the whole query string?
Or is it the PinotQuery object?
@Jackie-Jiang ya, I could definitely see how modifying InstanceRequest (or making a v2, whatever the approach) would result in the fewest total data exchanges between brokers and servers. There might also be advantages to servers having the complete set of queries that they should process within a single context, because that might open the door to further optimizations or "reduce" phases that can be executed on the servers before returning to brokers.
How do you envision "enhancing InstanceRequest to have a list of <query + searchSegments + optionalSegments>" might be accomplished in terms of managing upgrades? Would you recommend a distinct InstanceRequestV2, or some way to make the existing InstanceRequest hold a list of <query + searchSegments + optionalSegments>?
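For discussion's sake, a rough Java sketch of what such a grouped request might look like; all of these names (TableQuery, InstanceRequestV2) are hypothetical, not existing Pinot types:

```java
import java.util.List;
import org.apache.pinot.common.request.BrokerRequest;

// Hypothetical: one <query + searchSegments + optionalSegments> tuple per physical table.
final class TableQuery {
  final BrokerRequest _query;          // the rewritten per-table query
  final List<String> _searchSegments;  // segments to search
  final List<String> _optionalSegments;

  TableQuery(BrokerRequest query, List<String> searchSegments, List<String> optionalSegments) {
    _query = query;
    _searchSegments = searchSegments;
    _optionalSegments = optionalSegments;
  }
}

// Hypothetical v2 request: one broker request id fans out to many table queries.
final class InstanceRequestV2 {
  final long _requestId;
  final String _brokerId;
  final List<TableQuery> _tableQueries;

  InstanceRequestV2(long requestId, String brokerId, List<TableQuery> tableQueries) {
    _requestId = requestId;
    _brokerId = brokerId;
    _tableQueries = tableQueries;
  }
}
```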
@vrajat query in this context I believe would be a BrokerRequest [1]

[1] pinot/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java, line 43 in fe7e9e2:
private @org.apache.thrift.annotation.Nullable BrokerRequest query; // required
package org.apache.pinot.common.utils.request;

public class BrokerRequestIdUtils {
  public static final int TABLE_TYPE_OFFSET = 10;
Seems this is the core change to split one request into multiple ids. Currently this will put a hard limit on the total tables queried. Do we need to reverse-engineer the original request id? If not, we can probably simply use consecutive ids to represent each request.
Ya so this is definitely a little funkier than I'd like haha. This whole idea around borrowing the last digit of the request ID is only for backward compatibility during deployment.
The end goal for how multiple server queries per broker query would work is via requestId + physical table name being a unique identifier. That's implemented in this PR [1]. The challenge is that having the servers identify which physical table their response DataTable pertains to requires a change in the server code (to always include the table name in the DataTable headers [2]). During upgrade, there will be a time where brokers are upgraded and expecting servers to return the physical table name in DataTable headers, but servers won't yet be upgraded and won't be guaranteed to include that table name data.
To bridge that period of time where brokers are updated but servers are not yet, my concept here is to use the request ID itself to inform brokers of the physical table name. With this first iteration of the Logical Table code in this PR, there is still only support for 1 broker request to hit 1 table: 1 realtime, 1 offline, or 1 hybrid. In the case of hybrid, 2 queries are sent to the server; this is where we use the request ID's last digit to indicate whether the request was to a REALTIME or OFFLINE table.
In the AsyncQueryResponse, we know the name of the table that the request was sent to based on the raw table name in the broker query, so we know any server response must pertain to that single raw table name. What we wouldn't know is REALTIME vs OFFLINE. So we set the last digit of the request ID, only when it goes to the server, based on the physical table type. We can then map that request ID from the server back to the canonical request ID that the broker understands (zero out the last digit) in order to reduce the server responses properly in the broker.
In a future release version, after servers are already guaranteed to be sending back table name in DataTable metadata, we will need to remove this entire concept of hijacking the request ID to determine offline vs realtime.
I'd be happy to jump on a call/video call and walk through this more deeply as I admit it's fairly convoluted haha.
[1] https://github.com/apache/pinot/pull/13742/files#diff-c7c33a86af22a88bd07f57bf2856504c7e6fa159e6cbc366b971e12051c90f23R243-R250
[2] https://github.com/apache/pinot/pull/13742/files#diff-2bff83abd3f6e831acfe4b6d31a022f228710def4eea47db3929c6d90b3147ecR157
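To summarize the mechanics in code, here is a compact sketch consistent with the TABLE_TYPE_OFFSET = 10 constant in the diff above; the digit assignments (1 = OFFLINE, 2 = REALTIME) and the method names are my illustration, not necessarily the PR's exact implementation:

```java
public final class BrokerRequestIdUtils {
  public static final int TABLE_TYPE_OFFSET = 10;

  private BrokerRequestIdUtils() {
  }

  // Canonical broker ids are assumed to be multiples of TABLE_TYPE_OFFSET, so the
  // last decimal digit is free to carry the table type of the server-side request.
  public static long getOfflineRequestId(long canonicalRequestId) {
    return canonicalRequestId + 1;  // e.g. 120 -> 121 for the OFFLINE leg
  }

  public static long getRealtimeRequestId(long canonicalRequestId) {
    return canonicalRequestId + 2;  // e.g. 120 -> 122 for the REALTIME leg
  }

  // Zero out the last digit to map a server-side id back to the canonical id.
  public static long getCanonicalRequestId(long requestId) {
    return requestId / TABLE_TYPE_OFFSET * TABLE_TYPE_OFFSET;
  }
}
```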
If the plan is to remove this request-id special handling and make the server return the physical table name in the final version, I'd suggest not introducing the request-id handling in the first place.
There are 2 ways to finish the migration:
- Finish it in 2 releases: 1st release make server send back the table name; 2nd release change broker
- Finish it in 1 release: Make server send back the table name, and use a broker config flag to control broker behavior. Only enable the flag when all servers are deployed
Happy to jump on a call to get consensus
Yup, both make sense as good options! I would probably prefer option 2 (broker flag) so as to require the fewest number of releases that a cluster must flow through. E.g. with option 1 it would not be safe to skip over "release 1"; clusters must take release 1 and only then be upgraded to a later version. The broker flag feels more flexible.
My concept is to have a singular request ID associated with all the server requests that are issued. Each request sent to servers would be sent in the same way that it is today, where each table's settings and configs would be respected. From the perspective of the server, it would just be a set of queries like any other, as if a query to each table had been made in quick succession. Then, using the shared request ID, the broker would be able to reduce all the server responses from each targeted table down to a singular result table. We could definitely use multiple request IDs and track them from the broker side; I hadn't previously considered that.
The current approach, when in its final form, would also instantiate a distinct BrokerRequest per table associated with a logical table. That would be done because certain table settings might cause query re-writes, like enforcing approximation for distinct queries based on table-level settings. We could definitely get more intelligent about this and merge into a single server request any queries that have the same query contents (i.e. same re-write or no re-writes). Here's a rough example of how that might work (changes shown between this PR branch and another dev branch I have): https://github.com/egalpin/pinot/compare/egalpin/refactor-broker-request...egalpin:pinot:egalpin/refactor-broker-request-logical-table-expansion?expand=1
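A rough sketch of that merge idea: coalesce per-table BrokerRequests whose rewritten query ended up identical, so one server request can cover several physical tables (the class, method, and dedup key here are all hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.request.BrokerRequest;

public final class QueryCoalescer {
  private QueryCoalescer() {
  }

  // Group physical tables by the (serialized) per-table query they ended up
  // with after table-level rewrites; identical queries can share one request.
  public static Map<String, List<String>> groupTablesByQuery(Map<String, BrokerRequest> requestsByTable) {
    Map<String, List<String>> tablesByQuery = new HashMap<>();
    for (Map.Entry<String, BrokerRequest> entry : requestsByTable.entrySet()) {
      String queryKey = entry.getValue().toString();  // assumed stable dedup key
      tablesByQuery.computeIfAbsent(queryKey, k -> new ArrayList<>()).add(entry.getKey());
    }
    return tablesByQuery;
  }
}
```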
   */
  @ThreadSafe
  public final class ServerRoutingInstance {
    private static final String SHORT_OFFLINE_SUFFIX = "_O";
    private static final String SHORT_REALTIME_SUFFIX = "_R";
Instead of removing the TableType, is another option to add the table name? Then the rest of the code after QueryRunner would work as is, since this code does not hard-code offline/realtime blocks?
Relates to #10712
This PR proposes to remove the concept of separate "servers" for OFFLINE and REALTIME query handling. Instead, queries are uniquely identified by the physical table that they target in the actual query (myTable_OFFLINE or myTable_REALTIME). The physical table name (i.e. including the _OFFLINE or _REALTIME suffix) is unique per server, per broker request.
This change helps pave the way for Logical Table support by allowing a single broker request to more easily fan out into arbitrarily many requests issued to each required server.
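As an illustration of that identification scheme, a hypothetical composite key (not code from this PR) that is unique per server, per broker request:

```java
import java.util.Objects;

// Hypothetical composite identifier: requestId + physical table name.
final class ServerQueryKey {
  final long _requestId;
  final String _physicalTableName;  // e.g. "myTable_OFFLINE" or "myTable_REALTIME"

  ServerQueryKey(long requestId, String physicalTableName) {
    _requestId = requestId;
    _physicalTableName = physicalTableName;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ServerQueryKey)) {
      return false;
    }
    ServerQueryKey that = (ServerQueryKey) o;
    return _requestId == that._requestId && _physicalTableName.equals(that._physicalTableName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(_requestId, _physicalTableName);
  }
}
```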