Move stream.subscribe() to be part of the constructor #1997
Comments
I think we want this to be a declarative list of subscribers, so something like:
type ShapeStreamSubscriber = {
  onMessages: (messages: Message[]) => void | Promise<void>;
  onError: (err: unknown) => void;
};
subscribers: ShapeStreamSubscriber[] | ShapeStreamSubscriber
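To make the proposal concrete, here is a minimal sketch of constructor-time subscribers. `DeclarativeShapeStream`, its `source` option and the simplified `Message` type are hypothetical stand-ins (the `source` array replaces the real HTTP fetch loop); this is not the `@electric-sql/client` API. The subscriber list is normalized to an array once, in the constructor, and there is no public `subscribe()` to call later.

```typescript
// Hypothetical, simplified message type (not the real client's Message).
type Message = { key: string; value: Record<string, unknown> };

type ShapeStreamSubscriber = {
  onMessages: (messages: Message[]) => void | Promise<void>;
  onError: (err: unknown) => void;
};

type DeclarativeStreamOptions = {
  // Hypothetical stand-in for the URL + shape definition: pre-canned batches.
  source: Message[][];
  subscribers: ShapeStreamSubscriber[] | ShapeStreamSubscriber;
};

class DeclarativeShapeStream {
  // Fixed at construction time; there is deliberately no subscribe() method.
  private readonly subscribers: readonly ShapeStreamSubscriber[];
  private readonly source: Message[][];

  constructor(options: DeclarativeStreamOptions) {
    this.subscribers = Array.isArray(options.subscribers)
      ? [...options.subscribers]
      : [options.subscribers];
    this.source = options.source;
  }

  // Delivers every batch to every subscriber in order; the first failure is
  // reported to all subscribers' onError and stops the stream.
  async run(): Promise<void> {
    try {
      for (const batch of this.source) {
        await Promise.all(this.subscribers.map((s) => s.onMessages(batch)));
      }
    } catch (err) {
      for (const s of this.subscribers) s.onError(err);
    }
  }
}
```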
Isn't there a slight chicken-and-egg problem here? You need to instantiate a Shape to have the subscriber needed to instantiate the ShapeStream, but the ShapeStream is needed to instantiate the Shape...
The alternative is an explicit .start() method on a shape stream. It solves two issues:
1) .start() would return a promise that resolves or rejects depending on the initial connection, e.g. if auth fails. This is impossible to do with the synchronous constructor (the alternative is an async static .create(), but that doesn't solve lazy subscription).
2) After calling .start() we can lock the subscriptions and throw on any further .subscribe() calls. (Although there is an argument for late subscription for power users and edge cases...)
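A minimal sketch of the `.start()` alternative, assuming a hypothetical `connect` function that stands in for the initial HTTP request (the class and method bodies are illustrative, not the client's API). `start()` rejects if that request fails, for instance on an auth error, and any `subscribe()` after `start()` throws.

```typescript
// Hypothetical stand-in for the first HTTP request of the stream.
type Connect = () => Promise<void>;
type Listener = (msg: string) => void;

class StartableShapeStream {
  private readonly listeners: Listener[] = [];
  private readonly connect: Connect;
  private started = false;

  constructor(connect: Connect) {
    this.connect = connect;
  }

  subscribe(listener: Listener): void {
    if (this.started) {
      throw new Error("Cannot subscribe after start() has been called");
    }
    this.listeners.push(listener);
  }

  // Locks the subscriber list, then resolves or rejects with the outcome of
  // the initial connection (e.g. rejects on a 401).
  async start(): Promise<void> {
    this.started = true;
    await this.connect();
  }

  // Hypothetical stand-in for the fetch loop delivering a message.
  deliver(msg: string): void {
    for (const listener of this.listeners) listener(msg);
  }
}
```

A resolved `start()` only covers the initial connection; errors can still arrive at any later point in the stream's life.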
This is part of the problem in how we currently use The way to do it with the declarative approach would be that There is also another way, using a sort of
We had a discussion around this with @KyleAMathews @kevin-dp @balegas: currently we are mixing two patterns, a declarative one and a builder one. If we opt for the fully declarative one as the core pattern, then we can very easily code up a builder on top of it:
const builder = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .addSubscriber(sub)
// ... stuff happens ...
builder.addSubscriber(sub)
const shapeStream = builder.build() // or .run()
// internally the build call would create a declarative `ShapeStream`
This solves the cases where people might want to lazily add subscribers without 1) having unclear semantics as to when the stream starts or whether they can add more subscribers afterwards, and 2) having any externally mutable state on the stream itself, with all the special cases and flags that come with it.
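A minimal sketch of this layering, assuming a hypothetical `ImmutableShapeStream` core whose options (including subscribers) are frozen at construction, with `ShapeStreamBuilder` as a thin mutable front end. The `{ url, table }` options are simplified and the method names just follow the snippet above; none of this is the real client API.

```typescript
type Subscriber = (msg: unknown) => void;

// Hypothetical, simplified options for the declarative core.
type ShapeStreamOptions = {
  url: string;
  table: string;
  subscribers: readonly Subscriber[];
};

// Declarative core: everything, including subscribers, is fixed at construction.
class ImmutableShapeStream {
  readonly options: Readonly<ShapeStreamOptions>;

  constructor(options: ShapeStreamOptions) {
    // Copy and freeze, so nothing outside can mutate the stream's config.
    this.options = Object.freeze({
      ...options,
      subscribers: Object.freeze([...options.subscribers]),
    });
  }
}

// Mutable builder layered on top; only build() creates a stream.
class ShapeStreamBuilder {
  private url?: string;
  private table?: string;
  private readonly subscribers: Subscriber[] = [];

  setUrl(url: string): this {
    this.url = url;
    return this;
  }

  setShapeDefinition(def: { table: string }): this {
    this.table = def.table;
    return this;
  }

  addSubscriber(sub: Subscriber): this {
    this.subscribers.push(sub);
    return this;
  }

  build(): ImmutableShapeStream {
    if (!this.url || !this.table) {
      throw new Error("url and shape definition are required");
    }
    return new ImmutableShapeStream({
      url: this.url,
      table: this.table,
      subscribers: this.subscribers,
    });
  }
}
```

Later `addSubscriber()` calls on the builder cannot affect a stream that has already been built, which keeps the "when does it start" question out of the stream itself.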
We had discussed that this might not make that much sense, since a stream is a continuous thing and you might get auth errors or invalid relation errors at any point during the runtime of the stream - so separating the "initial connection" is extra overhead that could be handled by one generic declarative global We can have a static
I am of the opinion that if someone is that much of a power user and requires this sort of undefined behaviour, they should be able to write their own client implementation of the protocol fairly easily. More generally, I think it's better to optimize the client for the intended use cases, with a minimal API surface and clear semantics, and wait until we get requests for anything else!
My initial reaction to the builder concept was a -1, but it actually translates really nicely to a builder pattern for client-side processing of the streams, so I'm a +1 now:
Includes:
const comments = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
const issues = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .query() // Start building a query (may not be needed)
  .include({
    // Client-side include of the two shapes
    shape: comments,
    key: 'id', // column on issues
    on: 'issue_id', // column on comments
    as: 'comments', // adds the comments as a 'comments' array prop on every issue
  })
  .subscribe((msg) => doStuff(msg))
  .run() // implies .run() on comments
Joins:
const issues = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
const comments = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .query() // Start building a query (may not be needed)
  .join({
    // Client-side join of the two shapes
    shape: issues,
    key: 'issue_id', // column on comments
    on: 'id', // column on issues
    as: 'issue', // adds the issue as an 'issue' prop on every comment
  })
  .subscribe((msg) => doStuff(msg))
  .run() // implies .run() on issues
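To pin down what `include` and `join` mean with the `key`/`on`/`as` options above, here is a minimal sketch over already-materialized row arrays. The `include` and `join` functions are hypothetical and recompute from scratch; an incremental implementation (e.g. on top of D2, mentioned later in the thread) would update its output per change instead.

```typescript
type Row = Record<string, unknown>;
type RelationOptions = { key: string; on: string; as: string };

// include: attach all matching child rows as an array on each parent row.
function include(parents: Row[], children: Row[], opts: RelationOptions): Row[] {
  const childrenByKey = new Map<unknown, Row[]>();
  for (const child of children) {
    const k = child[opts.on];
    const group = childrenByKey.get(k) ?? [];
    group.push(child);
    childrenByKey.set(k, group);
  }
  return parents.map((parent) => ({
    ...parent,
    [opts.as]: childrenByKey.get(parent[opts.key]) ?? [],
  }));
}

// join: attach the single matching row (or null) as a prop on each row.
// Assumes `on` is unique on the other side (e.g. a primary key).
function join(rows: Row[], others: Row[], opts: RelationOptions): Row[] {
  const othersByKey = new Map<unknown, Row>();
  for (const other of others) othersByKey.set(other[opts.on], other);
  return rows.map((row) => ({
    ...row,
    [opts.as]: othersByKey.get(row[opts.key]) ?? null,
  }));
}
```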
@samwillis huh, that is pretty compelling
@samwillis happy that it works with that pattern as well! (although I'm unclear if the I do want to stress the point that the declarative pattern is easier to deal with and does not preclude us from having the builder pattern or lazy subscription as well, but I think our "core" API being declarative makes it easier to test and maintain (and it's my personal preference hehe... objective opinions only).
The main issue with our current approach is that we allow people to subscribe to an ongoing stream, which has very limited and particular use cases and is much more likely to cause hard-to-catch bugs (it's always possible to just start a stream from a given offset if someone wants to "resume" mid-stream).
@msfstef agreed, in general I prefer a declarative API. Thinking about it further, we don't need the implied "deep run", but note that the output of the join/include won't start until both streams have started. I wonder if the solution is that, instead of passing a stream into the Shape:
const stream = new ShapeStream({
  url: `http://localhost:3000/v1/shape`,
  table: 'items',
})
const shape = new Shape(stream)
we do:
const shape = new Shape({
  shape: {
    url: `http://localhost:3000/v1/shape`,
    table: 'items',
  },
})
This is what we did with the PGlite sync plugin: https://pglite.dev/docs/sync#using-the-sync-plugin-alpha
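A minimal sketch of a `Shape` that takes stream options and constructs its own stream, assuming simplified hypothetical `SketchStream`/`SketchShape` classes and a `Change` type where `value: null` means delete. Because the shape creates the stream, it is always the first subscriber and always starts from offset `-1`.

```typescript
// Hypothetical, simplified stream options mirroring { url, table } above.
type StreamOptions = { url: string; table: string; offset?: string };
// Simplified change: value === null means the row was deleted.
type Change = { key: string; value: Record<string, unknown> | null };

class SketchStream {
  readonly options: StreamOptions;
  private readonly subscribers: ((batch: Change[]) => void)[] = [];

  constructor(options: StreamOptions) {
    // Default to offset -1, i.e. sync the shape from the beginning.
    this.options = { ...options, offset: options.offset ?? "-1" };
  }

  subscribe(fn: (batch: Change[]) => void): void {
    this.subscribers.push(fn);
  }

  // Hypothetical stand-in for the fetch loop delivering a batch of changes.
  push(batch: Change[]): void {
    for (const fn of this.subscribers) fn(batch);
  }
}

class SketchShape {
  readonly stream: SketchStream;
  readonly rows = new Map<string, Record<string, unknown>>();

  constructor(opts: { shape: StreamOptions }) {
    // The shape creates, and therefore owns, its stream: it is guaranteed
    // to be subscribed before any message is delivered.
    this.stream = new SketchStream(opts.shape);
    this.stream.subscribe((batch) => {
      for (const change of batch) {
        if (change.value === null) this.rows.delete(change.key);
        else this.rows.set(change.key, change.value);
      }
    });
  }
}
```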
@KyleAMathews note that we could add multiple subscribe calls at intermediate steps in a shape query:
const comments = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .query()
// we don't need to save this bit, just subscribe
// count all comments
comments.count().subscribe((msg) => { /* ... */ })
const issues = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .query() // Start building a query (may not be needed)
  .include({
    // Client-side include of the two shapes
    shape: comments,
    key: 'id', // column on issues
    on: 'issue_id', // column on comments
    as: 'comments', // adds the comments as a 'comments' array prop on every issue
  })
  .subscribe((msg) => doStuff(msg))
  .count()
  .subscribe((msg) => { /* ... */ }) // count all issues
  .run() // implies .run() on comments
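A minimal sketch of the chaining in this example, assuming a hypothetical push-based `QueryNode` where `subscribe()` returns the node itself (so more operators can follow) and `count()` returns a new downstream node. This naive `count()` counts every emitted message; a real operator would also have to account for deletes.

```typescript
type NodeListener<T> = (value: T) => void;

// Hypothetical push-based query node (not a real library API).
class QueryNode<T> {
  private readonly listeners: NodeListener<T>[] = [];

  // Returns `this`, so further operators can be chained after a subscribe.
  subscribe(fn: NodeListener<T>): this {
    this.listeners.push(fn);
    return this;
  }

  emit(value: T): void {
    for (const fn of this.listeners) fn(value);
  }

  // Derived node that emits the running number of values seen so far.
  count(): QueryNode<number> {
    const out = new QueryNode<number>();
    let n = 0;
    this.subscribe(() => out.emit(++n));
    return out;
  }
}
```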
@samwillis re:
Ultimately a shape owns the stream, as it needs to subscribe to it from the start, beginning at offset -1. Perhaps you could even just specify
Here's another thought: we don't want either declarative or builder. The nature of our system is that we create streams, then process streams.
import { ShapeStream } from '@electric-sql/client';
import { reduce } from '@electric-sql/stream';
const issueStream = new ShapeStream({...})
issueStream
  .pipe(
    reduce()
  )
  .subscribe((issues) => console.log(issues)); // Output: [issue1, issue2, issue3]
// A declarative Shape
function shape(shapeStreamOptions) {
  const stream = new ShapeStream(shapeStreamOptions)
  return stream.pipe(reduce())
}
const issues = shape({...})
issues.subscribe((issues) => console.log(issues)); // Output: [issue1, issue2, issue3]
This relates to what we were talking about on Discord about a generic stream lib. We have stream readers (one of which, of course, is ShapeStream) which provide a stream of operations against a fixed schema, and then stream operators which can map, filter, reduce, join, and compute various aggregates.
On autostart or not: in the stream processing world, most streams don't start moving bytes until there's a reader, so creating a stream and then not using it until it's piped into a processing pipeline or subscribed to directly makes sense to me.
There's also the idea of replayable streams. We have that already with our offset-based caching scheme, so the result of any stream operator would also be URL-addressable and HTTP-cacheable, as each stream operator just emits processed operations, e.g. reduce just keeps
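A minimal sketch of the "nothing moves until there's a reader" idea, assuming a hypothetical cold `LazyStream` (its producer runs on `subscribe()`) and a `reduce()` operator that folds insert/update/delete operations into the current rows, emitting a snapshot after each operation. With cold semantics each subscriber triggers its own producer run, which is where tee/replay comes in. None of this is the `@electric-sql/stream` API, which the comment only proposes.

```typescript
// Hypothetical, simplified operation type.
type Operation<T> =
  | { op: "insert" | "update"; key: string; value: T }
  | { op: "delete"; key: string };

type Observer<T> = (value: T) => void;

// Cold stream: the producer only runs when someone subscribes.
class LazyStream<T> {
  private readonly producer: (emit: Observer<T>) => void;

  constructor(producer: (emit: Observer<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }

  pipe<U>(operator: (source: LazyStream<T>) => LazyStream<U>): LazyStream<U> {
    return operator(this);
  }
}

// reduce(): keeps the current rows and emits a snapshot after each operation.
function reduce<T>(): (source: LazyStream<Operation<T>>) => LazyStream<T[]> {
  return (source) =>
    new LazyStream<T[]>((emit) => {
      const rows = new Map<string, T>();
      source.subscribe((operation) => {
        if (operation.op === "delete") rows.delete(operation.key);
        else rows.set(operation.key, operation.value);
        emit([...rows.values()]);
      });
    });
}
```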
@KyleAMathews I like the idea of the stream being "pull based". If you want multiple subscribers, you can first set up a subscriber that forwards messages to, e.g., 3 other subscribers "down the pipe", then subscribe that to the stream, and you achieve the same effect. Making our stream match standard stream library behaviour would be really nice
I'm not sure we can without a lot of acrobatics, but it's definitely worth giving it a serious look because, yeah, it'd save a lot of work
"tee" is a typical term here for splitting the stream: if one fork of the stream is reading faster, the tee will buffer for the slower reader. If all our stream stuff can be fronted with HTTP and can cache logs to disk or other places, then the buffering and replay happen pretty much automatically.
I like the reduce API. |
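A minimal sketch of a tee over synchronous iterators, assuming pull-based consumers: each branch buffers the values the other branch has already pulled from the source, so a slower reader still sees everything. The `tee` function here is illustrative; for Web Streams, `ReadableStream.prototype.tee()` provides this natively.

```typescript
// Split one iterator into two independent branches with per-branch buffers.
function tee<T>(source: Iterator<T>): [Iterator<T>, Iterator<T>] {
  const buffers: [T[], T[]] = [[], []];
  let done = false;

  const makeBranch = (self: 0 | 1): Iterator<T> => ({
    next(): IteratorResult<T> {
      // Serve values the other branch already pulled first.
      const buffered = buffers[self];
      if (buffered.length > 0) return { value: buffered.shift() as T, done: false };
      if (done) return { value: undefined, done: true };
      const result = source.next();
      if (result.done) {
        done = true;
        return { value: undefined, done: true };
      }
      // Buffer the new value for the other (possibly slower) branch.
      buffers[self === 0 ? 1 : 0].push(result.value);
      return { value: result.value, done: false };
    },
  });

  return [makeBranch(0), makeBranch(1)];
}
```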
This is what I requested in #2018 (comment). The client now uses the new onError to handle 401/403 and reconnect with a new auth token. One observation: with onError being defined before the shape is instantiated, I'm not sure what the best control flow is to handle a `405` must-refetch. It's certainly hard to handle in the onError callback. I guess this is what's being discussed in #1997 etc.
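The control flow described here (refresh the auth token on 401/403 and reconnect) can be sketched independently of the client. `fetchWithAuthRetry`, `FetchLike` and `getToken` are hypothetical names, and this does not reproduce the real `onError` signature; it only shows the retry loop.

```typescript
// Hypothetical: a "response" is reduced to its status code.
type FetchLike = (token: string) => Promise<{ status: number }>;

// Retries with a freshly fetched token when the server answers 401/403,
// up to maxRetries times; any other status is returned immediately.
async function fetchWithAuthRetry(
  fetchShape: FetchLike,
  getToken: () => Promise<string>,
  maxRetries = 1,
): Promise<{ status: number }> {
  let token = await getToken();
  for (let attempt = 0; ; attempt++) {
    const res = await fetchShape(token);
    const isAuthError = res.status === 401 || res.status === 403;
    if (!isAuthError || attempt >= maxRetries) return res;
    token = await getToken(); // refresh the token and try again
  }
}
```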
Bumping this issue to move it along - how are we feeling about If we make the change such that We could also not block it and allow mid-stream subscribing like we already do, and push the responsibility of correct usage to the developer (and allow "advanced" uses). In that case our API stays exactly the same, and the changes are only internal such that the stream starts on the first If we want to rename it to |
Waiting for the first subscribe call seems pretty reasonable, but still allowing mid-stream subscribes is also something I think we should support, e.g. that's what we're using for
I think pipe could be a separate function, e.g.
import { pipe, map, sum } from "@electric-sql/stream"
const stream = new Shape({...})
const mappedStream = pipe(stream, map((value) => ({ ...value, count: value.count * 2 })))
// Sam's idea to add a built-in `materialize` function to do what Shape does now.
mappedStream.materialize().subscribe((rows) => {
  // do something with materialized mapped rows
})
const sumStream = pipe(stream, sum((value) => value.count))
sumStream.subscribe((total) => { /* ... */ }) // sum of count
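A minimal sketch of `pipe` as a free function with `map` and `sum` operators, assuming a hypothetical push-based `Source<T>` interface rather than the proposed `@electric-sql/stream` package. Keeping `pipe` outside the stream class means new operators never require changes to the stream itself; `sum` here emits a running total after each value.

```typescript
type Sink<T> = (value: T) => void;

// Hypothetical minimal push-based source.
interface Source<T> {
  subscribe(sink: Sink<T>): void;
}

type Operator<A, B> = (source: Source<A>) => Source<B>;

// pipe as a free function: operators don't have to live on the stream class.
function pipe<A, B>(source: Source<A>, op: Operator<A, B>): Source<B> {
  return op(source);
}

function map<A, B>(fn: (a: A) => B): Operator<A, B> {
  return (source) => ({
    subscribe: (sink) => source.subscribe((a) => sink(fn(a))),
  });
}

// Emits the running total after each value; each subscription has its own total.
function sum<A>(fn: (a: A) => number): Operator<A, number> {
  return (source) => ({
    subscribe: (sink) => {
      let total = 0;
      source.subscribe((a) => sink((total += fn(a))));
    },
  });
}
```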
I think materialise may be a separate import too, as we are likely to have multiple implementations: in-memory, SQLite, PGlite, and any other persistence. We would then also likely have a .toStream() on a materialized shape. In the case of a shape materialised to SQLite, it would then provide an API that can be wrapped in an HTTP API to subscribe to the shape. We have to persist the shape in a way that lets a client then request all changes from an offset.
We also need to take a look at the underlying D2 lib. It may need some refactoring to enable the pipe-style API, and it currently needs a way to finalise a full graph covering all branches of interconnected queries/operators before being manually stepped through as things are written to its inputs. All possible, but it needs consideration.
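A minimal sketch of an in-memory materializer that also keeps an append-only change log, so a client could request all changes after an offset. `InMemoryMaterializer`, `apply` and `changesSince` are hypothetical names, and integer offsets are a simplification; a SQLite or PGlite implementation would store the log in a table rather than an array.

```typescript
// Simplified change: value === null means the row was deleted.
type Change = { key: string; value: Record<string, unknown> | null };

class InMemoryMaterializer {
  private readonly rows = new Map<string, Record<string, unknown>>();
  private readonly log: Change[] = [];

  // Applies a change to the current rows and appends it to the log.
  // Returns the offset assigned to this change.
  apply(change: Change): number {
    if (change.value === null) this.rows.delete(change.key);
    else this.rows.set(change.key, change.value);
    this.log.push(change);
    return this.log.length - 1;
  }

  // Current materialized state.
  snapshot(): Record<string, unknown>[] {
    return [...this.rows.values()];
  }

  // Changes strictly after `offset`; -1 means "from the beginning".
  changesSince(offset: number): Change[] {
    return this.log.slice(offset + 1);
  }
}
```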
@msfstef whether we end up with the pipe API or something similar, it will be a separate lib/import and will subscribe internally. As the underlying fetch is async, if multiple subscribes are added synchronously at the same time (in the same microtask), they won't miss any messages. A setTimeout(start, 0) would be belt and braces, as it would ensure the start happens in a new task after any microtasks. This will be important when constructing a pipeline that uses the same stream to do multiple computations (see @KyleAMathews's example above, with one doing a map and the other doing the count).
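A minimal sketch of deferring the start to a new task, assuming a hypothetical `DeferredStartStream` that starts on the first `subscribe()` but schedules the actual start with `setTimeout(task, 0)` by default. The scheduler is injectable so the behaviour can be tested deterministically, and the `messages` array stands in for the fetch loop.

```typescript
type Scheduler = (task: () => void) => void;

// Hypothetical stream: the first subscribe() schedules the start, so every
// subscriber added in the same synchronous block sees the first message.
class DeferredStartStream<T> {
  private readonly subscribers: ((value: T) => void)[] = [];
  private readonly messages: T[];
  private readonly schedule: Scheduler;
  private scheduled = false;

  constructor(messages: T[], schedule: Scheduler = (task) => { setTimeout(task, 0); }) {
    this.messages = messages; // stand-in for the HTTP fetch loop
    this.schedule = schedule;
  }

  subscribe(fn: (value: T) => void): void {
    this.subscribers.push(fn);
    if (!this.scheduled) {
      this.scheduled = true;
      this.schedule(() => this.start());
    }
  }

  private start(): void {
    for (const message of this.messages) {
      for (const fn of this.subscribers) fn(message);
    }
  }
}
```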
@samwillis re: microtasks, indeed this is the only reason that our test suite and most use cases are working. My issue is that this behaviour will undoubtedly lead to weird, hard-to-discover bugs in the future, both for us and for anyone using it. It seems to me that as an immediate action, we can change the
I think this issue should stay open for us to decide on more major changes, but it's worth actioning the above now and potentially saving ourselves annoying issues as soon as possible, at least for single-subscriber cases.
👍 to immediately implementing waiting for the first subscriber before starting to fetch.
Addresses part of #1997. Had to change some tests around. Obviously this is not a full move towards the desired behaviour (e.g. we might even want the stream to stop when it has _no_ subscribers), but I think it gives the stream some beneficial properties (e.g. if you subscribe to it after an async gap, no messages are lost). If we think this just complicates the client, and that we should address the issue in one comprehensive go, I'm happy to close this as well.
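A minimal sketch of "start on the first subscriber, stop when there are none", the behaviour floated in this PR, assuming hypothetical `start`/`stop` hooks in place of beginning and aborting the fetch loop. `subscribe()` returns an idempotent unsubscribe function; because subscribers live in a `Set`, subscribing the same function twice counts once.

```typescript
// Hypothetical hooks standing in for starting/aborting the fetch loop.
type Lifecycle = { start: () => void; stop: () => void };

class RefCountedStream<T> {
  private readonly subscribers = new Set<(value: T) => void>();
  private readonly lifecycle: Lifecycle;

  constructor(lifecycle: Lifecycle) {
    this.lifecycle = lifecycle;
  }

  // Starts fetching on the first subscriber; returns an unsubscribe function.
  subscribe(fn: (value: T) => void): () => void {
    const wasEmpty = this.subscribers.size === 0;
    this.subscribers.add(fn);
    if (wasEmpty) this.lifecycle.start();
    return () => {
      // Stop only when the last subscriber leaves; repeat calls are no-ops.
      if (this.subscribers.delete(fn) && this.subscribers.size === 0) {
        this.lifecycle.stop();
      }
    };
  }

  // Hypothetical stand-in for the fetch loop delivering a message.
  deliver(value: T): void {
    this.subscribers.forEach((fn) => fn(value));
  }
}
```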
e.g.
new ShapeStream({ subscribe: () => {} })
The problem with .subscribe() is that people then get the sense that they can subscribe whenever and still get the full stream, whereas if you delay calling .subscribe() you'll miss the early messages. So to avoid this footgun, and in general to keep the construction of streams fixed and declarative, let's move subscriptions to the constructor.