RFE: Support async interceptors #580

Open
dbo opened this issue Aug 17, 2023 · 8 comments

Comments

@dbo

dbo commented Aug 17, 2023

As far as I can see, there's no way to intercept asynchronously, or to call the next interceptor asynchronously. This is inconvenient when, for example, authorization is lazily initialized.

@jcready
Contributor

jcready commented Aug 17, 2023

It is possible, albeit inconvenient and boilerplate-heavy. Basically, you must create and return your own [ClientStreaming|DuplexStreaming|ServerStreaming|Unary]Call (depending on the interceptor) and then manually resolve/reject the call's Deferreds. I outlined how one would create a retry interceptor for a unary call here.

Here's something similar for asynchronously getting/setting an authorization header:

import {
  Deferred,
  RpcError,
  RpcInterceptor,
  RpcMetadata,
  RpcStatus,
  UnaryCall,
} from '@protobuf-ts/runtime-rpc';

import { asyncGetAuthHeader } from '../somewhere';

const myAuthInterceptor: RpcInterceptor = {
  interceptUnary(next, method, input, options) {
    const defHeader = new Deferred<RpcMetadata>(),
      defMessage = new Deferred<object>(),
      defStatus = new Deferred<RpcStatus>(),
      defTrailer = new Deferred<RpcMetadata>();

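    // Keep a local reference: TypeScript drops the narrowing of options.meta inside the async closure.
    const meta = options.meta ?? (options.meta = {});

    void (async () => {
      try {
        // set auth request header
        meta.Authorization = await asyncGetAuthHeader();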

        // perform the actual call
        const result = next(method, input, options);

        // resolve all our Deferreds
        defHeader.resolve(result.headers);
        defMessage.resolve(result.response);
        defStatus.resolve(result.status);
        defTrailer.resolve(result.trailers);
      } catch (err) {
        defHeader.rejectPending(err);
        defMessage.rejectPending(err);
        defStatus.rejectPending(err);
        defTrailer.rejectPending(err);
      }
    })();

    return new UnaryCall(
      method,
      options.meta,
      input,
      defHeader.promise,
      defMessage.promise,
      defStatus.promise,
      defTrailer.promise
    );
  },
};
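The example above works even though `next()` is not awaited, because a promise that is resolved with another promise adopts that promise's eventual outcome, including a rejection. The following is a minimal sketch of that behaviour using a stand-in `MiniDeferred` class. It is not the library's `Deferred`; it only assumes that `resolve` forwards to the native promise resolver and that `rejectPending` does nothing once the deferred has been settled.

```typescript
// Hypothetical stand-in for illustration; not the @protobuf-ts/runtime-rpc implementation.
class MiniDeferred<T> {
  readonly promise: Promise<T>;
  private settled = false;
  private res!: (value: T | PromiseLike<T>) => void;
  private rej!: (reason: unknown) => void;

  constructor() {
    // The executor runs synchronously, so res/rej are assigned before the constructor returns.
    this.promise = new Promise<T>((resolve, reject) => {
      this.res = resolve;
      this.rej = reject;
    });
  }

  resolve(value: T | PromiseLike<T>): void {
    this.settled = true;
    this.res(value);
  }

  rejectPending(reason: unknown): void {
    // Only reject if nothing has settled this deferred yet.
    if (!this.settled) {
      this.settled = true;
      this.rej(reason);
    }
  }
}

// Resolving with a promise forwards both its value and its rejection.
async function demoAdoption(): Promise<string[]> {
  const ok = new MiniDeferred<string>();
  ok.resolve(Promise.resolve("headers"));

  const bad = new MiniDeferred<string>();
  bad.resolve(Promise.reject(new Error("UNAUTHENTICATED")));
  bad.rejectPending(new Error("ignored")); // no-op: already settled

  const failure = await bad.promise.catch((e: Error) => e.message);
  return [await ok.promise, failure];
}
```

Under these assumptions, a failure of the underlying call still reaches the caller through the deferred promises, even though the `catch` block around the un-awaited `next()` never sees it.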

@dbo
Author

dbo commented Aug 17, 2023

Thanks for sharing this, but it is too much magic for me. Code is read more often than it is written, and my co-workers would just stumble over it.
For the time being I am going with a lazily initialized transport that explicitly sets up the authorization meta.

I still would appreciate a leaner way with less code.

@daudfauzy98

@jcready Could you also provide an example of how to handle an unauthenticated request (expired access token)? In that case we should request a new token using the refresh token. I'm struggling with this.

@jcready
Contributor

jcready commented Aug 23, 2023

@daudfauzy98 How are you sending the access token? Ideally you'd know that your access token was expired before issuing the request. If you can tell that it is expired (or about to expire), then the above should basically be all you need: asyncGetAuthHeader is where you would use your refresh token to get a new access token if the current one is expired, and then return the access token.
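As a rough illustration of that idea, here is a minimal sketch of an `asyncGetAuthHeader` that refreshes on demand. Everything in it is an assumption made for the example: the `TokenState` shape, the `refresh` callback, the `Bearer` prefix, and the 30-second skew. It also does not deduplicate concurrent refreshes.

```typescript
// All names here are hypothetical; swap in your own token storage and refresh call.
interface TokenState {
  accessToken: string;
  expiresAtMs: number; // absolute expiry time in epoch milliseconds
}

type RefreshFn = () => Promise<TokenState>;

function createAuthHeaderGetter(
  initial: TokenState,
  refresh: RefreshFn,
  skewMs = 30_000,
  now: () => number = Date.now,
): () => Promise<string> {
  let state = initial;
  return async function asyncGetAuthHeader(): Promise<string> {
    // Refresh when the token is expired or will expire within skewMs.
    if (now() >= state.expiresAtMs - skewMs) {
      state = await refresh();
    }
    // Assumption: the server expects a "Bearer" scheme.
    return `Bearer ${state.accessToken}`;
  };
}
```

The clock is injected through `now` so that the expiry check can be exercised in tests without waiting for real time to pass.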

If you aren't able to know that your access token is expired (or about to expire) then you could do something like this (perform initial request, if it fails then refresh access token, set new access token in the request headers, attempt the request again):

import {
  Deferred,
  RpcError,
  RpcInterceptor,
  RpcMetadata,
  RpcStatus,
  UnaryCall,
} from '@protobuf-ts/runtime-rpc';

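// GrpcStatus is used below; this import assumes the gRPC-web transport is in use
import { GrpcStatus } from '@protobuf-ts/grpcweb-transport';
import { getCurrentAccessToken, refreshAccessToken } from '../somewhere';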

const myAuthInterceptor: RpcInterceptor = {
    interceptUnary(next, method, input, options) {
      const defHeader = new Deferred<RpcMetadata>(),
        defMessage = new Deferred<object>(),
        defStatus = new Deferred<RpcStatus>(),
        defTrailer = new Deferred<RpcMetadata>(),
        performRequest = async () => {
          const result = await next(method, input, options);
          defHeader.resolve(result.headers);
          defMessage.resolve(result.response);
          defStatus.resolve(result.status);
          defTrailer.resolve(result.trailers);
        },
        fail = (err: unknown) => {
          defHeader.rejectPending(err);
          defMessage.rejectPending(err);
          defStatus.rejectPending(err);
          defTrailer.rejectPending(err);
        };
  
      if (!options.meta) {
        options.meta = {};
      }
  
      void (async () => {
        try {
          await performRequest();
        } catch (err) {
          if (err instanceof RpcError && (
            err.code === GrpcStatus[GrpcStatus.PERMISSION_DENIED] ||
            err.code === GrpcStatus[GrpcStatus.UNAUTHENTICATED]
          )) {
            try {
              // Refresh the access token
              await refreshAccessToken();
              // update the request headers (re-check meta: narrowing is lost in this closure)
              (options.meta ??= {}).Authorization = await getCurrentAccessToken();
              await performRequest();
              // retry succeeded: skip the fail() below
              return;
            } catch (e) {
              return fail(e);
            }
          }
          fail(err);
        }
      })();
  
      return new UnaryCall(
        method,
        options.meta,
        input,
        defHeader.promise,
        defMessage.promise,
        defStatus.promise,
        defTrailer.promise
      );
    },
  };

@daudfauzy98

daudfauzy98 commented Aug 28, 2023

Thanks for the example! I'm currently doing something similar to your code. I get the session, which contains the access and refresh tokens, when the user logs in, and store it in session storage. Inside the interceptor, I read the access token back from session storage, assign it to options.meta.Authorization, and finally send the request.

It runs fine as long as only a single request is performed. The problem arises when there are multiple requests and the access token has expired: they all use the same token and get rejected by the server. Each request then calls refreshAccessToken() at almost the same time, which creates a race between them. Whichever request gets the new token first succeeds, and the rest fail because the refresh token was already changed by the first request to finish. How do you handle this situation?

@jcready
Contributor

jcready commented Sep 7, 2023

We're getting a little outside the scope of protobuf-ts here. Basically you need the refreshAccessToken function to be memoized (i.e. return the same promise to all callers while a refresh is already in progress) and you need to prevent sending new requests while waiting for the refreshed token. Something like:

// ../somewhere.ts

let refreshPromise: Promise<void> | undefined;

export async function getCurrentAccessToken(): Promise<string> {
  await refreshPromise;
  return sessionStorage.getItem('accessToken') ?? '';
}

export function refreshAccessToken(): Promise<void> {
  return refreshPromise ?? (refreshPromise = (async () => {
    try {
      // Actually retrieve a new access token somehow and store it in sessionStorage
      sessionStorage.setItem('accessToken', await magic());
    } finally {
      refreshPromise = undefined;
    }
  })());
}
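To check that the memoization really collapses concurrent refreshes into one, the same pattern can be exercised in isolation. This is a sketch with in-memory stand-ins: `fetchNewToken`, the `accessToken` variable, and `refreshCount` are hypothetical replacements for the real refresh request and for sessionStorage.

```typescript
// In-memory stand-ins (assumptions) for sessionStorage and the real refresh request.
let refreshPromise: Promise<void> | undefined;
let accessToken = "stale";
let refreshCount = 0;

async function fetchNewToken(): Promise<string> {
  refreshCount++;
  await Promise.resolve(); // simulate asynchronous I/O
  return `token-${refreshCount}`;
}

function refreshAccessToken(): Promise<void> {
  // Reuse the in-flight refresh if there is one; otherwise start a new one.
  return refreshPromise ?? (refreshPromise = (async () => {
    try {
      accessToken = await fetchNewToken();
    } finally {
      // Allow a later refresh once this one has finished, whether it succeeded or not.
      refreshPromise = undefined;
    }
  })());
}

async function demoSingleFlight(): Promise<{ token: string; refreshCount: number }> {
  // Three requests notice the expired token at the same time.
  await Promise.all([refreshAccessToken(), refreshAccessToken(), refreshAccessToken()]);
  return { token: accessToken, refreshCount };
}
```

All three callers receive the same promise, so only one refresh request is issued and the refresh token is consumed once.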

Then we need to modify the interceptor a bit:

      void (async () => {
        try {
          // set the request headers and prevent new requests while access token is refreshing
          (options.meta ??= {}).Authorization = await getCurrentAccessToken();
          await performRequest();
        } catch (err) {
          if (err instanceof RpcError && (
            err.code === GrpcStatus[GrpcStatus.PERMISSION_DENIED] ||
            err.code === GrpcStatus[GrpcStatus.UNAUTHENTICATED]
          )) {
            try {
              // Refresh the access token
              await refreshAccessToken();
              // update the request headers
              (options.meta ??= {}).Authorization = await getCurrentAccessToken();
              await performRequest();
              // retry succeeded: skip the fail() below
              return;
            } catch (e) {
              return fail(e);
            }
          }
          fail(err);
        }
      })();

Again, you would be much better off if you could know the access token was about to expire and preemptively refresh the access token as it would avoid having to make requests we know are going to fail.
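One way to refresh preemptively is to schedule the refresh shortly before the token expires. The helper below is only a sketch: `msUntilRefresh`, its parameters, and the one-minute lead time are assumptions, and it presumes the expiry time of the access token is known to the client.

```typescript
// Hypothetical helper: how long to wait before refreshing a token that expires at expiresAtMs.
function msUntilRefresh(expiresAtMs: number, nowMs: number, leadMs = 60_000): number {
  // Refresh leadMs before expiry; never return a negative delay.
  return Math.max(0, expiresAtMs - leadMs - nowMs);
}
```

The result could be passed to a timer that calls refreshAccessToken(), so that most requests never see an expired token and the retry path is only a fallback.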

@lsmurray

lsmurray commented Sep 17, 2023

Just want to confirm that this is a reasonable implementation of async interceptors for both unary and server-streaming calls. In particular, I want to check that I've covered both the successful and error cases of the RpcOutputStreamController. I'm not super familiar with the internals of this library, so this was mostly guesswork. One edge case I'm worried about is missing a message, since I get the result prior to subscribing to messages.

import {
  Deferred,
  type RpcInterceptor,
  type RpcMetadata,
  RpcOutputStreamController,
  type RpcStatus,
  ServerStreamingCall,
  UnaryCall,
} from "@protobuf-ts/runtime-rpc";
import invariant from "tiny-invariant";

// async interceptors require reimplementing quite a few of the internals of protobuf-ts
// see https://github.com/timostamm/protobuf-ts/blob/main/packages/grpcweb-transport/src/grpc-web-transport.ts
// and https://github.com/timostamm/protobuf-ts/issues/580
// basic strategy is:
// 1. create a Deferred for each of the properties of the RpcCall
// 2. create a Promise to run the actual RPC call
// 3. return a new RpcCall using the Deferreds

async function getAccessToken(): Promise<string> {
  // eslint-disable-next-line unicorn/no-useless-promise-resolve-reject
  return Promise.resolve("fake-access-token");
}

export const createAsyncInterceptor = (): RpcInterceptor => {
  return {
    interceptUnary(next, method, input, options) {
      const defHeaders = new Deferred<RpcMetadata>();
      const defResponse = new Deferred<object>();
      const defStatus = new Deferred<RpcStatus>();
      const defTrailer = new Deferred<RpcMetadata>();

      if (!options.meta) {
        options.meta = {};
      }

      const runRPC = async () => {
        try {
          // set the access token
          invariant(options.meta);
          options.meta.Authorization = `Bearer ${await getAccessToken()}`;

          // perform the actual call
          const result = next(method, input, options);

          // resolve all our Deferreds
          defHeaders.resolve(result.headers);
          defResponse.resolve(result.response);
          defStatus.resolve(result.status);
          defTrailer.resolve(result.trailers);
        } catch (error) {
          defHeaders.rejectPending(error);
          defResponse.rejectPending(error);
          defStatus.rejectPending(error);
          defTrailer.rejectPending(error);
        }
      };

      // eslint-disable-next-line @typescript-eslint/no-floating-promises
      runRPC();

      const call = new UnaryCall(
        method,
        options.meta,
        input,
        defHeaders.promise,
        defResponse.promise,
        defStatus.promise,
        defTrailer.promise
      );

      return call;
    },
    interceptServerStreaming(next, method, input, options) {
      const defHeaders = new Deferred<RpcMetadata>();
      const responseStream = new RpcOutputStreamController<object>();
      const defStatus = new Deferred<RpcStatus>();
      const defTrailer = new Deferred<RpcMetadata>();

      if (!options.meta) {
        options.meta = {};
      }

      const runRPC = async () => {
        try {
          // set the access token
          invariant(options.meta);
          options.meta.Authorization = `Bearer ${await getAccessToken()}`;

          // perform the actual call
          const result = next(method, input, options);

          // resolve all our Deferreds
          defHeaders.resolve(result.headers);
          result.responses.onMessage((message) => {
            responseStream.notifyMessage(message);
          });
          result.responses.onComplete(() => {
            responseStream.notifyComplete();
          });
          defStatus.resolve(result.status);
          defTrailer.resolve(result.trailers);
        } catch (error) {
          invariant(error instanceof Error);
          defHeaders.rejectPending(error);
          responseStream.notifyError(error);
          defStatus.rejectPending(error);
          defTrailer.rejectPending(error);
        }
      };

      // eslint-disable-next-line @typescript-eslint/no-floating-promises
      runRPC();

      const call = new ServerStreamingCall(
        method,
        options.meta,
        input,
        defHeaders.promise,
        responseStream,
        defStatus.promise,
        defTrailer.promise
      );

      return call;
    },
  };
};

@jcready
Contributor

jcready commented Sep 20, 2023

What you have basically does nothing since you aren't actually doing anything in the error handling (you aren't retrying the request with a new access token, just re-throwing the error). You're not awaiting the next() call so your error handling would never be run anyhow. You also have invariant(error instanceof Error); inside a catch which would (if your invariant didn't hold) cause your call to never resolve or reject (you probably don't want that).
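The point about not awaiting can be reproduced without the library. In this sketch, `fakeNext` is a hypothetical stand-in for `next()` that returns a call-like object whose `response` promise rejects; it is not the protobuf-ts API.

```typescript
// Hypothetical stand-in for next(): returns immediately, the failure arrives via the promise.
function fakeNext(): { response: Promise<string> } {
  return { response: Promise.reject(new Error("UNAUTHENTICATED")) };
}

async function withoutAwait(): Promise<string> {
  try {
    const call = fakeNext();
    // Silence the unhandled rejection for this demo; the try/catch never sees it.
    call.response.catch(() => {});
    return "catch block skipped";
  } catch {
    return "catch block ran";
  }
}

async function withAwait(): Promise<string> {
  try {
    const call = fakeNext();
    // Awaiting turns the rejection into a thrown error inside the try block.
    await call.response;
    return "catch block skipped";
  } catch {
    return "catch block ran";
  }
}
```

Only `withAwait` reaches its `catch` block, which is why retry logic placed in a `catch` around an un-awaited `next()` has no effect.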


4 participants