Skip to content

Commit

Permalink
Add storage engine index (#1056)
Browse files Browse the repository at this point in the history
Introduce storage index runtime functions:
* RegisterStorageIndex
* RegisterStorageIndexFilter
* StorageIndexList
  • Loading branch information
sesposito authored Jul 17, 2023
1 parent 83ac9c4 commit 6470d81
Show file tree
Hide file tree
Showing 33 changed files with 1,469 additions and 333 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
### Added
- Introduce pagination for console API leaderboard and tournament listing endpoint.
- Introduce pagination for devconsole leaderboard view.
- Add storage object indexing support and related runtime functions.

### Changed
- Better formatting for graphed values in devconsole status view.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0
github.com/heroiclabs/nakama-common v0.0.0-20230713130524-38774b285b66
github.com/heroiclabs/nakama-common v1.27.1-0.20230717184507-dff09d7c8047
github.com/jackc/pgconn v1.14.0
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgtype v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/heroiclabs/nakama-common v0.0.0-20230713130524-38774b285b66 h1:hcZM9fsheM4xprMyszVL5n+YLl2MBGlhRIrYUV0/tg0=
github.com/heroiclabs/nakama-common v0.0.0-20230713130524-38774b285b66/go.mod h1:Os8XeXGvHAap/p6M/8fQ3gle4eEXDGRQmoRNcPQTjXs=
github.com/heroiclabs/nakama-common v1.27.1-0.20230717184507-dff09d7c8047 h1:BnNhnDQBeQyPigo2F162XwYYP7RxrEgxMJx992SLK+g=
github.com/heroiclabs/nakama-common v1.27.1-0.20230717184507-dff09d7c8047/go.mod h1:Os8XeXGvHAap/p6M/8fQ3gle4eEXDGRQmoRNcPQTjXs=
github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down
29 changes: 15 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ package main

import (
"context"
cryptoRand "crypto/rand"
"encoding/binary"
"flag"
"fmt"
"math/rand"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -104,14 +101,6 @@ func main() {
startupLogger.Info("Node", zap.String("name", config.GetName()), zap.String("version", semver), zap.String("runtime", runtime.Version()), zap.Int("cpu", runtime.NumCPU()), zap.Int("proc", runtime.GOMAXPROCS(0)))
startupLogger.Info("Data directory", zap.String("path", config.GetDataDir()))

// Initialize the global random with strongly seed.
var seed int64
if err := binary.Read(cryptoRand.Reader, binary.BigEndian, &seed); err != nil {
startupLogger.Warn("Failed to get strongly random seed, fallback to a less random one.", zap.Error(err))
seed = time.Now().UnixNano()
}
rand.Seed(seed)

redactedAddresses := make([]string, 0, 1)
for _, address := range config.GetDatabase().Addresses {
rawURL := fmt.Sprintf("postgres://%s", address)
Expand Down Expand Up @@ -153,7 +142,12 @@ func main() {
tracker.SetMatchJoinListener(matchRegistry.Join)
tracker.SetMatchLeaveListener(matchRegistry.Leave)
streamManager := server.NewLocalStreamManager(config, sessionRegistry, tracker)
runtime, runtimeInfo, err := server.NewRuntime(ctx, logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router)

storageIndex, err := server.NewLocalStorageIndex(logger, db)
if err != nil {
logger.Fatal("Failed to initialize storage index", zap.Error(err))
}
runtime, runtimeInfo, err := server.NewRuntime(ctx, logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, storageIndex)
if err != nil {
startupLogger.Fatal("Failed initializing runtime modules", zap.Error(err))
}
Expand All @@ -162,14 +156,21 @@ func main() {
tracker.SetPartyJoinListener(partyRegistry.Join)
tracker.SetPartyLeaveListener(partyRegistry.Leave)

storageIndex.RegisterFilters(runtime)
go func() {
if err = storageIndex.Load(ctx); err != nil {
logger.Error("Failed to load storage index entries from database", zap.Error(err))
}
}()

leaderboardScheduler.Start(runtime)
googleRefundScheduler.Start(runtime)

pipeline := server.NewPipeline(logger, config, db, jsonpbMarshaler, jsonpbUnmarshaler, sessionRegistry, statusRegistry, matchRegistry, partyRegistry, matchmaker, tracker, router, runtime)
statusHandler := server.NewLocalStatusHandler(logger, sessionRegistry, matchRegistry, tracker, metrics, config.GetName())

apiServer := server.StartApiServer(logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, sessionRegistry, sessionCache, statusRegistry, matchRegistry, matchmaker, tracker, router, streamManager, metrics, pipeline, runtime)
consoleServer := server.StartConsoleServer(logger, startupLogger, db, config, tracker, router, streamManager, metrics, sessionRegistry, sessionCache, consoleSessionCache, loginAttemptCache, statusRegistry, statusHandler, runtimeInfo, matchRegistry, configWarnings, semver, leaderboardCache, leaderboardRankCache, leaderboardScheduler, apiServer, runtime, cookie)
apiServer := server.StartApiServer(logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, version, socialClient, storageIndex, leaderboardCache, leaderboardRankCache, sessionRegistry, sessionCache, statusRegistry, matchRegistry, matchmaker, tracker, router, streamManager, metrics, pipeline, runtime)
consoleServer := server.StartConsoleServer(logger, startupLogger, db, config, tracker, router, streamManager, metrics, sessionRegistry, sessionCache, consoleSessionCache, loginAttemptCache, statusRegistry, statusHandler, runtimeInfo, matchRegistry, configWarnings, semver, leaderboardCache, leaderboardRankCache, leaderboardScheduler, storageIndex, apiServer, runtime, cookie)

gaenabled := len(os.Getenv("NAKAMA_TELEMETRY")) < 1
console.UIFS.Nt = !gaenabled
Expand Down
4 changes: 3 additions & 1 deletion server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type ApiServer struct {
config Config
version string
socialClient *social.Client
storageIndex StorageIndex
leaderboardCache LeaderboardCache
leaderboardRankCache LeaderboardRankCache
sessionCache SessionCache
Expand All @@ -84,7 +85,7 @@ type ApiServer struct {
grpcGatewayServer *http.Server
}

func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, version string, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, streamManager StreamManager, metrics Metrics, pipeline *Pipeline, runtime *Runtime) *ApiServer {
func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, protojsonUnmarshaler *protojson.UnmarshalOptions, config Config, version string, socialClient *social.Client, storageIndex StorageIndex, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, streamManager StreamManager, metrics Metrics, pipeline *Pipeline, runtime *Runtime) *ApiServer {
var gatewayContextTimeoutMs string
if config.GetSocket().IdleTimeoutMs > 500 {
// Ensure the GRPC Gateway timeout is just under the idle timeout (if possible) to ensure it has priority.
Expand Down Expand Up @@ -119,6 +120,7 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, p
socialClient: socialClient,
leaderboardCache: leaderboardCache,
leaderboardRankCache: leaderboardRankCache,
storageIndex: storageIndex,
sessionCache: sessionCache,
sessionRegistry: sessionRegistry,
statusRegistry: statusRegistry,
Expand Down
2 changes: 1 addition & 1 deletion server/api_leaderboard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ nk.leaderboard_create(%q, %v, %q, %q, reset, metadata)
pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, tracker, router, runtime)

apiServer := StartApiServer(logger, logger, db, protojsonMarshaler,
protojsonUnmarshaler, cfg, "3.0.0", nil, rtData.leaderboardCache,
protojsonUnmarshaler, cfg, "3.0.0", nil, nil, rtData.leaderboardCache,
rtData.leaderboardRankCache, nil, sessionCache,
nil, nil, nil, tracker, router, nil, metrics, pipeline, runtime)

Expand Down
4 changes: 2 additions & 2 deletions server/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (s *ApiServer) WriteStorageObjects(ctx context.Context, in *api.WriteStorag
})
}

acks, code, err := StorageWriteObjects(ctx, s.logger, s.db, s.metrics, false, ops)
acks, code, err := StorageWriteObjects(ctx, s.logger, s.db, s.metrics, s.storageIndex, false, ops)
if err != nil {
if code == codes.Internal {
return nil, status.Error(codes.Internal, "Error writing storage objects.")
Expand Down Expand Up @@ -279,7 +279,7 @@ func (s *ApiServer) DeleteStorageObjects(ctx context.Context, in *api.DeleteStor
})
}

if code, err := StorageDeleteObjects(ctx, s.logger, s.db, false, ops); err != nil {
if code, err := StorageDeleteObjects(ctx, s.logger, s.db, s.storageIndex, false, ops); err != nil {
if code == codes.Internal {
return nil, status.Error(codes.Internal, "Error deleting storage objects.")
}
Expand Down
7 changes: 4 additions & 3 deletions server/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ var (
protojsonUnmarshaler = &protojson.UnmarshalOptions{
DiscardUnknown: false,
}
metrics = NewLocalMetrics(logger, logger, nil, cfg)
_ = CheckConfig(logger, cfg)
metrics = NewLocalMetrics(logger, logger, nil, cfg)
storageIdx, _ = NewLocalStorageIndex(logger, nil, []StorageIndexConfig{})
_ = CheckConfig(logger, cfg)
)

type DummyMessageRouter struct{}
Expand Down Expand Up @@ -199,7 +200,7 @@ func NewAPIServer(t *testing.T, runtime *Runtime) (*ApiServer, *Pipeline) {
tracker := &LocalTracker{}
sessionCache := NewLocalSessionCache(3600)
pipeline := NewPipeline(logger, cfg, db, protojsonMarshaler, protojsonUnmarshaler, nil, nil, nil, nil, nil, tracker, router, runtime)
apiServer := StartApiServer(logger, logger, db, protojsonMarshaler, protojsonUnmarshaler, cfg, "3.0.0", nil, nil, nil, nil, sessionCache, nil, nil, nil, tracker, router, nil, metrics, pipeline, runtime)
apiServer := StartApiServer(logger, logger, db, protojsonMarshaler, protojsonUnmarshaler, cfg, "3.0.0", nil, storageIdx, nil, nil, nil, sessionCache, nil, nil, nil, tracker, router, nil, metrics, pipeline, runtime)

WaitForSocket(nil, cfg)
return apiServer, pipeline
Expand Down
4 changes: 3 additions & 1 deletion server/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type ConsoleServer struct {
statusRegistry *StatusRegistry
matchRegistry MatchRegistry
statusHandler StatusHandler
storageIndex StorageIndex
runtimeInfo *RuntimeInfo
configWarnings map[string]string
serverVersion string
Expand All @@ -164,7 +165,7 @@ type ConsoleServer struct {
httpClient *http.Client
}

func StartConsoleServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, config Config, tracker Tracker, router MessageRouter, streamManager StreamManager, metrics Metrics, sessionRegistry SessionRegistry, sessionCache SessionCache, consoleSessionCache SessionCache, loginAttemptCache LoginAttemptCache, statusRegistry *StatusRegistry, statusHandler StatusHandler, runtimeInfo *RuntimeInfo, matchRegistry MatchRegistry, configWarnings map[string]string, serverVersion string, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, api *ApiServer, runtime *Runtime, cookie string) *ConsoleServer {
func StartConsoleServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, config Config, tracker Tracker, router MessageRouter, streamManager StreamManager, metrics Metrics, sessionRegistry SessionRegistry, sessionCache SessionCache, consoleSessionCache SessionCache, loginAttemptCache LoginAttemptCache, statusRegistry *StatusRegistry, statusHandler StatusHandler, runtimeInfo *RuntimeInfo, matchRegistry MatchRegistry, configWarnings map[string]string, serverVersion string, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, storageIndex StorageIndex, api *ApiServer, runtime *Runtime, cookie string) *ConsoleServer {
var gatewayContextTimeoutMs string
if config.GetConsole().IdleTimeoutMs > 500 {
// Ensure the GRPC Gateway timeout is just under the idle timeout (if possible) to ensure it has priority.
Expand Down Expand Up @@ -205,6 +206,7 @@ func StartConsoleServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.D
leaderboardCache: leaderboardCache,
leaderboardRankCache: leaderboardRankCache,
leaderboardScheduler: leaderboardScheduler,
storageIndex: storageIndex,
api: api,
cookie: cookie,
httpClient: &http.Client{Timeout: 5 * time.Second},
Expand Down
4 changes: 2 additions & 2 deletions server/console_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *ConsoleServer) DeleteStorageObject(ctx context.Context, in *console.Del
return nil, status.Error(codes.InvalidArgument, "Requires a valid user ID.")
}

code, err := StorageDeleteObjects(ctx, s.logger, s.db, true, StorageOpDeletes{
code, err := StorageDeleteObjects(ctx, s.logger, s.db, s.storageIndex, true, StorageOpDeletes{
&StorageOpDelete{
OwnerID: in.UserId,
ObjectID: &api.DeleteStorageObjectId{
Expand Down Expand Up @@ -334,7 +334,7 @@ func (s *ConsoleServer) WriteStorageObject(ctx context.Context, in *console.Writ
return nil, status.Error(codes.InvalidArgument, "Requires a valid JSON object value.")
}

acks, code, err := StorageWriteObjects(ctx, s.logger, s.db, s.metrics, true, StorageOpWrites{
acks, code, err := StorageWriteObjects(ctx, s.logger, s.db, s.metrics, s.storageIndex, true, StorageOpWrites{
&StorageOpWrite{
OwnerID: in.UserId,
Object: &api.WriteStorageObject{
Expand Down
12 changes: 6 additions & 6 deletions server/console_storage_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ func (s *ConsoleServer) importStorage(w http.ResponseWriter, r *http.Request) {
// Examine file name to determine if it's a JSON or CSV import.
if strings.HasSuffix(strings.ToLower(filename), ".json") {
// File has .json suffix, try to import as JSON.
err = importStorageJSON(r.Context(), s.logger, s.db, s.metrics, fileBytes)
err = importStorageJSON(r.Context(), s.logger, s.db, s.metrics, s.storageIndex, fileBytes)
} else {
// Assume all other files are CSV.
err = importStorageCSV(r.Context(), s.logger, s.db, s.metrics, fileBytes)
err = importStorageCSV(r.Context(), s.logger, s.db, s.metrics, s.storageIndex, fileBytes)
}

if err != nil {
Expand All @@ -145,7 +145,7 @@ func (s *ConsoleServer) importStorage(w http.ResponseWriter, r *http.Request) {
}
}

func importStorageJSON(ctx context.Context, logger *zap.Logger, db *sql.DB, metrics Metrics, fileBytes []byte) error {
func importStorageJSON(ctx context.Context, logger *zap.Logger, db *sql.DB, metrics Metrics, storageIndex StorageIndex, fileBytes []byte) error {
importedData := make([]*importStorageObject, 0)
ops := StorageOpWrites{}

Expand Down Expand Up @@ -200,7 +200,7 @@ func importStorageJSON(ctx context.Context, logger *zap.Logger, db *sql.DB, metr
return nil
}

acks, _, err := StorageWriteObjects(ctx, logger, db, metrics, true, ops)
acks, _, err := StorageWriteObjects(ctx, logger, db, metrics, storageIndex, true, ops)
if err != nil {
logger.Warn("Failed to write imported records.", zap.Error(err))
return errors.New("could not import records due to an internal error - please consult server logs")
Expand All @@ -210,7 +210,7 @@ func importStorageJSON(ctx context.Context, logger *zap.Logger, db *sql.DB, metr
return nil
}

func importStorageCSV(ctx context.Context, logger *zap.Logger, db *sql.DB, metrics Metrics, fileBytes []byte) error {
func importStorageCSV(ctx context.Context, logger *zap.Logger, db *sql.DB, metrics Metrics, storageIndex StorageIndex, fileBytes []byte) error {
r := csv.NewReader(bytes.NewReader(fileBytes))

columnIndexes := make(map[string]int)
Expand Down Expand Up @@ -300,7 +300,7 @@ func importStorageCSV(ctx context.Context, logger *zap.Logger, db *sql.DB, metri
return nil
}

acks, _, err := StorageWriteObjects(ctx, logger, db, metrics, true, ops)
acks, _, err := StorageWriteObjects(ctx, logger, db, metrics, storageIndex, true, ops)
if err != nil {
logger.Warn("Failed to write imported records.", zap.Error(err))
return errors.New("could not import records due to an internal error - please consult server logs")
Expand Down
9 changes: 6 additions & 3 deletions server/core_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func StorageListObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, cal
// Call from the runtime.
if ownerID == nil {
// List storage regardless of user.
// TODO
result, resultErr = StorageListObjectsAll(ctx, logger, db, true, collection, limit, cursor, sc)
} else {
// List for a particular user ID.
Expand Down Expand Up @@ -464,7 +463,7 @@ WHERE
return objects, err
}

func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, metrics Metrics, authoritativeWrite bool, ops StorageOpWrites) (*api.StorageObjectAcks, codes.Code, error) {
func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, metrics Metrics, storageIndex StorageIndex, authoritativeWrite bool, ops StorageOpWrites) (*api.StorageObjectAcks, codes.Code, error) {
var acks []*api.StorageObjectAck

if err := ExecuteInTx(ctx, db, func(tx *sql.Tx) error {
Expand All @@ -483,6 +482,8 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, me
return nil, codes.Internal, err
}

storageIndex.Write(ctx, ops)

return &api.StorageObjectAcks{Acks: acks}, codes.OK, nil
}

Expand Down Expand Up @@ -635,7 +636,7 @@ func storageWriteObject(ctx context.Context, logger *zap.Logger, metrics Metrics
return ack, nil
}

func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, authoritativeDelete bool, ops StorageOpDeletes) (codes.Code, error) {
func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, storageIndex StorageIndex, authoritativeDelete bool, ops StorageOpDeletes) (codes.Code, error) {
// Ensure deletes are processed in a consistent order.
sort.Sort(ops)

Expand Down Expand Up @@ -681,5 +682,7 @@ func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, a
return codes.Internal, err
}

storageIndex.Delete(ctx, ops)

return codes.OK, nil
}
Loading

0 comments on commit 6470d81

Please sign in to comment.