Current state of project and how to deploy it on k8s? #6
Hey ... The awareness issue is sorted ... It's a bit late at night for me right now; tomorrow I will share with you the test code where the resolution of awareness was tested. There is also another update which I have been meaning to push for some time, which I'll push tomorrow. And yes, k8s, horizontal pod scaling etc. are all possible. Would answer your questions tomorrow. In the meantime, can you consider a request: if you are able to get the k8s setup done, would you share its details with me? |
Hi @kapv89 many thanks for getting back to me. Yes, it would be great if you could get back to me regarding my questions. 😀 With the help of this, I can try to setup a helm chart for deploying everything in k8s and I will share it with you. 😀 |
@kapv89 Generally, I believe it should not be too much of a problem to build a helm chart for k8s deployment. However, what I would need to understand is:
Long story short: I need some more knowledge of how to use the current version and how my personal preferences can be implemented. Then I think I know enough to write some helm charts for it. 😀 |
Hey .. https://github.com/kapv89/yjs-scalable-ws-backend-test ... this is the repo for inspecting sync of awareness across 2 separate processes. You can take a look at it, meanwhile let me get to your other questions |
Added something to provide better performance of persistence of updates by applying eventual-consistency ... copying that code from my private project and pushing that too ... basically no transactions and redis queues help with better persistence |
@junoriosity just updated the code and changed persistence logic to not use transactions and to use redis queues instead, for better persistence performance via eventual-consistency, and for better behaviour of the client when a new user connects to an active, in-use document. Coming to your questions now
In a production environment, the crdt-wss (crdt-websocket-server) will need access to a redis-cluster (under a key-prefix), and access to your api server. In a real-world app, your api server will have the logic for persistence of updates, and for retrieval of updates (PS: you'd need to use
Redis should be a service like GCP memorystore or AWS elasticache. You are better off with your cloud-provider managing it and providing observability to it.
You can see the environment variables used in this repo in
In a production app, you wouldn't have the crdt-wss connecting directly to DB, so in that case your .env would look like
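Purely as an illustration (these variable names are hypothetical and not taken from this repo), such a .env, with no DB connection and only redis plus the api server configured, might contain:

```shell
# Illustrative .env for a crdt-wss that talks only to redis and an api server
# (variable names are made up for this sketch, not from the repo)
PORT=1234
REDIS_URL=redis://redis-cluster:6379
REDIS_KEY_PREFIX=crdt
API_BASE_URL=http://api-server:3000
```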
Yes, but there is a gotcha when it comes to horizontally scaling websocket-servers. Basically, after the load-balancer has routed a request from a client to a particular websocket-server, all incoming packets from that client need to be routed to the same websocket-server. This is accomplished using a
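One common mechanism for pinning a client to one websocket-server is cookie-based session affinity ("sticky sessions") at the ingress. A hypothetical Kubernetes Ingress sketch using the nginx ingress controller (host, service name, and port are illustrative, not from this repo):

```yaml
# Hypothetical Ingress with cookie-based session affinity (nginx ingress controller)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: crdt-wss
  annotations:
    nginx.ingress.kubernetes.io/affinity: "cookie"
    nginx.ingress.kubernetes.io/session-cookie-name: "crdt-wss-affinity"
spec:
  rules:
    - host: ws.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: crdt-wss
                port:
                  number: 80
```

With this, the controller sets a cookie on the first response and routes subsequent requests from that client to the same pod.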
Updates to a document come fast. In a collaborative editing environment, let's say 10 users are on the same document, each sending 3 edits per second; that's 30 edits per second on the same document, i.e. lots of inserts and deletes per second in the db under the persistence logic used in this crdt-wss. S3 won't be able to handle this rate of writes. But yes, s3 can be used as long-term storage of documents, which can be loaded into a fast primary db on demand. In this repo, and in my project, the primary db used is Postgresql, because I have my eye on Yugabyte, which can allow my db to scale horizontally. However there is a restriction: max size of … Now coming to uploading documents to S3, you can use …
Yes. It all comes down to being able to pass the auth-token (JWT) of the user to the crdt-wss, storing it against the connection, and using the auth-token in read and write operations to perform access control. I'll paste the relevant bits of code from my project's codebase; if you need further help, I'm open to doing a call.

```ts
export const getDocIdFromReq = (req: any): string => {
  return req.url.slice(1).split('?')[0] as string;
}

// what's `doc.id` in my project code is `doc.name` in this repo
export const getTokenFromReq = (req: any): string => {
  const [, query] = req.url.split('?');
  const data = qs.parse(query);
  return data.token as string;
}

export const app = express();
export const server = http.createServer(app);
export const wss = new WebSocketServer({noServer: true});

wss.on('connection', async (ws, req) => {
  await setupWSConnection(ws, req);
});

server.on('upgrade', async (req, socket, head) => {
  const token = getTokenFromReq(req);
  try {
    await checkAuth(token);
    wss.handleUpgrade(req, socket as Socket, head, (ws) => {
      wss.emit('connection', ws, req);
    });
  } catch (err) {
    serverLogger.error(err);
    socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
    socket.destroy();
  }
});

export const connTokens = new Map<WebSocket, string>();

export default async function setupWSConnection(conn: WebSocket, req: http.IncomingMessage): Promise<void> {
  conn.binaryType = 'arraybuffer';
  const docId = getDocIdFromReq(req);
  const token = getTokenFromReq(req); // this token can be used in `getUpdates`, which can call your API server
  connTokens.set(conn, token);
  const [doc, isNew] = getYDoc(docId);
  doc.conns.set(conn, new Set());
  // ...
  // rest of the project code is similar to what can be found in this repo
}

export const closeConn = (doc: WSSharedDoc, conn: WebSocket): void => {
  const controlledIds = doc.conns.get(conn);
  if (controlledIds) {
    doc.conns.delete(conn);
    awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null);
    if (doc.conns.size == 0) {
      doc.destroy();
      docs.delete(doc.id);
    }
  }
  connTokens.delete(conn); // this is the relevant line for managing tokens
  conn.close();
}

export const updateHandler = async (update: Uint8Array, origin: any, doc: WSSharedDoc): Promise<void> => {
  const isOriginWSConn = origin instanceof WebSocket && doc.conns.has(origin);
  let postFailed = false;
  if (isOriginWSConn) {
    try {
      Promise.all([
        pub.publishBuffer(doc.id, Buffer.from(update)),
        pushDocUpdatesToQueue(doc, update)
      ]); // do not await
      const encoder = encoding.createEncoder();
      encoding.writeVarUint(encoder, messageSync);
      syncProtocol.writeUpdate(encoder, update);
      const message = encoding.toUint8Array(encoder);
      doc.conns.forEach((_, conn) => send(doc, conn, message));
      await persistUpdate(doc.id, update, connTokens.get(origin) as string); // notice use of token for persistence here
    } catch {
      postFailed = true;
    }
  }
  if (postFailed && isOriginWSConn) {
    closeConn(doc, origin);
  }
}
```
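For reference, the two URL helpers above can be exercised on their own. A self-contained sketch (the `req` object is mocked, and `URLSearchParams` from the standard library stands in for `qs.parse`):

```typescript
// Minimal mock of the piece of http.IncomingMessage the helpers use
type Req = { url: string };

// Doc id is the path segment before the query string
const getDocIdFromReq = (req: Req): string => req.url.slice(1).split('?')[0];

// Token is passed as a `token` query parameter on the upgrade request
const getTokenFromReq = (req: Req): string => {
  const [, query] = req.url.split('?');
  return new URLSearchParams(query).get('token') ?? '';
};

const req: Req = { url: '/my-doc?token=jwt-abc123' };
console.log(getDocIdFromReq(req)); // "my-doc"
console.log(getTokenFromReq(req)); // "jwt-abc123"
```

This is also how a client would connect: open the websocket against `wss://host/<docName>?token=<jwt>` so the server can authorize the upgrade.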
I think I have already answered this
Not sure what you are looking for here -
Think I have already answered this, let me know if you have more questions
Think I have already answered this. @junoriosity let me know if you need anything else ... looking forward to that helm-chart 😄 |
Hi @kapv89, many thanks for all your input. That is a lot of information to digest. Perhaps we should continue our discussion on Slack or something as this might be better suited for some back and forth to get the backend as I need it and also to setup a decent helm chart. How does that sound to you? |
Hey @junoriosity … slack sounds good … I am kapv89 on slack too |
@kapv89 I could not find you there, so I added your e-mail as an external invite 😀 |
@junoriosity please continue the conversation here. Slack was too intrusive on my personal space. Just for reference, till now I've answered your questions in this issue, and I've explained several different parts of the architecture you want. And you have added helm charts to this repo, which I need to verify whether they are using sticky-sessions or not
@kapv89 Sticky sessions depend on the reverse-proxy configuration, which is not part of the helm chart; the helm chart brings your service into Kubernetes. Just for reference, we agreed on the idea that I would provide the code for the helm chart (which is there already) and you would provide (by 01/11) the required amendments on the backend to not persist in Postgresql, but rather in Redis itself, plus some API calls to clarify rights of the users and to load the binary into Redis if not available. |
I didn't promise a date ... I gave you a clear description of each piece of your architecture, with the last piece related to S3 persistence left a little vague because of time constraints, and then various events during the festive season took my time, during which time you formulated your own approach towards the distributed backend you need and narrowed down the requirements to what you have stated. Anyways ... It seems like the following pieces are needed for your requirements: An api server with the following endpoints
a crdt-wss with the following features
Will it be enough if I only provide the crdt-wss with an api interface that you can fulfill with your own api-server? |
Probably another api endpoint will be required to fetch the document from s3 |
From my understanding, this is a promise to fulfill it by the end of 01/11. Anyway, the idea is to separate the authorization and persistence from your backend.
That way everyone can provide user-based access to the service via their own API, which your backend calls, and persistence is decoupled from it as well. That suits a microservice architecture quite well. |
Yeah ... that message you shared was after you started acting a bit aggressively after me postponing my slot a few times due to festivities ... I'd call it me making a promise under duress, which I am not good at handling :) Coming to the ask, I will provide the crdt-wss part only, and cover the interaction with backend via unit tests. The work will be in a branch of this repo. Will point you to the file containing the api client which you'll need to satisfy with an api server. The festivities ended yesterday, and I have work to do at my job. Will give 2 hours or so tonight to this, and push the progress to a branch, and drop a comment here. |
I do not see anything aggressive from my side after you postponed things further and further (four or five times, if I recall correctly), and I even narrowed down the efforts to make your life simpler (and your work more generally applicable), and then you promised something. Anyway, let us focus on the code. What we need is to clarify a few more things. |
Have made some progress ... https://github.com/kapv89/yjs-scalable-ws-backend/tree/redis_permissions_s3 ... the wss now communicates with an api-endpoint, which in tests, uses an array for persistence. Redis is still being used but PG is not. These changes are localized to this branch, will probably move them to a separate repo later. As you can see, the work is quite extensive and will take some time |
Hi @kapv89 many thanks for sharing your input with me. It looks very good, but I think we can make it a bit easier: At the current stage we need one API interaction:
The persistence could be done to the corresponding doc_id Redis queue (like here: https://github.com/kapv89/yjs-scalable-ws-backend/blob/main/src/setupWSConnection.ts#L170) without any API call, only with the key value trick as outlined above.
What do you think? |
https://github.com/kapv89/yjs-scalable-ws-backend/tree/redis_permissions_s3 ... this branch has been updated with …

Regarding loading a doc from s3 to redis if it isn't already present there: that is out of the scope of the crdt-wss. It should be handled by the api-server, in its api endpoint …

The logic to save an update to redis will look something like this:

```ts
export const getDocRedisKey = (docId: string) => `doc:${docId}:all_updates`;

export const docRedisLifetimeInS = 3600;

export const saveDocUpdateInRedis = async (docId: string, update: Uint8Array) => {
  const key = getDocRedisKey(docId);
  const len = await redis.llen(key);
  if (len > 100) {
    // compact: read the oldest updates, push the new one, trim, refresh TTL
    const res = await redis.pipeline()
      .lrangeBuffer(key, 0, 100)
      .rpushBuffer(key, Buffer.from(update))
      .ltrim(key, 0, 100)
      .expire(key, docRedisLifetimeInS)
      .exec();
    const [, leftUpdates]: [Error | null, Buffer[]] = res[0];
    // merge the trimmed-off updates into a single combined update
    const doc = new Y.Doc();
    for (const u of leftUpdates) {
      Y.applyUpdate(doc, u);
    }
    const combinedUpdate = Y.encodeStateAsUpdate(doc);
    await redis.lpushBuffer(key, Buffer.from(combinedUpdate));
  } else {
    await redis.pipeline()
      .rpushBuffer(key, Buffer.from(update))
      .expire(key, docRedisLifetimeInS)
      .exec();
  }
}
```
 |
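The compaction idea above can be simulated without Redis or Yjs to see how it keeps the per-document list bounded. In this sketch the "merge" is modeled as simple string concatenation, purely a stand-in for `Y.applyUpdate`/`Y.encodeStateAsUpdate`; the names and the threshold are illustrative:

```typescript
// In-memory stand-in for the per-document Redis list (illustrative only)
const lists = new Map<string, string[]>();

const saveUpdate = (docId: string, update: string, maxLen = 100): void => {
  const list = lists.get(docId) ?? [];
  lists.set(docId, list);
  if (list.length > maxLen) {
    // take the oldest entries, merge them into one, then append the new update
    const oldest = list.splice(0, maxLen + 1);
    list.push(update);
    list.unshift(oldest.join('+')); // stand-in for the CRDT merge step
  } else {
    list.push(update);
  }
};

// 500 updates arrive, but the list never grows past maxLen + 1 entries
for (let i = 0; i < 500; i++) saveUpdate('doc1', `u${i}`);
console.log(lists.get('doc1')!.length); // bounded (≤ 101 here), not 500
```

The point of the real version is the same: each compaction folds a window of old updates into one combined update, so memory in Redis stays roughly constant no matter how long the editing session runs.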
Ah, that looks truly great. 😀 I have a few questions: could you be a bit more specific on what the three API endpoints should be receiving, in terms of format and in terms of reply? So, this would mean
For the two functions
This would at least remove the load from the API endpoints. Nevertheless, this takes us a huge leap forward: now one could build a persistence connector with API endpoints for any storage type and connect it to this websocket backend. 😀 For enforcing a maximum size on the documents, what would have to be done here? |
@kapv89 It could also be, that a user does not come along with a token and might end up with read or no rights. Is that covered here as well? |
No. Would you want such a user to establish a connection to your websocket server? |
@kapv89 In some cases, yes. So it should be two-fold: For users providing a token and users without it (so which are not logged in, but still can see the document) |
There are 2 levels here ... 1. Non logged in users receiving realtime updates as users with write permission change the document and 2. Non logged in users receive the latest persisted snapshot of the document but don't receive realtime updates |
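Level 1 (non-logged-in users receiving realtime updates but not being allowed to write) could be sketched as follows. This is a hypothetical policy, not code from the repo; `connectionMode` and `acceptUpdate` are made-up names:

```typescript
type Conn = { readOnly: boolean };

// Decide the mode of a new connection from an optional token (hypothetical policy):
// no token means the client may read but not write
const connectionMode = (token: string | null): 'read-write' | 'read-only' =>
  token ? 'read-write' : 'read-only';

// In an update handler, updates originating from read-only connections
// would simply be dropped instead of broadcast and persisted
const acceptUpdate = (conn: Conn): boolean => !conn.readOnly;

console.log(connectionMode(null));      // "read-only"
console.log(connectionMode('jwt-abc')); // "read-write"
```

Level 2 would not need a websocket connection at all: the client fetches the latest persisted snapshot from the api-server over plain HTTP.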
Ah, these are already implemented. I imagine it as follows for non-logged-in users: they make a request similar to logged-in users. Depending on the reply (which might differ between documents), they
Could you also get back to me regarding my questions on the API endpoints and document size from above? |
Will probably get back on those sometime early coming week |
Hi @kapv89 could you get back to me on that matter, please? 😀 |
@kapv89 Do you have some input regarding my questions? |
Hey @junoriosity ... Caught up with a lot of work in job. Will try to take out 30 minutes this weekend to answer your questions |
Hey @kapv89 that would be very helpful. Also, could you please tell me whether users with read resp. write rights are treated differently in any way? As far as I can see, there is no difference here. |
what do you mean by "read resp. write"? |
They are treated differently, otherwise how will the tests work … check for usage of https://github.com/kapv89/yjs-scalable-ws-backend/blob/redis_permissions_s3/src/setupWSConnection.ts |
@kapv89 That looks terrific, many thanks for it. Would still be great, if you could give me some input on my other questions later. 😀 |
@kapv89 Hi Kapil, could you perhaps get back to me questions raised above. That would be very helpful. |
@kapv89 Could you please get back to the questions? |
Hi @kapv89
many thanks for your project. I would love to try it out myself. Before that, can you tell me whether the awareness issue you raised here is still a problem?
If not, could you tell me how I could deploy it in k8s? For instance:
It would be great if you could help me with these questions. 😀