Skip to content

Commit

Permalink
Adding Http transport (#13)
Browse files Browse the repository at this point in the history
Co-authored-by: Mohaban, Assaf <[email protected]>
  • Loading branch information
assafkm and Mohaban, Assaf authored Apr 6, 2021
1 parent 989edb6 commit 8ad92a0
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 3 deletions.
9 changes: 6 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
"devDependencies": {
"@types/jest": "^24.0.15",
"jest": "^24.8.0",
"rxjs": "^6.2.2",
"ts-jest": "^24.0.2",
"typescript": "^3.4.5",
"rxjs": "^6.2.2"
"typescript": "^3.4.5"
},
"peerDependencies": {
"rxjs": "^6.2.2"
Expand All @@ -47,5 +47,8 @@
"src/lib/**/*",
"*.json",
"LICENSE"
]
],
"dependencies": {
"axios": "^0.21.1"
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export * from './lib/rxrpc-transport';
export * from './lib/rxrpc-websocket-transport';
export * from './lib/rxrpc-reconnectable-transport';
export * from './lib/rxrpc-types';
export * from './lib/rxrpc-http-transport';
6 changes: 6 additions & 0 deletions src/lib/rxrpc-http-attributes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

export class HttpAttributes {
static ClientIdAttribute = "X-RPC-CLIENT-ID";
static ClientPollingPeriod = 500;
static ClientPollingRetryCount = 10;
}
81 changes: 81 additions & 0 deletions src/lib/rxrpc-http-transport.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import {RxRpcHttpConnection, RxRpcHttpTransport} from './rxrpc-http-transport';
import axios from 'axios';

jest.mock('axios');
const mockedAxios = axios as jest.Mocked<typeof axios>

function delay(ms: number){
return new Promise( resolve => setTimeout(resolve, ms) );
}

describe('RxRpc Http Transport test suite', function () {
let transport: RxRpcHttpTransport;
let clientId: string;
let incomingMessages: any[];
let resp: {}

beforeEach(() => {
transport = new RxRpcHttpTransport("http://funnyName/");
clientId = "12345678";
incomingMessages = [];
resp = {
headers: {"x-rpc-client-id": clientId}
};
});

it('Connect', async () => {
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));

transport.connect().subscribe(connection => incomingMessages.push(connection['clientId']))
await delay(1000);
expect(incomingMessages[0]).toEqual(clientId)
})

it('Poll with data as a string', async () => {
const data1 = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #0\",\"error\":null}}"
const data2 = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #1\",\"error\":null}}"
resp['data'] = `${data1}\n${data2}`
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));

transport.connect().subscribe(connection => connection.poll().subscribe(msg => {
incomingMessages.push(msg);
}))
await delay(1000);
expect(incomingMessages.length).toEqual(2);
expect(incomingMessages[0]).toEqual(JSON.parse(data1));
expect(incomingMessages[1]).toEqual(JSON.parse(data2));
})

it('Poll with data as an object', async () => {
const data = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #0\",\"error\":null}}"
resp['data'] = JSON.parse(data)
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));

transport.connect().subscribe(connection => connection.poll().subscribe(msg => {
incomingMessages.push(msg);
}))
await delay(1000);
expect(incomingMessages.length).toEqual(1);
expect(incomingMessages[0]).toEqual(JSON.parse(data));
})

it('Close', async () => {
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));

transport.connect().subscribe(connection => incomingMessages.push(connection))
await delay(1000);
const connection = incomingMessages[0] as RxRpcHttpConnection;
connection.close()
expect(connection['pollingSubscription'].closed).toEqual(true)
})

it('Error', async () => {
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));

transport.connect().subscribe(connection => incomingMessages.push(connection))
await delay(1000);
const connection = incomingMessages[0] as RxRpcHttpConnection;
connection.error(null)
expect(connection['pollingSubscription'].closed).toEqual(true)
})
})
72 changes: 72 additions & 0 deletions src/lib/rxrpc-http-transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {RxRpcConnection, RxRpcTransport} from './rxrpc-transport';
import {from, interval, observable, Observable, of, Subject, Subscription} from 'rxjs';
import {mergeMap, map, retry, tap, filter} from "rxjs/operators";
import {HttpAttributes} from "./rxrpc-http-attributes";
import axios, {AxiosResponse} from 'axios';
import {fromPromise} from "rxjs/internal-compatibility";
import {fromArray} from "rxjs/internal/observable/fromArray";


