From f13ddeb837430fab347d312ce595d5ce4ad317fc Mon Sep 17 00:00:00 2001 From: Palani C Date: Sun, 31 Mar 2024 23:50:02 +1100 Subject: [PATCH] fix channel's payload.data deserialization --- packages/rsocket-core/src/RSocketMachine.js | 14 ++--- .../src/__tests__/RSocketServer-test.js | 63 +++++++++++++++++++ 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/packages/rsocket-core/src/RSocketMachine.js b/packages/rsocket-core/src/RSocketMachine.js index a1dcb52..103ad6e 100644 --- a/packages/rsocket-core/src/RSocketMachine.js +++ b/packages/rsocket-core/src/RSocketMachine.js @@ -784,13 +784,14 @@ class RSocketMachineImpl implements RSocketMachine { } _handleRequestChannel(streamId: number, frame: RequestChannelFrame): void { + const payload = this._deserializePayload(frame); const existingSubscription = this._subscriptions.get(streamId); if (existingSubscription) { //Likely a duplicate REQUEST_CHANNEL frame, ignore per spec return; } - const payloads = new Flowable(subscriber => { + const payloads = new Flowable>(subscriber => { let firstRequest = true; subscriber.onSubscribe({ @@ -823,15 +824,13 @@ class RSocketMachineImpl implements RSocketMachine { //critically, if n is 0 now, that's okay because we eagerly decremented it if (firstRequest && n >= 0) { firstRequest = false; - //release the initial frame we received in frame form due to map operator - subscriber.onNext(frame); + //release the initial payload we received in frame form due to map operator + subscriber.onNext(payload); } }, }); }, MAX_REQUEST_N); - const framesToPayloads = new FlowableProcessor(payloads, frame => - this._deserializePayload(frame), - ); + const framesToPayloads = new FlowableProcessor(payloads); this._receivers.set(streamId, framesToPayloads); this._requestHandler.requestChannel(framesToPayloads).subscribe({ @@ -892,13 +891,14 @@ class RSocketMachineImpl implements RSocketMachine { flags |= FLAGS.COMPLETE; this._subscriptions.delete(streamId); } + let metadata; if (payload.metadata !== undefined && payload.metadata !== null) { // eslint-disable-next-line no-bitwise flags |= FLAGS.METADATA; + metadata = this._serializers.metadata.serialize(payload.metadata); } const data = this._serializers.data.serialize(payload.data); - const metadata = this._serializers.metadata.serialize(payload.metadata); this._connection.sendOne({ data, flags, diff --git a/packages/rsocket-core/src/__tests__/RSocketServer-test.js b/packages/rsocket-core/src/__tests__/RSocketServer-test.js index 4ebaab9..b7b6887 100644 --- a/packages/rsocket-core/src/__tests__/RSocketServer-test.js +++ b/packages/rsocket-core/src/__tests__/RSocketServer-test.js @@ -142,6 +142,69 @@ describe('RSocketServer', () => { }); describe('RequestHandler', () => { + it('deserializes and serializes the channel\'s payload.data', () => { + console.error = jest.fn(); + const transport = genMockTransportServer(); + + const makePayload = (data) => ({ + type: FRAME_TYPES.PAYLOAD, + streamId: 1, + flags: 32, // next bit - invoke onNext + data, + }); + + const server = new RSocketServer({ + getRequestHandler: () => { + return { + requestChannel: (incomingFlowable) => { + // If the payload.data has 'name', reply with custom response + return incomingFlowable.map(payload => { + if (payload?.data?.name) { + return { data: { say: 'hello ' + payload.data.name } }; + } else { + return payload; + } + }); + }, + }; + }, + serializers:JsonSerializers, + transport, + }); + + server.start(); + transport.mock.connect(); + connection.receive.mock.publisher.onNext({ + type: FRAME_TYPES.SETUP, + data: undefined, + dataMimeType: '', + flags: 0, + keepAlive: 42, + lifetime: 2017, + metadata: undefined, + metadataMimeType: '', + resumeToken: null, + streamId: 0, + majorVersion: 1, + minorVersion: 0, + }); + connection.receive.mock.publisher.onNext({ + type: FRAME_TYPES.REQUEST_CHANNEL, + flags: 0, + requestN: 100, + streamId: 1, + // data along with first REQUEST_CHANNEL frame + data: JSON.stringify({ name: 'Alex' }), + }); + // data as separate PAYLOAD frame + connection.receive.mock.publisher.onNext(makePayload(JSON.stringify({ name: 'Bob' }))); + jest.runOnlyPendingTimers(); + + expect(connection.sendOne.mock.calls.length).toBe(3); + expect(connection.sendOne.mock.calls[1][0]).toEqual(makePayload(JSON.stringify({ say: 'hello Alex' }))); + expect(connection.sendOne.mock.calls[2][0]).toEqual(makePayload(JSON.stringify({ say: 'hello Bob' }))); + }); + it('sends error if getRequestHandler throws', () => { const transport = genMockTransportServer(); const server = new RSocketServer({