Skip to content

Commit

Permalink
Fix RPC functionality (#21)
Browse files Browse the repository at this point in the history
API changes:

- Return and expect `Map<String, Object?>?` for payload in `Client.rpc` and `Socket.rpc` and en/decode it as JSON
- Fix Client.rpc over REST
- Add `httpKey` to `Client.rpc`
  • Loading branch information
blaugold authored Aug 30, 2024
1 parent 033aa66 commit 7d3a8da
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 188 deletions.
1 change: 0 additions & 1 deletion nakama/lib/nakama.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export 'src/models/matchmaker.dart'
PartyMatchmakerTicket;
export 'src/models/notification.dart' show Notification, NotificationList;
export 'src/models/party.dart' show PartyData, PartyLeader, PartyPresenceEvent;
export 'src/models/rpc.dart' show Rpc;
export 'src/models/session.dart' show Session;
export 'src/models/status.dart'
show
Expand Down
29 changes: 19 additions & 10 deletions nakama/lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -929,9 +929,12 @@ abstract interface class Client {
///
/// - [id] The ID of the function to execute.
/// - [payload] The payload to send with the function call.
Future<String?> rpc({
/// - [httpKey] The HTTP key to use for the function call. Not supported by
/// gRPC protocol.
Future<Map<String, Object?>?> rpc({
required String id,
String? payload,
Map<String, Object?>? payload,
String? httpKey,
});
}

Expand All @@ -949,6 +952,7 @@ abstract base class ClientBase implements Client {
});

static bool get _withoutSession => Zone.current[#_withoutSession]! as bool;
static String get _httpKey => Zone.current[#_httpKey]! as String;

@override
final String host;
Expand All @@ -971,13 +975,14 @@ abstract base class ClientBase implements Client {
String get authorizationHeader {
return switch (session) {
final session? when !_withoutSession => 'Bearer ${session.token}',
_ => 'Basic ${base64Encode('$serverKey:'.codeUnits)}'
_ => 'Basic ${base64Encode('$_httpKey:'.codeUnits)}'
};
}

Future<T> _performRequest<T>(
Future<T> Function() request, {
bool withoutSession = false,
String? httpKey,
}) async {
if (session
case Session(isExpired: true, isRefreshExpired: false, :final vars)
Expand All @@ -997,7 +1002,10 @@ abstract base class ClientBase implements Client {
try {
return await runZoned(
request,
zoneValues: {#_withoutSession: withoutSession},
zoneValues: {
#_withoutSession: withoutSession,
#_httpKey: httpKey ?? serverKey,
},
);
} on Exception catch (exception) {
if (translateException(exception) case final translatedException?) {
Expand Down Expand Up @@ -1517,9 +1525,9 @@ abstract base class ClientBase implements Client {
});

@visibleForOverriding
Future<String?> performRpc({
Future<Map<String, Object?>?> performRpc({
required String id,
String? payload,
Map<String, Object?>? payload,
});

@override
Expand All @@ -1539,7 +1547,7 @@ abstract base class ClientBase implements Client {
);
}

session = await _performRequest(withoutSession: true,() {
session = await _performRequest(withoutSession: true, () {
return performSessionRefresh(vars: vars);
});

Expand Down Expand Up @@ -2580,11 +2588,12 @@ abstract base class ClientBase implements Client {
}

@override
Future<String?> rpc({
Future<Map<String, Object?>?> rpc({
required String id,
String? payload,
Map<String, Object?>? payload,
String? httpKey,
}) {
return _performRequest(() {
return _performRequest(httpKey: httpKey, () {
return performRpc(
id: id,
payload: payload,
Expand Down
24 changes: 20 additions & 4 deletions nakama/lib/src/grpc_client.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:grpc/grpc.dart' hide Client;
Expand Down Expand Up @@ -1207,18 +1208,33 @@ final class GrpcClient extends ClientBase {
}

@override
Future<String?> performRpc({
Future<Map<String, Object?>?> performRpc({
required String id,
String? payload,
Map<String, Object?>? payload,
}) async {
final res = await _client.rpcFunc(
api.Rpc(
id: id,
payload: payload,
payload: payload != null ? jsonEncode(payload) : null,
),
);

return res.payload;
return res.payload.isEmpty ? null : jsonDecode(res.payload);
}

@override
Future<Map<String, Object?>?> rpc({
required String id,
Map<String, Object?>? payload,
String? httpKey,
}) async {
if (httpKey != null) {
throw NakamaError(
code: ErrorCode.invalidArgument,
message: 'RPC with HTTP key is not supported by gRPC protocol.',
);
}
return super.rpc(id: id, payload: payload, httpKey: httpKey);
}

@override
Expand Down
16 changes: 0 additions & 16 deletions nakama/lib/src/models/rpc.dart

This file was deleted.

126 changes: 0 additions & 126 deletions nakama/lib/src/models/rpc.freezed.dart

This file was deleted.

16 changes: 11 additions & 5 deletions nakama/lib/src/rest_client.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:convert';
import 'dart:io';

import 'package:dio/dio.dart';
Expand Down Expand Up @@ -1137,15 +1138,20 @@ final class RestClient extends ClientBase {
}

@override
Future<String?> performRpc({
Future<Map<String, Object?>?> performRpc({
required String id,
String? payload,
Map<String, Object?>? payload,
}) async {
final result = await switch (payload) {
final payload? => _api.rpcFunc(id: id, body: payload),
_ => _api.rpcFunc2(id: id)
final payload? => _api.rpcFunc(
id: id,
// The payload is double-encoded because the Nakama server expects
// it this way.
body: jsonEncode(jsonEncode(payload)),
),
_ => _api.rpcFunc2(id: id),
};

return result.payload;
return result.payload?.isEmpty ?? true ? null : jsonDecode(result.payload!);
}
}
23 changes: 17 additions & 6 deletions nakama/lib/src/socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import 'models/match.dart';
import 'models/matchmaker.dart';
import 'models/notification.dart';
import 'models/party.dart';
import 'models/rpc.dart';
import 'models/status.dart';

/// A socket for real-time communication with the Nakama server.
Expand Down Expand Up @@ -83,7 +82,10 @@ abstract class Socket {

Future<void> removeMatchmaker(String ticket);

Future<Rpc> rpc({required String id, String? payload});
Future<Map<String, Object?>?> rpc({
required String id,
Map<String, Object?>? payload,
});

Future<List<UserPresence>> followUsers({
List<String>? userIds,
Expand Down Expand Up @@ -287,7 +289,7 @@ class SocketImpl implements Socket {
PartyMatchmakerTicket.fromDto(envelope.partyMatchmakerTicket),
);
case rtapi.Envelope_Message.rpc:
_completePendingRequest(envelope.cid, Rpc.fromDto(envelope.rpc));
_completePendingRequest(envelope.cid, envelope.rpc);
case rtapi.Envelope_Message.party:
_completePendingRequest(envelope.cid, Party.fromDto(envelope.party));
case rtapi.Envelope_Message.channelPresenceEvent:
Expand Down Expand Up @@ -595,12 +597,21 @@ class SocketImpl implements Socket {
}

@override
Future<Rpc> rpc({required String id, String? payload}) {
return _send(
Future<Map<String, Object?>?> rpc({
required String id,
Map<String, Object?>? payload,
}) async {
final result = await _send<api.Rpc>(
rtapi.Envelope(
rpc: api.Rpc(id: id, payload: payload),
rpc: api.Rpc(
id: id,
payload: payload != null ? jsonEncode(payload) : null,
),
),
);
return result.payload.isEmpty
? null
: jsonDecode(result.payload) as Map<String, Object?>;
}

@override
Expand Down
Loading

0 comments on commit 7d3a8da

Please sign in to comment.