Skip to content

Commit

Permalink
fix(events): close ws connection on unmount, catch and retry logic (#…
Browse files Browse the repository at this point in the history
…1130)

## Done

Refactored events code:
- Close ws connection on unmount
- Add catch and retry logic for ws connection failures
- Refactored promises code
  • Loading branch information
edlerd authored Mar 5, 2025
2 parents 0e54e15 + 0e98b52 commit 8988790
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 97 deletions.
8 changes: 2 additions & 6 deletions src/api/images.tsx
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import {
continueOrFinish,
handleResponse,
pushFailure,
pushSuccess,
} from "util/helpers";
import { handleResponse } from "util/helpers";
import { continueOrFinish, pushFailure, pushSuccess } from "util/promises";
import type { LxdImage } from "types/image";
import type { LxdApiResponse } from "types/apiResponse";
import type { LxdOperationResponse } from "types/operation";
Expand Down
8 changes: 2 additions & 6 deletions src/api/instance-snapshots.tsx
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import {
continueOrFinish,
handleResponse,
pushFailure,
pushSuccess,
} from "util/helpers";
import { handleResponse } from "util/helpers";
import { continueOrFinish, pushFailure, pushSuccess } from "util/promises";
import type { LxdInstance, LxdInstanceSnapshot } from "types/instance";
import type { LxdOperationResponse } from "types/operation";
import type { EventQueue } from "context/eventQueue";
Expand Down
4 changes: 1 addition & 3 deletions src/api/instances.tsx
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import {
continueOrFinish,
handleEtagResponse,
handleResponse,
handleTextResponse,
pushFailure,
pushSuccess,
} from "util/helpers";
import { continueOrFinish, pushFailure, pushSuccess } from "util/promises";
import type { LxdInstance, LxdInstanceAction } from "types/instance";
import type { LxdTerminal, TerminalConnectPayload } from "types/terminal";
import type { LxdApiResponse } from "types/apiResponse";
Expand Down
8 changes: 2 additions & 6 deletions src/api/volume-snapshots.tsx
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import {
continueOrFinish,
handleResponse,
pushFailure,
pushSuccess,
} from "util/helpers";
import { handleResponse } from "util/helpers";
import { continueOrFinish, pushFailure, pushSuccess } from "util/promises";
import type { LxdOperationResponse } from "types/operation";
import type { LxdStorageVolume, LxdVolumeSnapshot } from "types/storage";
import type { LxdApiResponse, LxdSyncResponse } from "types/apiResponse";
Expand Down
2 changes: 1 addition & 1 deletion src/pages/images/actions/BulkDeleteImageBtn.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { deleteImageBulk } from "api/images";
import { useQueryClient } from "@tanstack/react-query";
import { queryKeys } from "util/queryKeys";
import { useEventQueue } from "context/eventQueue";
import { getPromiseSettledCounts } from "util/helpers";
import { getPromiseSettledCounts } from "util/promises";
import { pluralize } from "util/instanceBulkActions";
import { useToastNotification } from "context/toastNotificationProvider";
import BulkDeleteButton from "components/BulkDeleteButton";
Expand Down
85 changes: 53 additions & 32 deletions src/pages/instances/Events.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@ import { useAuth } from "context/auth";
import { useQueryClient } from "@tanstack/react-query";
import { queryKeys } from "util/queryKeys";
import { useOperations } from "context/operationsProvider";
import { useNotify } from "@canonical/react-components";

const EVENT_HANDLER_DELAY = 250;
const WS_RETRY_DELAY_MULTIPLIER = 250;
const MAX_WS_CONNECTION_RETRIES = 5;

