Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
Fixed bugs in instanceserver connections (#10887)
Browse files Browse the repository at this point in the history
* Fixed bugs in instanceserver connections

There were still some bugs surrounding a client connecting to an
instanceserver if the instance record for that server no longer
existed. updateInstance needed to return false in that situation.

This still led to a situation where the client would be stuck trying
to authenticate forever. Added a step prior to authentication where
the client listens for a message with { instanceReady: true }, which
is sent by setupSocketFunctions. If the server isn't ready within
10 seconds, then it returns { instanceReady: false }, and the client
will do a new instanceserver provision and hopefully connect properly
this time.

* make eslint happy

---------

Co-authored-by: Josh Field <[email protected]>
  • Loading branch information
barankyle and HexaField authored Aug 6, 2024
1 parent c0b4c25 commit 50aa284
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 32 deletions.
113 changes: 86 additions & 27 deletions packages/client-core/src/transports/SocketWebRTCClientFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import { getSearchParamFromURL } from '@etherealengine/common/src/utils/getSearc
import { Engine } from '@etherealengine/ecs/src/Engine'
import { defineSystem, destroySystem } from '@etherealengine/ecs/src/SystemFunctions'
import { PresentationSystemGroup } from '@etherealengine/ecs/src/SystemGroups'
import { AuthTask } from '@etherealengine/engine/src/avatar/functions/receiveJoinWorld'
import { AuthTask, ReadyTask } from '@etherealengine/engine/src/avatar/functions/receiveJoinWorld'
import { Identifiable, PeerID, State, dispatchAction, getMutableState, getState, none } from '@etherealengine/hyperflux'
import {
Action,
Expand Down Expand Up @@ -98,6 +98,8 @@ import {
stopFaceTracking,
stopLipsyncTracking
} from '../media/webcam/WebcamInput'
import { ChannelState } from '../social/services/ChannelService'
import { LocationState } from '../social/services/LocationService'
import { AuthState } from '../user/services/AuthService'
import { MediaStreamState, MediaStreamService as _MediaStreamService } from './MediaStreams'
import { clearPeerMediaChannels } from './PeerMediaChannelState'
Expand Down Expand Up @@ -221,7 +223,7 @@ export const connectToInstance = (
if (instanceStillProvisioned(instanceID, locationID, channelID)) _connect()
}, 3000)

const onConnect = () => {
const onConnect = async () => {
if (aborted || !primus) return
connecting = false
primus.off('incoming::open', onConnect)
Expand All @@ -230,31 +232,49 @@ export const connectToInstance = (
clearTimeout(connectionFailTimeout)

const topic = locationID ? NetworkTopics.world : NetworkTopics.media
authenticatePrimus(primus, instanceID, topic)

/** Server closed the connection. */
const onDisconnect = () => {
if (aborted) return
if (primus) {
primus.off('incoming::end', onDisconnect)
primus.off('end', onDisconnect)
const instanceserverReady = await checkInstanceserverReady(primus, instanceID, topic)
if (instanceserverReady) {
await authenticatePrimus(primus, instanceID, topic)

/** Server closed the connection. */
const onDisconnect = () => {
if (aborted) return
if (primus) {
primus.off('incoming::end', onDisconnect)
primus.off('end', onDisconnect)
}
const network = getState(NetworkState).networks[instanceID] as SocketWebRTCClientNetwork
if (!network) return logger.error('Disconnected from unconnected instance ' + instanceID)

logger.info('Disconnected from network %o', { topic: network.topic, id: network.id })
/**
* If we are disconnected (server closes our socket) rather than leave the network,
* we just need to destroy and recreate the transport
*/
closeNetwork(network)
/** If we still have the instance provisioned, we should try again */
if (instanceStillProvisioned(instanceID, locationID, channelID)) _connect()
}
// incoming::end is emitted when the server closes the connection
primus.on('incoming::end', onDisconnect)
// end is emitted when the client closes the connection
primus.on('end', onDisconnect)
} else {
if (locationID) {
const currentLocation = getMutableState(LocationState).currentLocation.location
const currentLocationId = currentLocation.id.value
currentLocation.id.set(undefined as unknown as LocationID)
currentLocation.id.set(currentLocationId)
} else {
const channelState = getMutableState(ChannelState)
const targetChannelId = channelState.targetChannelId.value
channelState.targetChannelId.set(undefined as unknown as ChannelID)
channelState.targetChannelId.set(targetChannelId)
}
const network = getState(NetworkState).networks[instanceID] as SocketWebRTCClientNetwork
if (!network) return logger.error('Disconnected from unconnected instance ' + instanceID)

logger.info('Disonnected from network %o', { topic: network.topic, id: network.id })
/**
* If we are disconnected (server closes our socket) rather than leave the network,
* we just need to destroy and recreate the transport
*/
closeNetwork(network)
/** If we still have the instance provisioned, we should try again */
if (instanceStillProvisioned(instanceID, locationID, channelID)) _connect()
primus.removeAllListeners()
primus.end()
console.log('PRIMUS GONE')
}
// incoming::end is emitted when the server closes the connection
primus.on('incoming::end', onDisconnect)
// end is emitted when the client closes the connection
primus.on('end', onDisconnect)
}
primus!.on('incoming::open', onConnect)
}
Expand Down Expand Up @@ -283,6 +303,45 @@ export const getChannelIdFromTransport = (network: SocketWebRTCClientNetwork) =>
return isWorldConnection ? null : currentChannelInstanceConnection?.channelId
}

export async function checkInstanceserverReady(primus: Primus, instanceID: InstanceID, topic: Topic) {
logger.info('Checking that instanceserver is ready')
const { instanceReady } = await new Promise<ReadyTask>((resolve) => {
const onStatus = (response: ReadyTask) => {
// eslint-disable-next-line no-prototype-builtins
if (response.hasOwnProperty('instanceReady')) {
clearInterval(interval)
resolve(response)
primus.off('data', onStatus)
primus.removeListener('incoming::end', onDisconnect)
}
}

primus.on('data', onStatus)

let disconnected = false
const interval = setInterval(() => {
if (disconnected) {
clearInterval(interval)
resolve({ instanceReady: false })
primus.removeAllListeners()
primus.end()
return
}
}, 100)

const onDisconnect = () => {
disconnected = true
}
primus.addListener('incoming::end', onDisconnect)
})

if (!instanceReady) {
unprovisionInstance(topic, instanceID)
}

return instanceReady
}

export async function authenticatePrimus(primus: Primus, instanceID: InstanceID, topic: Topic) {
logger.info('Authenticating instance ' + instanceID)

Expand Down Expand Up @@ -325,10 +384,10 @@ export async function authenticatePrimus(primus: Primus, instanceID: InstanceID,
/** We failed to connect to be authenticated, we do not want to try again */
// TODO: do we want to unprovision here?
unprovisionInstance(topic, instanceID)
return logger.error(new Error('Unable to connect with credentials' + error))
return logger.error(new Error('Unable to connect with credentials ' + error))
}

connectToNetwork(primus, instanceID, topic, hostPeerID!, routerRtpCapabilities!, cachedActions!)
await connectToNetwork(primus, instanceID, topic, hostPeerID!, routerRtpCapabilities!, cachedActions!)
}

export const connectToNetwork = async (
Expand Down
4 changes: 4 additions & 0 deletions packages/engine/src/avatar/functions/receiveJoinWorld.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ export type AuthTask = {
error?: AuthError
}

export type ReadyTask = {
instanceReady: boolean
}

export type JoinWorldRequestData = {
inviteCode?: InviteCode
}
Expand Down
18 changes: 16 additions & 2 deletions packages/instanceserver/src/SocketFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import { getServerNetwork } from './SocketWebRTCServerFunctions'

const logger = multiLogger.child({ component: 'instanceserver:spark' })

const NON_READY_INTERVALS = 100 //100 tenths of a second, i.e. 10 seconds

export const setupSocketFunctions = async (app: Application, spark: any) => {
let authTask: AuthTask | undefined

Expand All @@ -50,15 +52,27 @@ export const setupSocketFunctions = async (app: Application, spark: any) => {
*
* Authorize user and make sure everything is valid before allowing them to join the world
**/
await new Promise<void>((resolve) => {
const ready = await new Promise<boolean>((resolve) => {
let counter = 0
const interval = setInterval(() => {
counter++
if (getState(InstanceServerState).ready) {
clearInterval(interval)
resolve()
resolve(true)
}
if (counter > NON_READY_INTERVALS) {
clearInterval(interval)
resolve(false)
}
}, 100)
})

if (!ready) {
app.primus.write({ instanceReady: false })
return
}

app.primus.write({ instanceReady: true })
const network = getServerNetwork(app)

const onAuthenticationRequest = async (data) => {
Expand Down
10 changes: 7 additions & 3 deletions packages/instanceserver/src/channels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,13 @@ const updateInstance = async ({
if (isNeedingNewServer && !instanceStarted) {
instanceStarted = true
const initialized = await initializeInstance({ app, status, headers, userId })
if (initialized) await loadEngine({ app, sceneId, headers })
else instanceStarted = false
return true
if (initialized) {
await loadEngine({ app, sceneId, headers })
return true
} else {
instanceStarted = false
return false
}
} else {
try {
if (!getState(InstanceServerState).ready)
Expand Down

0 comments on commit 50aa284

Please sign in to comment.