From 8ad92a08cd74bc75ec89435a0d55993d75f0859e Mon Sep 17 00:00:00 2001 From: assafkm <65414089+assafkm@users.noreply.github.com> Date: Tue, 6 Apr 2021 19:33:31 +0300 Subject: [PATCH] Adding Http transport (#13) Co-authored-by: Mohaban, Assaf --- package.json | 9 ++-- src/index.ts | 1 + src/lib/rxrpc-http-attributes.ts | 6 +++ src/lib/rxrpc-http-transport.spec.ts | 81 ++++++++++++++++++++++++++++ src/lib/rxrpc-http-transport.ts | 72 +++++++++++++++++++++++++ yarn.lock | 12 +++++ 6 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 src/lib/rxrpc-http-attributes.ts create mode 100644 src/lib/rxrpc-http-transport.spec.ts create mode 100644 src/lib/rxrpc-http-transport.ts diff --git a/package.json b/package.json index ad76d40..49410e9 100644 --- a/package.json +++ b/package.json @@ -34,9 +34,9 @@ "devDependencies": { "@types/jest": "^24.0.15", "jest": "^24.8.0", + "rxjs": "^6.2.2", "ts-jest": "^24.0.2", - "typescript": "^3.4.5", - "rxjs": "^6.2.2" + "typescript": "^3.4.5" }, "peerDependencies": { "rxjs": "^6.2.2" @@ -47,5 +47,8 @@ "src/lib/**/*", "*.json", "LICENSE" - ] + ], + "dependencies": { + "axios": "^0.21.1" + } } diff --git a/src/index.ts b/src/index.ts index a26e353..f6e1b00 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,3 +4,4 @@ export * from './lib/rxrpc-transport'; export * from './lib/rxrpc-websocket-transport'; export * from './lib/rxrpc-reconnectable-transport'; export * from './lib/rxrpc-types'; +export * from './lib/rxrpc-http-transport'; diff --git a/src/lib/rxrpc-http-attributes.ts b/src/lib/rxrpc-http-attributes.ts new file mode 100644 index 0000000..13d9018 --- /dev/null +++ b/src/lib/rxrpc-http-attributes.ts @@ -0,0 +1,6 @@ + +export class HttpAttributes { + static ClientIdAttribute = "X-RPC-CLIENT-ID"; + static ClientPollingPeriod = 500; + static ClientPollingRetryCount = 10; +} diff --git a/src/lib/rxrpc-http-transport.spec.ts b/src/lib/rxrpc-http-transport.spec.ts new file mode 100644 index 0000000..06231f5 --- /dev/null +++ b/src/lib/rxrpc-http-transport.spec.ts @@ -0,0 +1,81 @@ +import {RxRpcHttpConnection, RxRpcHttpTransport} from './rxrpc-http-transport'; +import axios from 'axios'; + +jest.mock('axios'); +const mockedAxios = axios as jest.Mocked + +function delay(ms: number){ + return new Promise( resolve => setTimeout(resolve, ms) ); +} + +describe('RxRpc Http Transport test suite', function () { + let transport: RxRpcHttpTransport; + let clientId: string; + let incomingMessages: any[]; + let resp: {} + + beforeEach(() => { + transport = new RxRpcHttpTransport("http://funnyName/"); + clientId = "12345678"; + incomingMessages = []; + resp = { + headers: {"x-rpc-client-id": clientId} + }; + }); + + it('Connect', async () => { + mockedAxios.post.mockImplementation(() => Promise.resolve(resp)); + + transport.connect().subscribe(connection => incomingMessages.push(connection['clientId'])) + await delay(1000); + expect(incomingMessages[0]).toEqual(clientId) + }) + + it('Poll with data as a string', async () => { + const data1 = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #0\",\"error\":null}}" + const data2 = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #1\",\"error\":null}}" + resp['data'] = `${data1}\n${data2}` + mockedAxios.post.mockImplementation(() => Promise.resolve(resp)); + + transport.connect().subscribe(connection => connection.poll().subscribe(msg => { + incomingMessages.push(msg); + })) + await delay(1000); + expect(incomingMessages.length).toEqual(2); + expect(incomingMessages[0]).toEqual(JSON.parse(data1)); + expect(incomingMessages[1]).toEqual(JSON.parse(data2)); + }) + + it('Poll with data as an object', async () => { + const data = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #0\",\"error\":null}}" + resp['data'] = JSON.parse(data) + mockedAxios.post.mockImplementation(() => Promise.resolve(resp)); + + transport.connect().subscribe(connection => connection.poll().subscribe(msg => { + incomingMessages.push(msg); + })) + await delay(1000); + expect(incomingMessages.length).toEqual(1); + expect(incomingMessages[0]).toEqual(JSON.parse(data)); + }) + + it('Close', async () => { + mockedAxios.post.mockImplementation(() => Promise.resolve(resp)); + + transport.connect().subscribe(connection => incomingMessages.push(connection)) + await delay(1000); + const connection = incomingMessages[0] as RxRpcHttpConnection; + connection.close() + expect(connection['pollingSubscription'].closed).toEqual(true) + }) + + it('Error', async () => { + mockedAxios.post.mockImplementation(() => Promise.resolve(resp)); + + transport.connect().subscribe(connection => incomingMessages.push(connection)) + await delay(1000); + const connection = incomingMessages[0] as RxRpcHttpConnection; + connection.error(null) + expect(connection['pollingSubscription'].closed).toEqual(true) + }) +}) diff --git a/src/lib/rxrpc-http-transport.ts b/src/lib/rxrpc-http-transport.ts new file mode 100644 index 0000000..388ef21 --- /dev/null +++ b/src/lib/rxrpc-http-transport.ts @@ -0,0 +1,72 @@ +import {RxRpcConnection, RxRpcTransport} from './rxrpc-transport'; +import {from, interval, observable, Observable, of, Subject, Subscription} from 'rxjs'; +import {mergeMap, map, retry, tap, filter} from "rxjs/operators"; +import {HttpAttributes} from "./rxrpc-http-attributes"; +import axios, {AxiosResponse} from 'axios'; +import {fromPromise} from "rxjs/internal-compatibility"; +import {fromArray} from "rxjs/internal/observable/fromArray"; + + +export class RxRpcHttpConnection implements RxRpcConnection { + readonly messages: Observable; + private pollingSubscription: Subscription; + private readonly incoming = new Subject(); + + constructor(private readonly uri: string, private readonly clientId: string) { + this.messages = this.incoming; + this.pollingSubscription = interval(HttpAttributes.ClientPollingPeriod) + .pipe( + mergeMap(() => this.poll()), + retry(HttpAttributes.ClientPollingRetryCount)) + .subscribe( + () => {}, + err => this.incoming.error(err), + () => this.incoming.complete()); + } + + close() { + this.pollingSubscription.unsubscribe(); + } + + error(error: any) { + this.close(); + } + + send(msg: any) { + this.post('message', msg).subscribe(); + } + + poll(): Observable { + return this.post('polling') + .pipe( + map(resp => resp.data), + filter(data => data !== ""), + mergeMap(data => { + if(typeof data === 'string') { + return fromArray(data.split("\n").map(s => JSON.parse(s))); + } + return of(data); + }), + tap(obj => this.incoming.next(obj))); + } + + post(path: string, msg?: any): Observable> { + const headers = {}; + headers[HttpAttributes.ClientIdAttribute] = this.clientId; + return fromPromise(axios.post(`${this.uri}/${path}`, msg, {headers: headers})) + } +} + +export class RxRpcHttpTransport implements RxRpcTransport { + + constructor(private readonly uri: string) {} + + connect(): Observable { + return fromPromise(axios.post(`${this.uri}/connect`)) + .pipe( + map( res => { + const clientId = res.headers[HttpAttributes.ClientIdAttribute.toLowerCase()]; + return new RxRpcHttpConnection(this.uri, clientId); + })); + } +} diff --git a/yarn.lock b/yarn.lock index 220c7c9..dad4f2c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -523,6 +523,13 @@ aws4@^1.8.0: resolved "https://registry.yarnpkg.com/aws4/-/aws4-1.8.0.tgz#f0e003d9ca9e7f59c7a508945d7b2ef9a04a542f" integrity sha512-ReZxvNHIOv88FlT7rxcXIIC0fPt4KZqZbOlivyWtXLt8ESx84zd3kMC6iK5jVeS2qt+g7ftS7ye4fi06X5rtRQ== +axios@^0.21.1: + version "0.21.1" + resolved "https://registry.yarnpkg.com/axios/-/axios-0.21.1.tgz#22563481962f4d6bde9a76d516ef0e5d3c09b2b8" + integrity sha512-dKQiRHxGD9PPRIUNIWvZhPTPpl1rf/OxTYKsqKUDjBwYylTvV7SjSHJb9ratfyzM6wCdLCOYLzs73qpg5c4iGA== + dependencies: + follow-redirects "^1.10.0" + babel-jest@^24.8.0: version "24.8.0" resolved "https://registry.yarnpkg.com/babel-jest/-/babel-jest-24.8.0.tgz#5c15ff2b28e20b0f45df43fe6b7f2aae93dba589" @@ -1141,6 +1148,11 @@ find-up@^3.0.0: dependencies: locate-path "^3.0.0" +follow-redirects@^1.10.0: + version "1.13.3" + resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.13.3.tgz#e5598ad50174c1bc4e872301e82ac2cd97f90267" + integrity sha512-DUgl6+HDzB0iEptNQEXLx/KhTmDb8tZUHSeLqpnjpknR70H0nC2t9N73BK6fN4hOvJ84pKlIQVQ4k5FFlBedKA== + for-in@^1.0.2: version "1.0.2" resolved "https://registry.yarnpkg.com/for-in/-/for-in-1.0.2.tgz#81068d295a8142ec0ac726c6e2200c30fb6d5e80"