const Events: FC = () => {
const { isAuthenticated } = useAuth();
const eventQueue = useEventQueue();
const queryClient = useQueryClient();
const notify = useNotify();
const [eventWs, setEventWs] = useState<WebSocket | null>(null);
const { refetchOperations } = useOperations();

Expand Down Expand Up @@ -65,45 +71,60 @@ const Events: FC = () => {
return "undefined";
};

const connectEventWs = () => {
const wsUrl = `wss://${location.host}/1.0/events?type=operation,lifecycle&all-projects=true`;
const ws = new WebSocket(wsUrl);
ws.onopen = () => {
setEventWs(ws);
};
ws.onclose = () => {
setEventWs(null);
};
ws.onmessage = (message: MessageEvent<Blob | string | null>) => {
if (typeof message.data !== "string") {
console.log("Invalid format on event api: ", message.data);
return;
}
const event = JSON.parse(message.data) as LxdEvent;
if (event.type === "operation") {
queryClient.invalidateQueries({
queryKey: [queryKeys.operations, event.project],
});
refetchOperations();
}
if (event.type === "lifecycle") {
const rootQueryKey = getLifecycleRootQueryKey(event);
queryClient.invalidateQueries({
predicate: (query) => query.queryKey[0] === rootQueryKey,
});
const connectEventWs = (retryCount = 0) => {
try {
const wsUrl = `wss://${location.host}/1.0/events?type=operation,lifecycle&all-projects=true`;
const ws = new WebSocket(wsUrl);
ws.onopen = () => {
setEventWs(ws);
};
ws.onclose = () => {
setEventWs(null);
};
ws.onmessage = (message: MessageEvent<Blob | string | null>) => {
if (typeof message.data !== "string") {
console.log("Invalid format on event api: ", message.data);
return;
}
const event = JSON.parse(message.data) as LxdEvent;
if (event.type === "operation") {
queryClient.invalidateQueries({
queryKey: [queryKeys.operations, event.project],
});
refetchOperations();
}
if (event.type === "lifecycle") {
const rootQueryKey = getLifecycleRootQueryKey(event);
queryClient.invalidateQueries({
predicate: (query) => query.queryKey[0] === rootQueryKey,
});
}
// ensure open requests that reply with an operation and register
// new handlers in the eventQueue are closed before handling the event
setTimeout(() => {
handleEvent(event);
}, EVENT_HANDLER_DELAY);
};
} catch (e) {
if (retryCount < MAX_WS_CONNECTION_RETRIES) {
setTimeout(() => {
connectEventWs(retryCount + 1);
}, WS_RETRY_DELAY_MULTIPLIER * retryCount);
} else {
notify.failure("Failed to connect to event api", e);
}
// ensure open requests that reply with an operation and register
// new handlers in the eventQueue are closed before handling the event
setTimeout(() => {
handleEvent(event);
}, 250);
};
}
};

useEffect(() => {
if (!eventWs && isAuthenticated) {
connectEventWs();
}
return () => {
if (eventWs) {
eventWs.close();
}
};
}, [eventWs, isAuthenticated]);
return <></>;
};
Expand Down
2 changes: 1 addition & 1 deletion src/pages/instances/actions/InstanceBulkActions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
pluralize,
} from "util/instanceBulkActions";
import InstanceBulkAction from "pages/instances/actions/InstanceBulkAction";
import { getPromiseSettledCounts } from "util/helpers";
import { getPromiseSettledCounts } from "util/promises";
import { useEventQueue } from "context/eventQueue";
import { useToastNotification } from "context/toastNotificationProvider";
import { useInstanceEntitlements } from "util/entitlements/instances";
Expand Down
2 changes: 1 addition & 1 deletion src/pages/instances/actions/InstanceBulkDelete.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { pluralize } from "util/instanceBulkActions";
import { queryKeys } from "util/queryKeys";
import { useQueryClient } from "@tanstack/react-query";
import { deletableStatuses } from "util/instanceDelete";
import { getPromiseSettledCounts } from "util/helpers";
import { getPromiseSettledCounts } from "util/promises";
import { useEventQueue } from "context/eventQueue";
import { useToastNotification } from "context/toastNotificationProvider";
import { useInstanceEntitlements } from "util/entitlements/instances";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { useQueryClient } from "@tanstack/react-query";
import { queryKeys } from "util/queryKeys";
import { pluralize } from "util/instanceBulkActions";
import { useEventQueue } from "context/eventQueue";
import { getPromiseSettledCounts } from "util/helpers";
import { getPromiseSettledCounts } from "util/promises";
import { useInstanceEntitlements } from "util/entitlements/instances";
import BulkDeleteButton from "components/BulkDeleteButton";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { useQueryClient } from "@tanstack/react-query";
import { queryKeys } from "util/queryKeys";
import { pluralize } from "util/instanceBulkActions";
import { useEventQueue } from "context/eventQueue";
import { getPromiseSettledCounts } from "util/helpers";
import { getPromiseSettledCounts } from "util/promises";
import type { LxdStorageVolume } from "types/storage";
import { useToastNotification } from "context/toastNotificationProvider";
import BulkDeleteButton from "components/BulkDeleteButton";
Expand Down
39 changes: 0 additions & 39 deletions src/util/helpers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -200,45 +200,6 @@ export const getParentsBottomSpacing = (element: Element): number => {
return sum;
};

export const getPromiseSettledCounts = (
results: PromiseSettledResult<void>[],
): { fulfilledCount: number; rejectedCount: number } => {
const fulfilledCount = results.filter(
(result) => result.status === "fulfilled",
).length;
const rejectedCount = results.filter(
(result) => result.status === "rejected",
).length;
return { fulfilledCount, rejectedCount };
};

export const pushSuccess = (results: PromiseSettledResult<void>[]): void => {
results.push({
status: "fulfilled",
value: undefined,
});
};

export const pushFailure = (
results: PromiseSettledResult<void>[],
msg: string,
): void => {
results.push({
status: "rejected",
reason: msg,
});
};

export const continueOrFinish = (
results: PromiseSettledResult<void>[],
totalLength: number,
resolve: (value: PromiseSettledResult<void>[]) => void,
): void => {
if (totalLength === results.length) {
resolve(results);
}
};

export const logout = (): void =>
void fetch("/oidc/logout").then(() => {
if (!window.location.href.includes("/ui/login")) {
Expand Down
40 changes: 40 additions & 0 deletions src/util/promises.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
export const getPromiseSettledCounts = (
results: PromiseSettledResult<void>[],
): { fulfilledCount: number; rejectedCount: number } => {
const settledCounts = { fulfilledCount: 0, rejectedCount: 0 };
results.forEach((result) => {
if (result.status === "fulfilled") {
settledCounts.fulfilledCount++;
} else if (result.status === "rejected") {
settledCounts.rejectedCount++;
}
});
return settledCounts;
};

export const pushSuccess = (results: PromiseSettledResult<void>[]): void => {
results.push({
status: "fulfilled",
value: undefined,
});
};

export const pushFailure = (
results: PromiseSettledResult<void>[],
msg: string,
): void => {
results.push({
status: "rejected",
reason: msg,
});
};

export const continueOrFinish = (
results: PromiseSettledResult<void>[],
totalLength: number,
resolve: (value: PromiseSettledResult<void>[]) => void,
): void => {
if (totalLength === results.length) {
resolve(results);
}
};

0 comments on commit 8988790

Please sign in to comment.