Skip to content

Commit

Permalink
Merge branch 'feat/openmedia-hotstandby-heartbeat-logic'
Browse files Browse the repository at this point in the history
  • Loading branch information
nytamin committed Jan 20, 2025
2 parents 3d76560 + 0c403f1 commit 61bda3f
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 277 deletions.
22 changes: 20 additions & 2 deletions packages/connector/src/MosConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ export class MosConnection extends EventEmitter<MosConnectionEvents> implements
MosConnection.nextSocketID,
connectionOptions.secondary.ports?.lower ?? MosConnection.CONNECTION_PORT_LOWER,
'lower',
true
!connectionOptions.secondary.openMediaHotStandby
)
secondary.createClient(
MosConnection.nextSocketID,
connectionOptions.secondary.ports?.upper ?? MosConnection.CONNECTION_PORT_UPPER,
'upper',
true
!connectionOptions.secondary.openMediaHotStandby
)
if (!connectionOptions.primary.dontUseQueryPort) {
secondary.createClient(
Expand All @@ -180,6 +180,7 @@ export class MosConnection extends EventEmitter<MosConnectionEvents> implements
if (connectionOptions.secondary?.openMediaHotStandby) {
// Initially disable heartbeats on secondary since primary should be attempted first
secondary.disableHeartbeats()
primary.enableHeartbeats()

primary.on('connectionChanged', () => {
if (primary.connected) {
Expand All @@ -190,6 +191,23 @@ export class MosConnection extends EventEmitter<MosConnectionEvents> implements
primary.disableHeartbeats()
}
})

// Handle secondary connection changes
setTimeout(() => {
secondary?.on('connectionChanged', () => {
if (!primary.connected) {
// Secondary is active
if (secondary?.connected) {
secondary.enableHeartbeats()
primary.disableHeartbeats()
} else {
// Secondary disconnected - try to re-enable primary
primary.enableHeartbeats()
secondary?.disableHeartbeats()
}
}
})
}, 50)
}
}

Expand Down
151 changes: 0 additions & 151 deletions packages/connector/src/__tests__/MosConnection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
decode,
delay,
encode,
getConnectionsFromDevice,
getMessageId,
getXMLReply,
initMosConnection,
Expand Down Expand Up @@ -542,157 +541,7 @@ describe('MosDevice: General', () => {
expect(onError).toHaveBeenCalledTimes(0)
expect(onWarning).toHaveBeenCalledTimes(0)

await mos.dispose()
})
test('Hot standby', async () => {
const mos = new MosConnection({
mosID: 'jestMOS',
acceptsConnections: true,
profiles: {
'0': true,
'1': true,
},
})
const onError = jest.fn((e) => console.log(e))
const onWarning = jest.fn((e) => console.log(e))
mos.on('error', onError)
mos.on('warning', onWarning)

expect(mos.acceptsConnections).toBe(true)
await initMosConnection(mos)
expect(mos.isListening).toBe(true)

const mosDevice = await mos.connect({
primary: {
id: 'primary',
host: '192.168.0.1',
timeout: 200,
},
secondary: {
id: 'secondary',
host: '192.168.0.2',
timeout: 200,
openMediaHotStandby: true,
},
})

expect(mosDevice).toBeTruthy()
expect(mosDevice.idPrimary).toEqual('jestMOS_primary')

const connections = getConnectionsFromDevice(mosDevice)
expect(connections.primary).toBeTruthy()
expect(connections.secondary).toBeTruthy()
connections.primary?.setAutoReconnectInterval(300)
connections.secondary?.setAutoReconnectInterval(300)

const onConnectionChange = jest.fn()
mosDevice.onConnectionChange((connectionStatus: IMOSConnectionStatus) => {
onConnectionChange(connectionStatus)
})

expect(SocketMock.instances).toHaveLength(7)
expect(SocketMock.instances[1].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[1].connectedPort).toEqual(10540)
expect(SocketMock.instances[2].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[2].connectedPort).toEqual(10541)
expect(SocketMock.instances[3].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[3].connectedPort).toEqual(10542)

// TODO: Perhaps the hot-standby should not be connected at all at this point?
expect(SocketMock.instances[4].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[4].connectedPort).toEqual(10540)
expect(SocketMock.instances[5].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[5].connectedPort).toEqual(10541)
expect(SocketMock.instances[6].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[6].connectedPort).toEqual(10542)

// Simulate primary connected:
for (const i of SocketMock.instances) {
if (i.connectedHost === '192.168.0.1') i.mockEmitConnected()
}
// Wait for the primary to be initially connected:
await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected behaviour from a hot standby - we leave it up to the library consumer to decide if this is bad or not
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
onConnectionChange.mockClear()

// Simulate primary disconnect, secondary hot standby takes over:
for (const i of SocketMock.instances) {
i.mockConnectCount = 0
if (i.connectedHost === '192.168.0.1') i.mockEmitClose()
if (i.connectedHost === '192.168.0.2') i.mockEmitConnected()
}

// Wait for the secondary to be connected:
await waitFor(() => mosDevice.getConnectionStatus().SecondaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: false,
PrimaryStatus: expect.stringContaining('Primary'),
SecondaryConnected: true,
SecondaryStatus: 'Secondary: Connected',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: false,
PrimaryStatus: expect.stringContaining('Primary'),
SecondaryConnected: true,
SecondaryStatus: 'Secondary: Connected',
})
onConnectionChange.mockClear()

// Simulate that the primary comes back online:
for (const i of SocketMock.instances) {
if (i.connectedHost === '192.168.0.1') {
expect(i.mockConnectCount).toBeGreaterThanOrEqual(1) // should have tried to reconnect
i.mockEmitConnected()
}

if (i.connectedHost === '192.168.0.2') i.mockEmitClose()
}

// Wait for the primary to be connected:
await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})

await mos.dispose()
})
})
async function waitFor(fcn: () => boolean, timeout: number): Promise<void> {
const startTime = Date.now()

while (Date.now() - startTime < timeout) {
await delay(10)

if (fcn()) return
}
throw new Error('Timeout in waitFor')
}
Loading

0 comments on commit 61bda3f

Please sign in to comment.