Skip to content

Commit

Permalink
feat(forwarder): configs for KeepAlive params (#627)
Browse files Browse the repository at this point in the history
The gRPC KeepAlive params can now be configured in the config.yaml and
passed through the forwarder. By default, time is 30s, timeout is 10s, and
permit without stream is enabled.
  • Loading branch information
hspedro authored Jun 26, 2024
1 parent 4776096 commit 6daae52
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 8 deletions.
4 changes: 4 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ adapters:
inCluster: false
masterUrl: "https://127.0.0.1:6443"
kubeconfig: "./kubeconfig/kubeconfig.yaml"
grpc:
keepAlive:
time: 30s
timeout: 5s
workers:
syncInterval: 10s
stopTimeoutDuration: 2m
Expand Down
17 changes: 16 additions & 1 deletion docs/tutorials/EventsForwarding.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,19 @@ This event forwarding type uses the [GRPCForwarder service proto definition](htt
to forward events, this means that the external service should use gRPC protocol and implement this service to receive events.

#### Response
Maestro expects the forwarder event response to return a HTTP code, which is mapped internally to a gRPC code. This mapping is done in the [handlerGrpcClientResponse function](https://github.com/topfreegames/maestro/blob/main/internal/adapters/events/events_forwarder.go)
Maestro expects the forwarder event response to return a HTTP code, which is mapped internally to a gRPC code. This mapping is done in the [handlerGrpcClientResponse function](https://github.com/topfreegames/maestro/blob/main/internal/adapters/events/events_forwarder.go)

#### Client Configuration

The GRPC forwarder uses a client that dials the forwarder address configured in the scheduler and forwards events to it. However, we need
to configure a KeepAlive mechanism so that the client periodically sends HTTP/2 ping frames on the channel and detects broken connections. Without a KeepAlive mechanism,
broken TCP connections are only refreshed when the kernel closes the underlying file descriptor due to inactivity, which can take up to 20 minutes.

Thus, there are some configurations that you can do to tweak the KeepAlive configuration. Those configs can be set either as an env var or in the `config.yaml`:

* `adapters.grpc.keepAlive.time`: After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive. If set below 10s, a minimum value of 10s will be used instead. Defaults to 30s.
* `adapters.grpc.keepAlive.timeout`: After having pinged for a keepalive check, the client waits for a duration of Timeout, and if no activity is seen even after that, the connection is closed. Defaults to 10s when not set (the sample `config.yaml` sets it to 5s).

[GRPC Official Doc Reference](https://grpc.io/docs/guides/keepalive/)

[GRPC Internals Reference](https://github.com/grpc/grpc/blob/master/doc/keepalive.md)
33 changes: 29 additions & 4 deletions internal/adapters/events/forwarder_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,45 @@ var (
_ ports.ForwarderClient = (*ForwarderClient)(nil)
)

// Default keepalive parameters used by NewForwarderClient for the gRPC
// connections it dials.
const (
// DefaultKeepAliveTime is the keepalive Time used when the caller does not
// supply a positive value.
DefaultKeepAliveTime = 30 * time.Second
// DefaultKeepAliveTimeout is the keepalive Timeout used when the caller does
// not supply a positive value.
DefaultKeepAliveTimeout = 10 * time.Second
// DefaultKeepAlivePermitWithoutStream is the PermitWithoutStream value that
// NewForwarderClient always applies; it is not overridden by the caller's
// parameters.
DefaultKeepAlivePermitWithoutStream = true
)

// ForwarderClientConfig holds the configuration of a ForwarderClient.
type ForwarderClientConfig struct {
// KeepAlive holds the keepalive parameters passed to
// grpc.WithKeepaliveParams when a connection is dialed.
KeepAlive keepalive.ClientParameters
}

// ForwarderClient is a struct that holds grpc clients to be used by forwarders.
// The cache c stores the *grpc.ClientConn values (they are closed on
// eviction), and config carries the keepalive parameters used when dialing.
type ForwarderClient struct {
c *cache.Cache
c *cache.Cache
config ForwarderClientConfig
}

// NewForwarderClient instantiates a new grpc forwarder client.
//
// keepAliveCfg overrides the default keepalive settings: a Time or Timeout
// greater than zero replaces DefaultKeepAliveTime or DefaultKeepAliveTimeout
// respectively. keepAliveCfg.PermitWithoutStream is not read; the client
// always uses DefaultKeepAlivePermitWithoutStream.
func NewForwarderClient() *ForwarderClient {
func NewForwarderClient(keepAliveCfg keepalive.ClientParameters) *ForwarderClient {
// NOTE(review): 24h is the cache's default expiration; the second argument
// (0) presumably disables periodic cleanup — confirm against the cache package.
cache := cache.New(24*time.Hour, 0)
// Close the gRPC connection when its cache entry is evicted. The type
// assertion is unchecked and the error returned by Close is discarded.
cache.OnEvicted(func(_key string, clientFromCache interface{}) {
ForwarderClient := clientFromCache.(*grpc.ClientConn)
ForwarderClient.Close()
})
// Start from the defaults, then apply only the positive values provided by
// the caller so that zero-valued parameters fall back to the defaults.
config := ForwarderClientConfig{
KeepAlive: keepalive.ClientParameters{
Time: DefaultKeepAliveTime,
Timeout: DefaultKeepAliveTimeout,
PermitWithoutStream: DefaultKeepAlivePermitWithoutStream,
},
}
if keepAliveCfg.Time > 0 {
config.KeepAlive.Time = keepAliveCfg.Time
}
if keepAliveCfg.Timeout > 0 {
config.KeepAlive.Timeout = keepAliveCfg.Timeout
}
return &ForwarderClient{
c: cache,
c: cache,
config: config,
}
}

Expand Down Expand Up @@ -170,7 +195,7 @@ func (f *ForwarderClient) createGRPCConnection(address string) (*grpc.ClientConn
conn, err := grpc.Dial(
address,
dialOption,
grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 30 * time.Second, Timeout: 10 * time.Second, PermitWithoutStream: true}),
grpc.WithKeepaliveParams(f.config.KeepAlive),
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)),
)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions internal/adapters/events/forwarder_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/topfreegames/maestro/internal/core/entities/forwarder"
"github.com/topfreegames/maestro/test"
"google.golang.org/grpc/keepalive"

pb "github.com/topfreegames/protos/maestro/grpc/generated"
)
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestForwarderClient_SendRoomStatus(t *testing.T) {
)
require.NoError(t, err)
}
f := NewForwarderClient()
f := NewForwarderClient(keepalive.ClientParameters{})
got, err := f.SendRoomStatus(tt.args.ctx, tt.args.forwarder, &tt.args.in)
if !tt.wantErr(t, err, fmt.Sprintf("SendRoomStatus(%v, %v, %v)", tt.args.ctx, tt.args.forwarder, tt.args.in)) {
return
Expand Down Expand Up @@ -237,7 +238,7 @@ func TestSendPlayerEvent(t *testing.T) {
}

func basicArrangeForwarderClient(t *testing.T) {
forwarderClientAdapter = NewForwarderClient()
forwarderClientAdapter = NewForwarderClient(keepalive.ClientParameters{})
}

func newRoomEvent(mockIdentifier string) pb.RoomEvent {
Expand Down
10 changes: 9 additions & 1 deletion internal/service/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,17 @@ import (
"github.com/topfreegames/maestro/internal/core/services/rooms"
"github.com/topfreegames/maestro/internal/core/services/schedulers"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"google.golang.org/grpc/keepalive"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

// configurations paths for the adapters
const (
// GRPC KeepAlive Configs
grpcKeepAliveTimePath = "adapters.grpc.keepalive.time"
grpcKeepAliveTimeoutPath = "adapters.grpc.keepalive.timeout"
// Kubernetes runtime
runtimeKubernetesMasterURLPath = "adapters.runtime.kubernetes.masterUrl"
runtimeKubernetesKubeconfigPath = "adapters.runtime.kubernetes.kubeconfig"
Expand Down Expand Up @@ -110,7 +114,11 @@ func NewRoomManager(clock ports.Clock, portAllocator ports.PortAllocator, roomSt

// NewEventsForwarder instantiates GRPC as events forwarder.
// The keepalive Time and Timeout are read from the configuration keys
// grpcKeepAliveTimePath and grpcKeepAliveTimeoutPath and passed to the
// forwarder client. The returned error is always nil.
func NewEventsForwarder(c config.Config) (ports.EventsForwarder, error) {
forwarderGrpc := eventsadapters.NewForwarderClient()
// Only Time and Timeout are populated; PermitWithoutStream is left at its
// zero value.
// NOTE(review): presumably GetDuration yields 0 for unset keys, in which
// case NewForwarderClient falls back to its defaults — confirm.
keepAliveCfg := keepalive.ClientParameters{
Time: c.GetDuration(grpcKeepAliveTimePath),
Timeout: c.GetDuration(grpcKeepAliveTimeoutPath),
}
forwarderGrpc := eventsadapters.NewForwarderClient(keepAliveCfg)
return eventsadapters.NewEventsForwarder(forwarderGrpc), nil
}

Expand Down

0 comments on commit 6daae52

Please sign in to comment.