Skip to content

Commit

Permalink
Implement test backend that will fetch matches, assign tickets and de…
Browse files Browse the repository at this point in the history
…lete tickets at scale (#804)
  • Loading branch information
sawagh authored Sep 18, 2019
1 parent 12e5a37 commit 21cf069
Showing 1 changed file with 141 additions and 4 deletions.
145 changes: 141 additions & 4 deletions examples/scale/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,36 @@
package backend

import (
"context"
"fmt"
"io"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
"open-match.dev/open-match/examples/scale/profiles"
"open-match.dev/open-match/internal/config"
"open-match.dev/open-match/internal/logging"
"open-match.dev/open-match/internal/rpc"
"open-match.dev/open-match/pkg/pb"
)

var (
logger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "scale.backend",
})

// TODO: Add metrics to track matches created, tickets assigned, deleted.
matchCount uint64
assigned uint64
deleted uint64
)

// Run triggers execution of the scale backend component that fetches
// matches at scale from Open Match.
// Run triggers execution of functions that continuously fetch, assign and
// delete matches.
func Run() {
cfg, err := config.Read()
if err != nil {
Expand All @@ -39,8 +54,130 @@ func Run() {
}

logging.ConfigureLogging(cfg)
beConn, err := rpc.GRPCClientFromConfig(cfg, "api.backend")
if err != nil {
logger.Fatalf("failed to connect to Open Match Backend, got %v", err)
}

defer beConn.Close()
be := pb.NewBackendClient(beConn)

feConn, err := rpc.GRPCClientFromConfig(cfg, "api.frontend")
if err != nil {
logger.Fatalf("failed to connect to Open Match Frontend, got %v", err)
}

defer feConn.Close()
fe := pb.NewFrontendClient(feConn)

// TODO: This is a placeholder - add the actual implementation.
// The buffered channels attempt to decouple fetch, assign and delete. It is
// best effort and these operations may still block each other if buffers are full.
matches := make(chan *pb.Match, 1000)
deleteIds := make(chan string, 1000)

go doFetch(cfg, be, matches)
go doAssign(be, matches, deleteIds)
go doDelete(fe, deleteIds)

// The above goroutines run forever and so the main goroutine needs to block.
select {}
}

// doFetch continuously fetches all profiles in a loop and queues up the fetched
// matches for assignment.
func doFetch(cfg config.View, be pb.BackendClient, matches chan *pb.Match) {
startTime := time.Now()
mprofiles := profiles.Generate(cfg)
logger.Infof("Generated Profiles: %v", mprofiles)
for {
var wg sync.WaitGroup
for _, p := range mprofiles {
wg.Add(1)
p := p
go func(wg *sync.WaitGroup) {
defer wg.Done()
fetch(be, p, matches)
}(&wg)
}

// Wait for all FetchMatches calls to complete before proceeding.
wg.Wait()
logger.Infof("FetchedMatches:%v, AssignedTickets:%v, DeletedTickets:%v in time %v", atomic.LoadUint64(&matchCount), atomic.LoadUint64(&assigned), atomic.LoadUint64(&deleted), time.Since(startTime))
}
}

func fetch(be pb.BackendClient, p *pb.MatchProfile, matches chan *pb.Match) {
req := &pb.FetchMatchesRequest{
Config: &pb.FunctionConfig{
Host: "om-function",
Port: 50502,
Type: pb.FunctionConfig_GRPC,
},
Profiles: []*pb.MatchProfile{p},
}

stream, err := be.FetchMatches(context.Background(), req)
if err != nil {
logger.Errorf("FetchMatches failed, got %v", err)
return
}

for {
resp, err := stream.Recv()
if err == io.EOF {
return
}

if err != nil {
logger.Errorf("FetchMatches failed, got %v", err)
return
}

matches <- resp.GetMatch()
atomic.AddUint64(&matchCount, 1)
}
}

// doAssign continuously assigns matches that were queued in the matches channel
// by doFetch and after successful assignment, queues all the tickets to deleteIds
// channel for deletion by doDelete.
func doAssign(be pb.BackendClient, matches chan *pb.Match, deleteIds chan string) {
for match := range matches {
ids := []string{}
for _, t := range match.Tickets {
ids = append(ids, t.Id)
}

req := &pb.AssignTicketsRequest{
TicketIds: ids,
Assignment: &pb.Assignment{
Connection: fmt.Sprintf("%d.%d.%d.%d:2222", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256)),
},
}

if _, err := be.AssignTickets(context.Background(), req); err != nil {
logger.Errorf("AssignTickets failed, got %v", err)
continue
}

atomic.AddUint64(&assigned, uint64(len(ids)))
for _, id := range ids {
deleteIds <- id
}
}
}

// doDelete deletes all the tickets whose ids get added to the deleteIds channel.
func doDelete(fe pb.FrontendClient, deleteIds chan string) {
for id := range deleteIds {
req := &pb.DeleteTicketRequest{
TicketId: id,
}

if _, err := fe.DeleteTicket(context.Background(), req); err != nil {
logger.Errorf("DeleteTicket failed for ticket %v, got %v", id, err)
continue
}

atomic.AddUint64(&deleted, 1)
}
}

0 comments on commit 21cf069

Please sign in to comment.