Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix channel's payload.data deserialization in RSocketMachine #277

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions packages/rsocket-core/src/RSocketMachine.js
Original file line number Diff line number Diff line change
Expand Up @@ -784,13 +784,14 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
}

_handleRequestChannel(streamId: number, frame: RequestChannelFrame): void {
const payload = this._deserializePayload(frame);
Copy link
Author

@palamccc palamccc Mar 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In all _handleXXXX methods, the frame is deserialized first. So in this method also, the payload should be deserialized first, before using it.

const existingSubscription = this._subscriptions.get(streamId);
if (existingSubscription) {
//Likely a duplicate REQUEST_CHANNEL frame, ignore per spec
return;
}

const payloads = new Flowable(subscriber => {
const payloads = new Flowable<Payload<D, M>>(subscriber => {
Copy link
Author

@palamccc palamccc Mar 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this bug (bug is explained in another code comment) was introduced mainly due to the missing type of this Flowable. so I added the type now.

let firstRequest = true;

subscriber.onSubscribe({
Expand Down Expand Up @@ -823,15 +824,13 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
//critically, if n is 0 now, that's okay because we eagerly decremented it
if (firstRequest && n >= 0) {
firstRequest = false;
//release the initial frame we received in frame form due to map operator
subscriber.onNext(frame);
//release the initial payload we received in frame form due to map operator
subscriber.onNext(payload);
}
},
});
}, MAX_REQUEST_N);
const framesToPayloads = new FlowableProcessor(payloads, frame =>
this._deserializePayload(frame),
);
const framesToPayloads = new FlowableProcessor(payloads);
this._receivers.set(streamId, framesToPayloads);
Copy link
Author

@palamccc palamccc Mar 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug:
this._receivers is of type Map<number, ISubject<Payload<D, M>>>.
The subject is of type Payload<D, M>. i.e. deserialized payload. The subscribers of this subject can use the payload directly, its already deserliazed. The bug is, in the subscriber the payload is deserialized again and it's causing deserialization errors.


this._requestHandler.requestChannel(framesToPayloads).subscribe({
Expand Down Expand Up @@ -892,13 +891,14 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
flags |= FLAGS.COMPLETE;
this._subscriptions.delete(streamId);
}
let metadata;
if (payload.metadata !== undefined &&
payload.metadata !== null) {
// eslint-disable-next-line no-bitwise
flags |= FLAGS.METADATA;
metadata = this._serializers.metadata.serialize(payload.metadata);
}
const data = this._serializers.data.serialize(payload.data);
const metadata = this._serializers.metadata.serialize(payload.metadata);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor bug fix: don't try to serialize metadata when metadata is not there. serialize it only when it exists.

this._connection.sendOne({
data,
flags,
Expand Down
63 changes: 63 additions & 0 deletions packages/rsocket-core/src/__tests__/RSocketServer-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,69 @@ describe('RSocketServer', () => {
});

describe('RequestHandler', () => {
it('deserializes and serializes the channel\'s payload.data', () => {
console.error = jest.fn();
const transport = genMockTransportServer();

const makePayload = (data) => ({
type: FRAME_TYPES.PAYLOAD,
streamId: 1,
flags: 32, // next bit - invoke onNext
data,
});

const server = new RSocketServer({
getRequestHandler: () => {
return {
requestChannel: (incomingFlowable) => {
// If the payload.data has 'name', reply with custom response
return incomingFlowable.map(payload => {
if (payload && payload.data && payload.data.name) {
return { data: { say: 'hello ' + payload.data.name } };
} else {
return payload;
}
});
},
};
},
serializers:JsonSerializers,
transport,
});

server.start();
transport.mock.connect();
connection.receive.mock.publisher.onNext({
type: FRAME_TYPES.SETUP,
data: undefined,
dataMimeType: '<dataMimeType>',
flags: 0,
keepAlive: 42,
lifetime: 2017,
metadata: undefined,
metadataMimeType: '<metadataMimeType>',
resumeToken: null,
streamId: 0,
majorVersion: 1,
minorVersion: 0,
});
connection.receive.mock.publisher.onNext({
type: FRAME_TYPES.REQUEST_CHANNEL,
flags: 0,
requestN: 100,
streamId: 1,
// data along with first REQUEST_CHANNEL frame
data: JSON.stringify({ name: 'Alex' }),
});
// data as separate PAYLOAD frame
connection.receive.mock.publisher.onNext(makePayload(JSON.stringify({ name: 'Bob' })));
jest.runOnlyPendingTimers();

expect(connection.sendOne.mock.calls.length).toBe(3);
expect(connection.sendOne.mock.calls[1][0]).toEqual(makePayload(JSON.stringify({ say: 'hello Alex' })));
expect(connection.sendOne.mock.calls[2][0]).toEqual(makePayload(JSON.stringify({ say: 'hello Bob' })));
});

it('sends error if getRequestHandler throws', () => {
const transport = genMockTransportServer();
const server = new RSocketServer({
Expand Down
Loading