export class RxRpcHttpConnection implements RxRpcConnection {
readonly messages: Observable<any>;
private pollingSubscription: Subscription;
private readonly incoming = new Subject<any>();

constructor(private readonly uri: string, private readonly clientId: string) {
this.messages = this.incoming;
this.pollingSubscription = interval(HttpAttributes.ClientPollingPeriod)
.pipe(
mergeMap(() => this.poll()),
retry(HttpAttributes.ClientPollingRetryCount))
.subscribe(
() => {},
err => this.incoming.error(err),
() => this.incoming.complete());
}

close() {
this.pollingSubscription.unsubscribe();
}

error(error: any) {
this.close();
}

send(msg: any) {
this.post('message', msg).subscribe();
}

poll(): Observable<any> {
return this.post('polling')
.pipe(
map(resp => resp.data),
filter(data => data !== ""),
mergeMap(data => {
if(typeof data === 'string') {
return fromArray(data.split("\n").map(s => JSON.parse(s)));
}
return of(data);
}),
tap(obj => this.incoming.next(obj)));
}

post(path: string, msg?: any): Observable<AxiosResponse<string>> {
const headers = {};
headers[HttpAttributes.ClientIdAttribute] = this.clientId;
return fromPromise(axios.post<string>(`${this.uri}/${path}`, msg, {headers: headers}))
}
}

export class RxRpcHttpTransport implements RxRpcTransport {

constructor(private readonly uri: string) {}

connect(): Observable<RxRpcHttpConnection> {
return fromPromise(axios.post<string>(`${this.uri}/connect`))
.pipe(
map( res => {
const clientId = res.headers[HttpAttributes.ClientIdAttribute.toLowerCase()];
return new RxRpcHttpConnection(this.uri, clientId);
}));
}
}
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,13 @@ aws4@^1.8.0:
resolved "https://registry.yarnpkg.com/aws4/-/aws4-1.8.0.tgz#f0e003d9ca9e7f59c7a508945d7b2ef9a04a542f"
integrity sha512-ReZxvNHIOv88FlT7rxcXIIC0fPt4KZqZbOlivyWtXLt8ESx84zd3kMC6iK5jVeS2qt+g7ftS7ye4fi06X5rtRQ==

axios@^0.21.1:
version "0.21.1"
resolved "https://registry.yarnpkg.com/axios/-/axios-0.21.1.tgz#22563481962f4d6bde9a76d516ef0e5d3c09b2b8"
integrity sha512-dKQiRHxGD9PPRIUNIWvZhPTPpl1rf/OxTYKsqKUDjBwYylTvV7SjSHJb9ratfyzM6wCdLCOYLzs73qpg5c4iGA==
dependencies:
follow-redirects "^1.10.0"

babel-jest@^24.8.0:
version "24.8.0"
resolved "https://registry.yarnpkg.com/babel-jest/-/babel-jest-24.8.0.tgz#5c15ff2b28e20b0f45df43fe6b7f2aae93dba589"
Expand Down Expand Up @@ -1141,6 +1148,11 @@ find-up@^3.0.0:
dependencies:
locate-path "^3.0.0"

follow-redirects@^1.10.0:
version "1.13.3"
resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.13.3.tgz#e5598ad50174c1bc4e872301e82ac2cd97f90267"
integrity sha512-DUgl6+HDzB0iEptNQEXLx/KhTmDb8tZUHSeLqpnjpknR70H0nC2t9N73BK6fN4hOvJ84pKlIQVQ4k5FFlBedKA==

for-in@^1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/for-in/-/for-in-1.0.2.tgz#81068d295a8142ec0ac726c6e2200c30fb6d5e80"
Expand Down

0 comments on commit 8ad92a0

Please sign in to comment.