Skip to content

Commit

Permalink
refactor: use single event emitter for all events (#274)
Browse files Browse the repository at this point in the history
  • Loading branch information
TimoGlastra authored May 17, 2021
1 parent 8d2cc2f commit ea814b2
Show file tree
Hide file tree
Showing 27 changed files with 335 additions and 266 deletions.
40 changes: 20 additions & 20 deletions src/__tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ import {
DidCommService,
DidDoc,
} from '../modules/connections'
import { ProofRecord, ProofState, ProofEventType, ProofStateChangedEvent } from '../modules/proofs'
import { ProofEventTypes, ProofRecord, ProofState, ProofStateChangedEvent } from '../modules/proofs'
import { SchemaTemplate, CredentialDefinitionTemplate } from '../modules/ledger'
import {
CredentialRecord,
CredentialOfferTemplate,
CredentialEventType,
CredentialStateChangedEvent,
CredentialState,
CredentialEventTypes,
} from '../modules/credentials'
import { BasicMessage, BasicMessageEventType, BasicMessageReceivedEvent } from '../modules/basic-messages'
import { BasicMessage, BasicMessageEventTypes, BasicMessageReceivedEvent } from '../modules/basic-messages'
import testLogger from './logger'
import { NodeFileSystem } from '../storage/fs/NodeFileSystem'

Expand Down Expand Up @@ -84,18 +84,18 @@ export async function waitForProofRecord(
): Promise<ProofRecord> {
return new Promise((resolve) => {
const listener = (event: ProofStateChangedEvent) => {
const previousStateMatches = previousState === undefined || event.previousState === previousState
const threadIdMatches = threadId === undefined || event.proofRecord.tags.threadId === threadId
const stateMatches = state === undefined || event.proofRecord.state === state
const previousStateMatches = previousState === undefined || event.payload.previousState === previousState
const threadIdMatches = threadId === undefined || event.payload.proofRecord.tags.threadId === threadId
const stateMatches = state === undefined || event.payload.proofRecord.state === state

if (previousStateMatches && threadIdMatches && stateMatches) {
agent.proofs.events.removeListener(ProofEventType.StateChanged, listener)
agent.events.off<ProofStateChangedEvent>(ProofEventTypes.ProofStateChanged, listener)

resolve(event.proofRecord)
resolve(event.payload.proofRecord)
}
}

agent.proofs.events.addListener(ProofEventType.StateChanged, listener)
agent.events.on<ProofStateChangedEvent>(ProofEventTypes.ProofStateChanged, listener)
})
}

Expand All @@ -113,18 +113,18 @@ export async function waitForCredentialRecord(
): Promise<CredentialRecord> {
return new Promise((resolve) => {
const listener = (event: CredentialStateChangedEvent) => {
const previousStateMatches = previousState === undefined || event.previousState === previousState
const threadIdMatches = threadId === undefined || event.credentialRecord.tags.threadId === threadId
const stateMatches = state === undefined || event.credentialRecord.state === state
const previousStateMatches = previousState === undefined || event.payload.previousState === previousState
const threadIdMatches = threadId === undefined || event.payload.credentialRecord.tags.threadId === threadId
const stateMatches = state === undefined || event.payload.credentialRecord.state === state

if (previousStateMatches && threadIdMatches && stateMatches) {
agent.credentials.events.removeListener(CredentialEventType.StateChanged, listener)
agent.events.off<CredentialStateChangedEvent>(CredentialEventTypes.CredentialStateChanged, listener)

resolve(event.credentialRecord)
resolve(event.payload.credentialRecord)
}
}

agent.credentials.events.addListener(CredentialEventType.StateChanged, listener)
agent.events.on<CredentialStateChangedEvent>(CredentialEventTypes.CredentialStateChanged, listener)
})
}

Expand All @@ -134,17 +134,17 @@ export async function waitForBasicMessage(
): Promise<BasicMessage> {
return new Promise((resolve) => {
const listener = (event: BasicMessageReceivedEvent) => {
const verkeyMatches = verkey === undefined || event.verkey === verkey
const contentMatches = content === undefined || event.message.content === content
const verkeyMatches = verkey === undefined || event.payload.verkey === verkey
const contentMatches = content === undefined || event.payload.message.content === content

if (verkeyMatches && contentMatches) {
agent.basicMessages.events.removeListener(BasicMessageEventType.MessageReceived, listener)
agent.events.off<BasicMessageReceivedEvent>(BasicMessageEventTypes.BasicMessageReceived, listener)

resolve(event.message)
resolve(event.payload.message)
}
}

agent.basicMessages.events.addListener(BasicMessageEventType.MessageReceived, listener)
agent.events.on<BasicMessageReceivedEvent>(BasicMessageEventTypes.BasicMessageReceived, listener)
})
}

Expand Down
14 changes: 9 additions & 5 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { EventEmitter } from 'events'
import { container as baseContainer, DependencyContainer } from 'tsyringe'

import { Logger } from '../logger'
Expand All @@ -21,6 +20,8 @@ import { LedgerModule } from '../modules/ledger/LedgerModule'
import { InMemoryMessageRepository } from '../storage/InMemoryMessageRepository'
import { Symbols } from '../symbols'
import { Transport } from './TransportService'
import { EventEmitter } from './EventEmitter'
import { AgentEventTypes, AgentMessageReceivedEvent } from './Events'

export class Agent {
protected agentConfig: AgentConfig
Expand All @@ -45,11 +46,9 @@ export class Agent {

this.agentConfig = new AgentConfig(initialConfig)
this.logger = this.agentConfig.logger
this.eventEmitter = new EventEmitter()

// Bind class based instances
this.container.registerInstance(AgentConfig, this.agentConfig)
this.container.registerInstance(EventEmitter, this.eventEmitter)

// Based on interfaces. Need to register which class to use
this.container.registerInstance(Symbols.Logger, this.logger)
Expand All @@ -76,6 +75,7 @@ export class Agent {
})

// Resolve instances after everything is registered
this.eventEmitter = this.container.resolve(EventEmitter)
this.messageSender = this.container.resolve(MessageSender)
this.messageReceiver = this.container.resolve(MessageReceiver)
this.wallet = this.container.resolve(Symbols.Wallet)
Expand All @@ -93,8 +93,8 @@ export class Agent {
}

private listenForMessages() {
this.eventEmitter.addListener('agentMessage', async (payload) => {
await this.receiveMessage(payload)
this.eventEmitter.on<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived, async (event) => {
await this.receiveMessage(event.payload.message)
})
}

Expand All @@ -110,6 +110,10 @@ export class Agent {
return this.messageSender.outboundTransporter
}

public get events() {
return this.eventEmitter
}

public async init() {
await this.wallet.init()

Expand Down
20 changes: 20 additions & 0 deletions src/agent/EventEmitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Lifecycle, scoped } from 'tsyringe'
import { EventEmitter as NativeEventEmitter } from 'events'
import { BaseEvent } from './Events'

@scoped(Lifecycle.ContainerScoped)
export class EventEmitter {
private eventEmitter = new NativeEventEmitter()

public emit<T extends BaseEvent>(data: T) {
this.eventEmitter.emit(data.type, data)
}

public on<T extends BaseEvent>(event: T['type'], listener: (data: T) => void | Promise<void>) {
this.eventEmitter.on(event, listener)
}

public off<T extends BaseEvent>(event: T['type'], listener: (data: T) => void | Promise<void>) {
this.eventEmitter.off(event, listener)
}
}
15 changes: 15 additions & 0 deletions src/agent/Events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export enum AgentEventTypes {
AgentMessageReceived = 'AgentMessageReceived',
}

export interface BaseEvent {
type: string
payload: Record<string, unknown>
}

export interface AgentMessageReceivedEvent extends BaseEvent {
type: typeof AgentEventTypes.AgentMessageReceived
payload: {
message: unknown
}
}
4 changes: 0 additions & 4 deletions src/decorators/signature/SignatureDecoratorUtils.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import indy from 'indy-sdk'
import { signData, unpackAndVerifySignatureDecorator } from './SignatureDecoratorUtils'
import { IndyWallet } from '../../wallet/IndyWallet'
import { SignatureDecorator } from './SignatureDecorator'
Expand All @@ -13,9 +12,6 @@ jest.mock('../../utils/timestamp', () => {
})

describe('Decorators | Signature | SignatureDecoratorUtils', () => {
const walletConfig = { id: 'wallet-1' + 'test1' }
const walletCredentials = { key: 'key' }

const data = {
did: 'did',
did_doc: {
Expand Down
15 changes: 15 additions & 0 deletions src/modules/basic-messages/BasicMessageEvents.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { Verkey } from 'indy-sdk'
import { BaseEvent } from '../../agent/Events'
import { BasicMessage } from './messages'

export enum BasicMessageEventTypes {
BasicMessageReceived = 'BasicMessageReceived',
}

export interface BasicMessageReceivedEvent extends BaseEvent {
type: typeof BasicMessageEventTypes.BasicMessageReceived
payload: {
message: BasicMessage
verkey: Verkey
}
}
11 changes: 0 additions & 11 deletions src/modules/basic-messages/BasicMessagesModule.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { WalletQuery } from 'indy-sdk'
import { EventEmitter } from 'events'
import { Lifecycle, scoped } from 'tsyringe'

import { BasicMessageService } from './services'
Expand All @@ -19,16 +18,6 @@ export class BasicMessagesModule {
this.registerHandlers(dispatcher)
}

/**
* Get the event emitter for the basic message service. Will emit message received events
* when basic messages are received.
*
* @returns event emitter for basic message related events
*/
public get events(): EventEmitter {
return this.basicMessageService
}

public async sendMessage(connection: ConnectionRecord, message: string) {
const outboundMessage = await this.basicMessageService.send(message, connection)
await this.messageSender.sendMessage(outboundMessage)
Expand Down
20 changes: 12 additions & 8 deletions src/modules/basic-messages/__tests__/BasicMessageService.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import indy from 'indy-sdk'
import { IndyWallet } from '../../../wallet/IndyWallet'
import { Wallet } from '../../../wallet/Wallet'
import { Repository } from '../../../storage/Repository'
import { StorageService } from '../../../storage/StorageService'
import { IndyStorageService } from '../../../storage/IndyStorageService'
import { BasicMessageService, BasicMessageEventType } from '../services'
import { BasicMessageService } from '../services'
import { BasicMessageRecord } from '../repository/BasicMessageRecord'
import { BasicMessage } from '../messages'
import { InboundMessageContext } from '../../../agent/models/InboundMessageContext'
import { ConnectionRecord } from '../../connections'
import { AgentConfig } from '../../../agent/AgentConfig'
import { getBaseConfig } from '../../../__tests__/helpers'
import { EventEmitter } from '../../../agent/EventEmitter'
import { BasicMessageEventTypes, BasicMessageReceivedEvent } from '../BasicMessageEvents'

describe('BasicMessageService', () => {
const walletConfig = { id: 'test-wallet' + '-BasicMessageServiceTest' }
const walletCredentials = { key: 'key' }
const mockConnectionRecord = {
id: 'd3849ac3-c981-455b-a1aa-a10bea6cead8',
verkey: '71X9Y1aSPK11ariWUYQCYMjSewf2Kw2JFGeygEf9uZd9',
Expand All @@ -40,15 +39,17 @@ describe('BasicMessageService', () => {
describe('save', () => {
let basicMessageRepository: Repository<BasicMessageRecord>
let basicMessageService: BasicMessageService
let eventEmitter: EventEmitter

beforeEach(() => {
basicMessageRepository = new Repository<BasicMessageRecord>(BasicMessageRecord, storageService)
basicMessageService = new BasicMessageService(basicMessageRepository)
eventEmitter = new EventEmitter()
basicMessageService = new BasicMessageService(basicMessageRepository, eventEmitter)
})

it(`emits newMessage with connection verkey and message itself`, async () => {
const eventListenerMock = jest.fn()
basicMessageService.on(BasicMessageEventType.MessageReceived, eventListenerMock)
eventEmitter.on<BasicMessageReceivedEvent>(BasicMessageEventTypes.BasicMessageReceived, eventListenerMock)

const basicMessage = new BasicMessage({
id: '123',
Expand All @@ -66,8 +67,11 @@ describe('BasicMessageService', () => {
await basicMessageService.save(messageContext, mockConnectionRecord as ConnectionRecord)

expect(eventListenerMock).toHaveBeenCalledWith({
verkey: mockConnectionRecord.verkey,
message: messageContext.message,
type: 'BasicMessageReceived',
payload: {
verkey: mockConnectionRecord.verkey,
message: messageContext.message,
},
})
})
})
Expand Down
1 change: 1 addition & 0 deletions src/modules/basic-messages/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './messages'
export * from './services'
export * from './repository'
export * from './BasicMessageEvents'
30 changes: 11 additions & 19 deletions src/modules/basic-messages/services/BasicMessageService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Verkey, WalletQuery } from 'indy-sdk'
import { EventEmitter } from 'events'
import type { WalletQuery } from 'indy-sdk'
import { Lifecycle, scoped } from 'tsyringe'

import { OutboundMessage } from '../../../types'
Expand All @@ -9,23 +8,17 @@ import { ConnectionRecord } from '../../connections/repository/ConnectionRecord'
import { InboundMessageContext } from '../../../agent/models/InboundMessageContext'
import { BasicMessage } from '../messages'
import { BasicMessageRepository } from '../repository'

export enum BasicMessageEventType {
MessageReceived = 'messageReceived',
}

export interface BasicMessageReceivedEvent {
message: BasicMessage
verkey: Verkey
}
import { EventEmitter } from '../../../agent/EventEmitter'
import { BasicMessageEventTypes, BasicMessageReceivedEvent } from '../BasicMessageEvents'

@scoped(Lifecycle.ContainerScoped)
export class BasicMessageService extends EventEmitter {
export class BasicMessageService {
private basicMessageRepository: BasicMessageRepository
private eventEmitter: EventEmitter

public constructor(basicMessageRepository: BasicMessageRepository) {
super()
public constructor(basicMessageRepository: BasicMessageRepository, eventEmitter: EventEmitter) {
this.basicMessageRepository = basicMessageRepository
this.eventEmitter = eventEmitter
}

public async send(message: string, connection: ConnectionRecord): Promise<OutboundMessage<BasicMessage>> {
Expand Down Expand Up @@ -56,11 +49,10 @@ export class BasicMessageService extends EventEmitter {
})

await this.basicMessageRepository.save(basicMessageRecord)
const event: BasicMessageReceivedEvent = {
message,
verkey: connection.verkey,
}
this.emit(BasicMessageEventType.MessageReceived, event)
this.eventEmitter.emit<BasicMessageReceivedEvent>({
type: BasicMessageEventTypes.BasicMessageReceived,
payload: { message, verkey: connection.verkey },
})
}

public async findAllByQuery(query: WalletQuery) {
Expand Down
15 changes: 15 additions & 0 deletions src/modules/connections/ConnectionEvents.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { BaseEvent } from '../../agent/Events'
import { ConnectionRecord } from './repository/ConnectionRecord'
import { ConnectionState } from './models/ConnectionState'

export enum ConnectionEventTypes {
ConnectionStateChanged = 'ConnectionStateChanged',
}

export interface ConnectionStateChangedEvent extends BaseEvent {
type: typeof ConnectionEventTypes.ConnectionStateChanged
payload: {
connectionRecord: ConnectionRecord
previousState: ConnectionState | null
}
}
11 changes: 0 additions & 11 deletions src/modules/connections/ConnectionsModule.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Verkey } from 'indy-sdk'
import { EventEmitter } from 'events'
import { Lifecycle, scoped } from 'tsyringe'

import { AgentConfig } from '../../agent/AgentConfig'
Expand Down Expand Up @@ -42,16 +41,6 @@ export class ConnectionsModule {
this.registerHandlers(dispatcher)
}

/**
* Get the event emitter for the connection service. Will emit state changed events
* when the state of connections records changes.
*
* @returns event emitter for connection related state changes
*/
public get events(): EventEmitter {
return this.connectionService
}

public async createConnection(config?: {
autoAcceptConnection?: boolean
alias?: string
Expand Down
Loading

0 comments on commit ea814b2

Please sign in to comment.