Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a single "message" event listener to dispatch received messages #653

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 37 additions & 14 deletions src/comlink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ interface ThrownValue {
type SerializedThrownValue =
| { isError: true; value: Error }
| { isError: false; value: unknown };
type PendingListenersMap = Map<
string,
(value: WireValue | PromiseLike<WireValue>) => void
>;

/**
* Internal transfer handler to handle thrown exceptions.
Expand Down Expand Up @@ -392,7 +396,26 @@ function closeEndPoint(endpoint: Endpoint) {
}

export function wrap<T>(ep: Endpoint, target?: any): Remote<T> {
return createProxy<T>(ep, [], target) as any;
const pendingListeners : PendingListenersMap = new Map();

ep.addEventListener("message", function handleMessage(ev: Event) {
const { data } = ev as MessageEvent;
if (!data || !data.id) {
return;
}
const resolver = pendingListeners.get(data.id);
if (!resolver) {
return;
}

try {
resolver(data);
} finally {
pendingListeners.delete(data.id);
}
});

return createProxy<T>(ep, pendingListeners, [], target) as any;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this case as any? I know this was also true of the earlier code but in our code (foxglove) we always avoid any - are we not able to get the correct type here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. But seems like it's not that easy, quoting the README:

Comlink does provide TypeScript types. When you expose() something of type T, the corresponding wrap() call will return something of type Comlink.Remote<T>. While this type has been battle-tested over some time now, it is implemented on a best-effort basis. There are some nuances that are incredibly hard if not impossible to encode correctly in TypeScript’s type system. It may sometimes be necessary to force a certain type using as unknown as <type>.

}

function throwIfProxyReleased(isReleased: boolean) {
Expand All @@ -402,7 +425,7 @@ function throwIfProxyReleased(isReleased: boolean) {
}

function releaseEndpoint(ep: Endpoint) {
return requestResponseMessage(ep, {
return requestResponseMessage(ep, new Map(), {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did noone ever address this? this now leaks INDEFINETLY, causing ep[finalizer]() to never be called!!! this means in browsers you will leak threads if you had worker.terminate linked to the exposed [finalizer] property, this promise will never resolve!!!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which part are you saying is leaking?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doing

class SelfTerminate extends Worker {
  [finalize](){
    this.terminate() // never fires
  }
}

const x = wrap(new SelfTerminate('./w'))
x[releaseProxy]()

means the worker never terminates.... it used to before this change, thats because the releaseEndpoint promise never settles

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I don't understand how this worked before. When releasing the proxy, the finalizer of the exposed object (the object created in the worker) is called, not the finalizer of the SelfTerminate instance. I have tested this with v4.4.1 which does not include this change.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Comlink Worker Example</title>
</head>
<body>
    <script type="module">
        import * as Comlink from 'https://cdn.jsdelivr.net/npm/[email protected]/dist/esm/comlink.mjs';

        // Inline worker definition
        const workerCode = `
            importScripts('https://cdn.jsdelivr.net/npm/[email protected]/dist/umd/comlink.js');
            const workerAPI = {
                add(a, b) {
                    return a + b;
                },
                // Uncomment function below to have worker close itself when proxy is released.
                // [Comlink.finalizer]() {
                //     console.warn("Closing worker...");
                //     self.close();
                // }
            };
            Comlink.expose(workerAPI);
        `;

        class SelfTerminate extends Worker {
            [Comlink.finalizer]() {
                console.warn("Terminating worker...");
                this.terminate();
            }
        }

        const blob = new Blob([workerCode], { type: 'application/javascript' });
        const worker = new SelfTerminate(URL.createObjectURL(blob));
        (async function() {
            const workerAPI = Comlink.wrap(worker);
            const result = await workerAPI.add(5, 7);
            console.log(`The result of the addition is: ${result}`);
            workerAPI[Comlink.releaseProxy]();
        })();
    </script>
</body>
</html>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, sorry, a minor missunderstanding on my part, I wrote this when tired, it needs to be a message port, which then is never closed https://github.com/GoogleChromeLabs/comlink/blob/main/src/comlink.ts#L395

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK got it. I'll submit a PR that fixes this.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await releaseEndpoint(ep, pendingListeners)

function releaseEndpoint (ep: Endpoint, listeners: PendingListenersMap = new Map()) {
  return requestResponseMessage(ep, listeners, {
    type: MessageType.RELEASE
  }).then(() => {
    if (finalizer in ep && typeof ep[finalizer] === 'function') {
      ep[finalizer]()
    }
  })
}

is how i fixed it on my end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you verify if the change in foxglove#6 also works for you? If yes, then I'll submit a PR to this repo here.

type: MessageType.RELEASE,
}).then(() => {
closeEndPoint(ep);
Expand Down Expand Up @@ -447,6 +470,7 @@ function unregisterProxy(proxy: object) {

function createProxy<T>(
ep: Endpoint,
pendingListeners: PendingListenersMap,
path: (string | number | symbol)[] = [],
target: object = function () {}
): Remote<T> {
Expand All @@ -458,20 +482,21 @@ function createProxy<T>(
return () => {
unregisterProxy(proxy);
releaseEndpoint(ep);
pendingListeners.clear();
isProxyReleased = true;
};
}
if (prop === "then") {
if (path.length === 0) {
return { then: () => proxy };
}
const r = requestResponseMessage(ep, {
const r = requestResponseMessage(ep, pendingListeners, {
type: MessageType.GET,
path: path.map((p) => p.toString()),
}).then(fromWireValue);
return r.then.bind(r);
}
return createProxy(ep, [...path, prop]);
return createProxy(ep, pendingListeners, [...path, prop]);
},
set(_target, prop, rawValue) {
throwIfProxyReleased(isProxyReleased);
Expand All @@ -480,6 +505,7 @@ function createProxy<T>(
const [value, transferables] = toWireValue(rawValue);
return requestResponseMessage(
ep,
pendingListeners,
{
type: MessageType.SET,
path: [...path, prop].map((p) => p.toString()),
Expand All @@ -492,17 +518,18 @@ function createProxy<T>(
throwIfProxyReleased(isProxyReleased);
const last = path[path.length - 1];
if ((last as any) === createEndpoint) {
return requestResponseMessage(ep, {
return requestResponseMessage(ep, pendingListeners, {
type: MessageType.ENDPOINT,
}).then(fromWireValue);
}
// We just pretend that `bind()` didn’t happen.
if (last === "bind") {
return createProxy(ep, path.slice(0, -1));
return createProxy(ep, pendingListeners, path.slice(0, -1));
}
const [argumentList, transferables] = processArguments(rawArgumentList);
return requestResponseMessage(
ep,
pendingListeners,
{
type: MessageType.APPLY,
path: path.map((p) => p.toString()),
Expand All @@ -516,6 +543,7 @@ function createProxy<T>(
const [argumentList, transferables] = processArguments(rawArgumentList);
return requestResponseMessage(
ep,
pendingListeners,
{
type: MessageType.CONSTRUCT,
path: path.map((p) => p.toString()),
Expand Down Expand Up @@ -595,23 +623,18 @@ function fromWireValue(value: WireValue): any {

function requestResponseMessage(
ep: Endpoint,
pendingListeners: PendingListenersMap,
msg: Message,
transfers?: Transferable[]
): Promise<WireValue> {
return new Promise((resolve) => {
const id = generateUUID();
ep.addEventListener("message", function l(ev: MessageEvent) {
if (!ev.data || !ev.data.id || ev.data.id !== id) {
return;
}
ep.removeEventListener("message", l as any);
resolve(ev.data);
} as any);
pendingListeners.set(id, resolve);
if (ep.start) {
ep.start();
}
ep.postMessage({ id, ...msg }, transfers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random thought for the future - but it would be nice to avoid this spread operator - I don't see any reason for it when we could instead have id, payload or similar. While not a massive perf issue, removing it is fewer work cycles for the runtime with no downside to the interface or logic since this is an internal structure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be something for a separate PR?

});
});
}

function generateUUID(): string {
Expand Down