From c7d50d0ec77a0f3b3ddb4c0e55c0f9134469aa16 Mon Sep 17 00:00:00 2001 From: souvik Date: Fri, 22 Sep 2023 14:57:22 +0530 Subject: [PATCH] feat: updating adapters --- src/adapters/http/client.ts | 8 ++++---- src/adapters/http/server.ts | 6 +++--- src/adapters/kafka/index.ts | 6 +++--- src/adapters/mqtt/index.ts | 36 +++++++++++++++++++----------------- src/adapters/ws/client.ts | 11 +++++------ src/adapters/ws/server.ts | 6 +++--- 6 files changed, 37 insertions(+), 36 deletions(-) diff --git a/src/adapters/http/client.ts b/src/adapters/http/client.ts index efb53b4c2..23696796f 100644 --- a/src/adapters/http/client.ts +++ b/src/adapters/http/client.ts @@ -24,13 +24,13 @@ class HttpClientAdapter extends Adapter { const authConfig = await clientAuthConfig(this.serverName) const serverUrl = this.serverUrlExpanded for (const channelName of this.channelNames) { - const channelInfo = this.parsedAsyncAPI.channel(channelName) - const httpChannelBinding = channelInfo.binding('http') - const channelServers = channelInfo.servers() + const channelInfo = this.parsedAsyncAPI.channels().get(channelName) + const httpChannelBinding = channelInfo.bindings().get('http') + const channelServers = channelInfo.servers().all().map(e => e.id()) const isChannelServers = !channelServers.length || channelServers.includes(message.serverName) if (httpChannelBinding && isChannelServers) { - const method = httpChannelBinding.method + const method = httpChannelBinding.json().method let url = `${serverUrl}/${channelName}` const gleeAuth = new GleeAuth( this.AsyncAPIServer, diff --git a/src/adapters/http/server.ts b/src/adapters/http/server.ts index be9c27975..f6e4a7184 100644 --- a/src/adapters/http/server.ts +++ b/src/adapters/http/server.ts @@ -93,7 +93,7 @@ class HttpAdapter extends Adapter { this.httpResponses.set(this.serverName, res) let { pathname } = new URL(req.url, serverUrl) pathname = pathname.startsWith('/') ? pathname.substring(1) : pathname - if (!this.parsedAsyncAPI.channel(pathname)) { + if (!this.parsedAsyncAPI.channels().get(pathname)) { res.end('HTTP/1.1 404 Not Found1\r\n\r\n') const err = new Error( `A client attempted to connect to channel ${pathname} but this channel is not defined in your AsyncAPI file. here` @@ -105,8 +105,8 @@ class HttpAdapter extends Adapter { const searchParams = { query } const payload = body const httpChannelBinding = this.parsedAsyncAPI - .channel(pathname) - .binding('http') + .channels().get(pathname) + .bindings().get('http') if (httpChannelBinding) { this._checkHttpBinding( req, diff --git a/src/adapters/kafka/index.ts b/src/adapters/kafka/index.ts index 27fd8f1d4..836e76925 100644 --- a/src/adapters/kafka/index.ts +++ b/src/adapters/kafka/index.ts @@ -15,10 +15,10 @@ class KafkaAdapter extends Adapter { 'kafka' ) const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth) - const securityRequirements = (this.AsyncAPIServer.security() || []).map( + const securityRequirements = this.AsyncAPIServer.security().map( (sec) => { - const secName = Object.keys(sec.json())[0] - return this.parsedAsyncAPI.components().securityScheme(secName) + const secName = Object.keys(sec.values())[0] + return this.parsedAsyncAPI.components().securitySchemes().get(secName) } ) const userAndPasswordSecurityReq = securityRequirements.find( diff --git a/src/adapters/mqtt/index.ts b/src/adapters/mqtt/index.ts index 379288646..fe1ec54d6 100644 --- a/src/adapters/mqtt/index.ts +++ b/src/adapters/mqtt/index.ts @@ -2,7 +2,7 @@ import mqtt, { IPublishPacket, MqttClient, QoS } from 'mqtt' import Adapter from '../../lib/adapter.js' import GleeMessage from '../../lib/message.js' import { MqttAuthConfig, MqttAdapterConfig } from '../../lib/index.js' -import { SecurityScheme } from '@asyncapi/parser' +import { SecuritySchemesInterface as SecurityScheme } from '@asyncapi/parser' import { logLineWithIcon } from '../../lib/logger.js' interface IMQTTHeaders { @@ -42,22 +42,24 @@ class MqttAdapter extends Adapter { } private getSecurityReqs() { - const securityRequirements = (this.AsyncAPIServer.security() || []).map( - (sec) => { - const secName = Object.keys(sec.json())[0] - return this.parsedAsyncAPI.components().securityScheme(secName) + + const parsedSecurityScehemes = this.parsedAsyncAPI.components().securitySchemes().all() + + let userAndPasswordSecurityReq + let X509SecurityReq + + for (const security of parsedSecurityScehemes) { + if (security.type() === 'userPassword') { + userAndPasswordSecurityReq = security } - ) - const userAndPasswordSecurityReq = securityRequirements.find( - (sec) => sec.type() === 'userPassword' - ) - const X509SecurityReq = securityRequirements.find( - (sec) => sec.type() === 'X509' - ) + if (security.type() === 'x509') { + X509SecurityReq = security + } + } return { userAndPasswordSecurityReq, - X509SecurityReq, + X509SecurityReq } } @@ -127,7 +129,7 @@ class MqttAdapter extends Adapter { private subscribe(channels: string[]) { channels.forEach((channel) => { - const operation = this.parsedAsyncAPI.channel(channel).publish() + const operation = this.parsedAsyncAPI.channels().get(channel).json().subscribe() const binding = operation.binding('mqtt') this.client.subscribe(channel, { qos: binding?.qos ? binding.qos : 0, @@ -154,8 +156,8 @@ class MqttAdapter extends Adapter { ) const auth: MqttAuthConfig = await this.getAuthConfig(mqttOptions?.auth) const subscribedChannels = this.getSubscribedChannels() - const mqttServerBinding = this.AsyncAPIServer.binding('mqtt') - const mqtt5ServerBinding = this.AsyncAPIServer.binding('mqtt5') + const mqttServerBinding = this.AsyncAPIServer.bindings().get('mqtt') + const mqtt5ServerBinding = this.AsyncAPIServer.bindings().get('mqtt5') const { userAndPasswordSecurityReq, X509SecurityReq } = this.getSecurityReqs() @@ -205,7 +207,7 @@ class MqttAdapter extends Adapter { _send(message: GleeMessage): Promise { return new Promise((resolve, reject) => { - const operation = this.parsedAsyncAPI.channel(message.channel).subscribe() + const operation = this.parsedAsyncAPI.channels().get(message.channel).json().subscribe() const binding = operation ? operation.binding('mqtt') : undefined this.client.publish( message.channel, diff --git a/src/adapters/ws/client.ts b/src/adapters/ws/client.ts index a6949a5fd..ef5553f16 100644 --- a/src/adapters/ws/client.ts +++ b/src/adapters/ws/client.ts @@ -47,7 +47,7 @@ class WsClientAdapter extends Adapter { this.clients.push({ channel, client: new ws(url, { headers }), - binding: this.parsedAsyncAPI.channel(channel).binding('ws'), + binding: this.parsedAsyncAPI.channels().get(channel).bindings().get('ws'), }) } @@ -77,13 +77,12 @@ class WsClientAdapter extends Adapter { private getWsChannels() { const channels = [] for (const channel of this.channelNames) { - if (this.parsedAsyncAPI.channel(channel).hasBinding('ws')) { - if (this.parsedAsyncAPI.channel(channel).hasServers()) { + if (this.parsedAsyncAPI.channels().get(channel).bindings().get('ws')) { + if (this.parsedAsyncAPI.channels().get(channel).servers().length < 0) { // NOSONAR if ( this.parsedAsyncAPI - .channel(channel) - .servers() - .includes(this.serverName) + .channels().get(channel) + .servers().get(this.serverName) ) { channels.push(channel) } diff --git a/src/adapters/ws/server.ts b/src/adapters/ws/server.ts index ea46db276..47594b9ef 100644 --- a/src/adapters/ws/server.ts +++ b/src/adapters/ws/server.ts @@ -99,7 +99,7 @@ class WebSocketsAdapter extends Adapter { pathname = pathname.substring(1) } - if (!this.parsedAsyncAPI.channel(pathname)) { + if (!this.parsedAsyncAPI.channels().get(pathname)) { this.emitPathnameError(socket, pathname) } @@ -232,8 +232,8 @@ class WebSocketsAdapter extends Adapter { ) const wsChannelBinding = this.parsedAsyncAPI - .channel(pathname) - .binding('ws') + .channels().get(pathname) + .bindings().get('ws') if (wsChannelBinding) { const correctBindings = await this.checkBindings(socket, {