-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce Dekaf control-plane components #1665
Conversation
c82e06c
to
56c04af
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good! Some comments for you.
} | ||
} | ||
|
||
pub fn pg_client(&self) -> postgrest::Postgrest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a down-side to re-building the client from scratch every time: we will no longer re-use the underlying connection pool, and will re-dial connections every time.
That's the primary reason that the refresh flow lived outside of this Client implementation, and resulted in a new Client being built.
I think it'd be better to have the refresh
routine be a free function in this crate that operates on a Client, and perhaps to provide a garden path for how to build a next Client after performing such a refresh, but to keep it a bit more arms-length.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a down-side to re-building the client from scratch every time: we will no longer re-use the underlying connection pool, and will re-dial connections every time.
Riiight, yeah. So what if Client
kept around a single Postgrest
client that it hands out whenever somebody calls pg_client()
(postgrest::Client
is a thin wrapper around an Arc<InnerClient>
so it can be shared easily), and which only gets recreated upon refresh. In fact, we could even check if a refresh is needed inside the call to pg_client()
so you don't even need to think about it: any time you call .pg_client()
, you'll get a client with valid access credentials.
Ideally I'd love to wrap Postgrest
in a wrapper that checks and auto-refreshes its access token every time you make an API call. That way it could modify its own header map and not need to get re-created at all. But I think my proposed solution above is a decent compromise
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's far too much coupling between Client
, which we should seek to keep very un-opinionated, with a highly opinionated refresh flow.
For example, refresh() will create a refresh token if it doesn't already have one, even if there's a perfectly valid access token available. Is that appropriate for flowctl? I think "yes", though even that is possibly debatable. Is it always appropriate? Definitely not.
Or what if we introduce a new mechanism for refresh tokens? We easily could -- we're currently ripping out gateway_auth_token after all.
I don't think Client should know anything about the manner in which tokens are obtained, and that that business logic belongs in free function(s) that operate on a Client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this what you meant? I think I got a bit lost between flow_client::Client
and postgrest::Postgrest
when reading your comments, but this should:
- Move opinionated refresh functionality to bare function to avoid coupling
flow_client::Client
to specific token refresh logic - Create a single
postgrest::Postgrest
atflow_client::Client
creation time and hand out clones of it with auth headers set to the current access token upon request. This should allow all users ofpg_client()
to share a connection pool, while still allowingflow_client::Client
to hand outpostgrest::Postgrest
instances to all of the places that need them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Back up a step, and look at how flowctl on master
updates it's configuration and then writes it out on exit. This change, as is, breaks that workflow because token refresh is treated as in interior detail of flow_client::Client
, and I'm saying: it shouldn't be.
flow_client::Client
should be immutable exactly because of the kind of bug which currently exists in flowctl
within this branch.
This is how I think it should be structured:
flow_client::Client
, once created, is immutable. We don't do any interior mutation, ever.- Have a
flow_client::refresh_authorizations(client: &Client, access_token: Option<String>, refresh_token: Option<RefreshToken>) -> anyhow::Result<(String, RefreshToken)>
routine which takes an optional access and/or refresh token, and returns a guaranteed access and refresh token (or an error).- This routine should be the only place we parse the access token and do an expiration check -- currently this logic is also still duplicated in flowctl/lib.rs.
- Feel free to implement a further subroutine for getting an access token from a refresh token, if that's useful for dekaf.
- Use this method in
flowctl
to update the configured access and refresh token, and to then re-build the client, as it had been doing prior to this PR.
IIRC there isn't a reason for a flow_client::Client::claims()
routine aside from checking expiration, but I might be missing something. In any case, IMO this too should be a free function if it's still needed and Client should not have an opinion on the shape of an access token.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the problem, and why the raw access/refresh tokens need to be exposed -- in order for them to get set on flowctl::Config
+ written back to disk. I believe this fixes flowctl
writing updated creds back to disk. I tested it locally by removing user_access_token
from my local.json
profile, ran cargo run -p flowctl auth roles list
and it wrote back a new access token
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that works 👍
Fixes #1657 |
29145d8
to
6b056cb
Compare
A materialization of this kind is purely descriptive, and cannot be directly started. It is used to convey the intent to expose a set of bindings through Dekaf, which will look it up to determine things like binding names and field selection/projections.
…very journals, but they _should_ get ops stats+logs collections
…rticular dekaf binding
a69e659
to
e78de08
Compare
Largely retain the same functionality as `flowctl::Client`, just in a new home so that it can be shared without introducing inconvenient dependencies
e78de08
to
d0f5ba7
Compare
…, and re-use Postgrest connection pool instead of creating a new one on each call to `pg_client()`
d0f5ba7
to
1ac9c90
Compare
@jgraettinger re-requested your review. I believe I've addressed everything, except #1665 (comment) if I didn't understand exactly what you wanted there. |
`flowctl` needs these credentials exposed in order to store them in `flowctl::Config` + on disk for the next flowctl invocation
f9fcd6c
to
14c50a8
Compare
232af77
to
d55ecee
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…seconds to respond
…_gazette_client()` gRPC connections seem to lose stability after a while, this temporarily fixes that
33d0741
to
e1362fd
Compare
This PR builds ontop of #1642, but is different enough that I didn't want to push to that PR after it was already reviewed. It is the first couple parts of #1622. We have:
materialize_unary
that is/will be responsible for validating these endpoint/resource configs.DekafConfig
andDekafResourceConfig
. I'm pretty sure it looks like teachingdekaf::materialize_unary
about the spec RPC, but not 100% sure.connector_tags
row and set it to discover/cache the schema? That's probably possible, but probably also won't just work OOTB.agent::ProxyConnectors
runtime::Runtime
.flowctl::Client
logic to a new crate:flow_client
that bothflowctl
anddekaf
can depend on/subjects
, return materialization bindings instead of all collectionsI think the above TODOs should be resolved by a follow-up PR in order to get this new materialization type to the UI team ASAP.
This change is