Skip to content

Commit

Permalink
feat: updating adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
Souvikns committed Sep 22, 2023
1 parent c6d4136 commit c7d50d0
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 36 deletions.
8 changes: 4 additions & 4 deletions src/adapters/http/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ class HttpClientAdapter extends Adapter {
const authConfig = await clientAuthConfig(this.serverName)
const serverUrl = this.serverUrlExpanded
for (const channelName of this.channelNames) {
const channelInfo = this.parsedAsyncAPI.channel(channelName)
const httpChannelBinding = channelInfo.binding('http')
const channelServers = channelInfo.servers()
const channelInfo = this.parsedAsyncAPI.channels().get(channelName)
const httpChannelBinding = channelInfo.bindings().get('http')
const channelServers = channelInfo.servers().all().map(e => e.id())
const isChannelServers =
!channelServers.length || channelServers.includes(message.serverName)
if (httpChannelBinding && isChannelServers) {
const method = httpChannelBinding.method
const method = httpChannelBinding.json().method
let url = `${serverUrl}/${channelName}`
const gleeAuth = new GleeAuth(
this.AsyncAPIServer,
Expand Down
6 changes: 3 additions & 3 deletions src/adapters/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class HttpAdapter extends Adapter {
this.httpResponses.set(this.serverName, res)
let { pathname } = new URL(req.url, serverUrl)
pathname = pathname.startsWith('/') ? pathname.substring(1) : pathname
if (!this.parsedAsyncAPI.channel(pathname)) {
if (!this.parsedAsyncAPI.channels().get(pathname)) {
res.end('HTTP/1.1 404 Not Found1\r\n\r\n')
const err = new Error(
`A client attempted to connect to channel ${pathname} but this channel is not defined in your AsyncAPI file. here`
Expand All @@ -105,8 +105,8 @@ class HttpAdapter extends Adapter {
const searchParams = { query }
const payload = body
const httpChannelBinding = this.parsedAsyncAPI
.channel(pathname)
.binding('http')
.channels().get(pathname)
.bindings().get('http')
if (httpChannelBinding) {
this._checkHttpBinding(
req,
Expand Down
6 changes: 3 additions & 3 deletions src/adapters/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class KafkaAdapter extends Adapter {
'kafka'
)
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions?.auth)
const securityRequirements = (this.AsyncAPIServer.security() || []).map(
const securityRequirements = this.AsyncAPIServer.security().map(
(sec) => {
const secName = Object.keys(sec.json())[0]
return this.parsedAsyncAPI.components().securityScheme(secName)
const secName = Object.keys(sec.values())[0]
return this.parsedAsyncAPI.components().securitySchemes().get(secName)
}
)
const userAndPasswordSecurityReq = securityRequirements.find(
Expand Down
36 changes: 19 additions & 17 deletions src/adapters/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import mqtt, { IPublishPacket, MqttClient, QoS } from 'mqtt'
import Adapter from '../../lib/adapter.js'
import GleeMessage from '../../lib/message.js'
import { MqttAuthConfig, MqttAdapterConfig } from '../../lib/index.js'
import { SecurityScheme } from '@asyncapi/parser'
import { SecuritySchemesInterface as SecurityScheme } from '@asyncapi/parser'
import { logLineWithIcon } from '../../lib/logger.js'

interface IMQTTHeaders {
Expand Down Expand Up @@ -42,22 +42,24 @@ class MqttAdapter extends Adapter {
}

private getSecurityReqs() {
const securityRequirements = (this.AsyncAPIServer.security() || []).map(
(sec) => {
const secName = Object.keys(sec.json())[0]
return this.parsedAsyncAPI.components().securityScheme(secName)

const parsedSecurityScehemes = this.parsedAsyncAPI.components().securitySchemes().all()

let userAndPasswordSecurityReq
let X509SecurityReq

for (const security of parsedSecurityScehemes) {
if (security.type() === 'userPassword') {
userAndPasswordSecurityReq = security
}
)
const userAndPasswordSecurityReq = securityRequirements.find(
(sec) => sec.type() === 'userPassword'
)
const X509SecurityReq = securityRequirements.find(
(sec) => sec.type() === 'X509'
)
if (security.type() === 'x509') {
X509SecurityReq = security
}
}

return {
userAndPasswordSecurityReq,
X509SecurityReq,
X509SecurityReq
}
}

Expand Down Expand Up @@ -127,7 +129,7 @@ class MqttAdapter extends Adapter {

private subscribe(channels: string[]) {
channels.forEach((channel) => {
const operation = this.parsedAsyncAPI.channel(channel).publish()
const operation = this.parsedAsyncAPI.channels().get(channel).json().subscribe()
const binding = operation.binding('mqtt')
this.client.subscribe(channel, {
qos: binding?.qos ? binding.qos : 0,
Expand All @@ -154,8 +156,8 @@ class MqttAdapter extends Adapter {
)
const auth: MqttAuthConfig = await this.getAuthConfig(mqttOptions?.auth)
const subscribedChannels = this.getSubscribedChannels()
const mqttServerBinding = this.AsyncAPIServer.binding('mqtt')
const mqtt5ServerBinding = this.AsyncAPIServer.binding('mqtt5')
const mqttServerBinding = this.AsyncAPIServer.bindings().get('mqtt')
const mqtt5ServerBinding = this.AsyncAPIServer.bindings().get('mqtt5')

const { userAndPasswordSecurityReq, X509SecurityReq } =
this.getSecurityReqs()
Expand Down Expand Up @@ -205,7 +207,7 @@ class MqttAdapter extends Adapter {

_send(message: GleeMessage): Promise<void> {
return new Promise((resolve, reject) => {
const operation = this.parsedAsyncAPI.channel(message.channel).subscribe()
const operation = this.parsedAsyncAPI.channels().get(message.channel).json().subscribe()
const binding = operation ? operation.binding('mqtt') : undefined
this.client.publish(
message.channel,
Expand Down
11 changes: 5 additions & 6 deletions src/adapters/ws/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class WsClientAdapter extends Adapter {
this.clients.push({
channel,
client: new ws(url, { headers }),
binding: this.parsedAsyncAPI.channel(channel).binding('ws'),
binding: this.parsedAsyncAPI.channels().get(channel).bindings().get('ws'),
})
}

Expand Down Expand Up @@ -77,13 +77,12 @@ class WsClientAdapter extends Adapter {
private getWsChannels() {
const channels = []
for (const channel of this.channelNames) {
if (this.parsedAsyncAPI.channel(channel).hasBinding('ws')) {
if (this.parsedAsyncAPI.channel(channel).hasServers()) {
if (this.parsedAsyncAPI.channels().get(channel).bindings().get('ws')) {
if (this.parsedAsyncAPI.channels().get(channel).servers().length < 0) { // NOSONAR
if (
this.parsedAsyncAPI
.channel(channel)
.servers()
.includes(this.serverName)
.channels().get(channel)
.servers().get(this.serverName)
) {
channels.push(channel)
}
Expand Down
6 changes: 3 additions & 3 deletions src/adapters/ws/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class WebSocketsAdapter extends Adapter {
pathname = pathname.substring(1)
}

if (!this.parsedAsyncAPI.channel(pathname)) {
if (!this.parsedAsyncAPI.channels().get(pathname)) {
this.emitPathnameError(socket, pathname)
}

Expand Down Expand Up @@ -232,8 +232,8 @@ class WebSocketsAdapter extends Adapter {
)

const wsChannelBinding = this.parsedAsyncAPI
.channel(pathname)
.binding('ws')
.channels().get(pathname)
.bindings().get('ws')

if (wsChannelBinding) {
const correctBindings = await this.checkBindings(socket, {
Expand Down

0 comments on commit c7d50d0

Please sign in to comment.