Skip to content

Commit

Permalink
[server] Add support for file-data (#2662)
Browse files Browse the repository at this point in the history
## Description

## Tests
  • Loading branch information
ua741 authored Aug 12, 2024
2 parents ac64aad + 4ce3362 commit 6259a97
Show file tree
Hide file tree
Showing 23 changed files with 1,589 additions and 48 deletions.
25 changes: 21 additions & 4 deletions server/cmd/museum/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
b64 "encoding/base64"
"fmt"
"github.com/ente-io/museum/pkg/controller/file_copy"
"github.com/ente-io/museum/pkg/controller/filedata"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
castRepo "github.com/ente-io/museum/pkg/repo/cast"
"github.com/ente-io/museum/pkg/repo/datacleanup"
"github.com/ente-io/museum/pkg/repo/embedding"
fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
"github.com/ente-io/museum/pkg/repo/kex"
"github.com/ente-io/museum/pkg/repo/passkey"
"github.com/ente-io/museum/pkg/repo/remotestore"
Expand Down Expand Up @@ -134,7 +136,6 @@ func main() {
}, []string{"method"})

s3Config := s3config.NewS3Config()

passkeysRepo, err := passkey.NewRepository(db)
if err != nil {
panic(err)
Expand Down Expand Up @@ -162,6 +163,7 @@ func main() {
fileRepo := &repo.FileRepository{DB: db, S3Config: s3Config, QueueRepo: queueRepo,
ObjectRepo: objectRepo, ObjectCleanupRepo: objectCleanupRepo,
ObjectCopiesRepo: objectCopiesRepo, UsageRepo: usageRepo}
fileDataRepo := &fileDataRepo.Repository{DB: db}
familyRepo := &repo.FamilyRepository{DB: db}
trashRepo := &repo.TrashRepository{DB: db, ObjectRepo: objectRepo, FileRepo: fileRepo, QueueRepo: queueRepo}
publicCollectionRepo := repo.NewPublicCollectionRepository(db, viper.GetString("apps.public-albums"))
Expand Down Expand Up @@ -238,6 +240,9 @@ func main() {
FileRepo: fileRepo,
}

accessCtrl := access.NewAccessController(collectionRepo, fileRepo)
fileDataCtrl := filedata.New(fileDataRepo, accessCtrl, objectCleanupController, s3Config, fileRepo, collectionRepo)

fileController := &controller.FileController{
FileRepo: fileRepo,
ObjectRepo: objectRepo,
Expand Down Expand Up @@ -287,8 +292,6 @@ func main() {
JwtSecret: jwtSecretBytes,
}

accessCtrl := access.NewAccessController(collectionRepo, fileRepo)

collectionController := &controller.CollectionController{
CollectionRepo: collectionRepo,
AccessCtrl: accessCtrl,
Expand Down Expand Up @@ -401,13 +404,21 @@ func main() {
fileHandler := &api.FileHandler{
Controller: fileController,
FileCopyCtrl: fileCopyCtrl,
FileDataCtrl: fileDataCtrl,
}
privateAPI.GET("/files/upload-urls", fileHandler.GetUploadURLs)
privateAPI.GET("/files/multipart-upload-urls", fileHandler.GetMultipartUploadURLs)
privateAPI.GET("/files/download/:fileID", fileHandler.Get)
privateAPI.GET("/files/download/v2/:fileID", fileHandler.Get)
privateAPI.GET("/files/preview/:fileID", fileHandler.GetThumbnail)
privateAPI.GET("/files/preview/v2/:fileID", fileHandler.GetThumbnail)

privateAPI.PUT("/files/data/", fileHandler.PutFileData)
privateAPI.POST("files/data/fetch", fileHandler.GetFilesData)
privateAPI.GET("files/data/fetch", fileHandler.GetFileData)
privateAPI.GET("/files/data/preview-upload-url/", fileHandler.GetPreviewUploadURL)
privateAPI.GET("/files/data/preview/", fileHandler.GetPreviewURL)

privateAPI.POST("/files", fileHandler.CreateOrUpdate)
privateAPI.POST("/files/copy", fileHandler.CopyFiles)
privateAPI.PUT("/files/update", fileHandler.Update)
Expand Down Expand Up @@ -694,7 +705,7 @@ func main() {
publicAPI.GET("/offers/black-friday", offerHandler.GetBlackFridayOffers)

setKnownAPIs(server.Routes())
setupAndStartBackgroundJobs(objectCleanupController, replicationController3)
setupAndStartBackgroundJobs(objectCleanupController, replicationController3, fileDataCtrl)
setupAndStartCrons(
userAuthRepo, publicCollectionRepo, twoFactorRepo, passkeysRepo, fileController, taskLockingRepo, emailNotificationCtrl,
trashController, pushController, objectController, dataCleanupController, storageBonusCtrl,
Expand Down Expand Up @@ -804,17 +815,23 @@ func setupDatabase() *sql.DB {
// setupAndStartBackgroundJobs launches the long-running background workers:
// replication (v3 and file-data, when enabled via config) and the cleanup /
// deletion loops that run regardless of the replication setting.
func setupAndStartBackgroundJobs(
	objectCleanupController *controller.ObjectCleanupController,
	replicationController3 *controller.ReplicationController3,
	fileDataCtrl *filedata.Controller,
) {
	if !viper.GetBool("replication.enabled") {
		log.Info("Skipping Replication as replication is disabled")
	} else {
		if err := replicationController3.StartReplication(); err != nil {
			log.Warnf("Could not start replication v3: %s", err)
		}
		if err := fileDataCtrl.StartReplication(); err != nil {
			log.Warnf("Could not start fileData replication: %s", err)
		}
	}

	// These workers run even when replication is disabled.
	fileDataCtrl.StartDataDeletion()
	objectCleanupController.StartRemovingUnreportedObjects()
	objectCleanupController.StartClearingOrphanObjects()
}
Expand Down
12 changes: 12 additions & 0 deletions server/configurations/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,18 @@ s3:
# resolved, e.g. when running a local instance, or when using MinIO as a
# production store.
#use_path_style_urls: true
#
# Warning: For file-storage, do not specify buckets with any lock or versioning enabled.
# The application does not handle these cases. By default, we will use the derived-storage or hot storage bucket
# as the primary bucket for file-data storage.
# file-data-storage:
# mldata:
# primaryBucket:
# replicaBuckets: []
# img_preview:
# primaryBucket:
# replicaBuckets: []


# Key used for encrypting customer emails before storing them in DB
#
Expand Down
9 changes: 6 additions & 3 deletions server/ente/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,11 @@ type MultipartUploadURLs struct {
type ObjectType string

const (
FILE ObjectType = "file"
THUMBNAIL ObjectType = "thumbnail"
FILE ObjectType = "file"
THUMBNAIL ObjectType = "thumbnail"
PreviewImage ObjectType = "img_preview"
PreviewVideo ObjectType = "vid_preview"
MlData ObjectType = "mldata"
)

// S3ObjectKey represents the s3 object key and corresponding fileID for it
Expand Down Expand Up @@ -199,7 +202,7 @@ type TempObject struct {
ObjectKey string
IsMultipart bool
UploadID string
DataCenter string
BucketId string
}

// DuplicateFiles represents duplicate files
Expand Down
122 changes: 122 additions & 0 deletions server/ente/filedata/filedata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package filedata

import (
"fmt"
"github.com/ente-io/museum/ente"
)

// Entity is a single piece of file-associated data (e.g. a preview-video
// playlist or derived ML metadata) as returned to clients.
type Entity struct {
	FileID int64 `json:"fileID"`
	// Type is the kind of data carried; see ente.ObjectType for known values.
	Type             ente.ObjectType `json:"type"`
	EncryptedData    string          `json:"encryptedData"`
	DecryptionHeader string          `json:"decryptionHeader"`
}

// GetFilesData is the request payload for fetching data for a batch of files.
// It should only be used for getting the preview video playlist and derived metadata.
type GetFilesData struct {
	FileIDs []int64         `json:"fileIDs" binding:"required"`
	Type    ente.ObjectType `json:"type" binding:"required"`
}

// Validate rejects unsupported object types and enforces the batch-size
// bounds (1..200 file IDs) on the request.
func (g *GetFilesData) Validate() error {
	switch {
	case g.Type != ente.PreviewVideo && g.Type != ente.MlData:
		return ente.NewBadRequestWithMessage(fmt.Sprintf("unsupported object type %s", g.Type))
	case len(g.FileIDs) == 0:
		return ente.NewBadRequestWithMessage("fileIDs are required")
	case len(g.FileIDs) > 200:
		return ente.NewBadRequestWithMessage("fileIDs should be less than or equal to 200")
	default:
		return nil
	}
}

// GetFileData is the request (query-string bound) for fetching a single
// file's data entity.
type GetFileData struct {
	FileID int64           `form:"fileID" binding:"required"`
	Type   ente.ObjectType `form:"type" binding:"required"`
}

// Validate accepts only the preview-video and ML-data object types.
func (g *GetFileData) Validate() error {
	if g.Type == ente.PreviewVideo || g.Type == ente.MlData {
		return nil
	}
	return ente.NewBadRequestWithMessage(fmt.Sprintf("unsupported object type %s", g.Type))
}

// GetFilesDataResponse is the response for a batched file-data fetch.
type GetFilesDataResponse struct {
	// Data holds the entities that were fetched successfully.
	Data []Entity `json:"data"`
	// PendingIndexFileIDs — presumably file IDs whose data has not been
	// produced/indexed yet; confirm against the controller that fills this.
	PendingIndexFileIDs []int64 `json:"pendingIndexFileIDs"`
	// ErrFileIDs are file IDs whose data could not be fetched.
	ErrFileIDs []int64 `json:"errFileIDs"`
}

// S3FileMetadata is the struct that represents the metadata stored in the S3
// bucket for non-file object types (e.g. playlists and derived metadata).
type S3FileMetadata struct {
	Version          int    `json:"v"`
	EncryptedData    string `json:"encryptedData"`
	DecryptionHeader string `json:"header"`
	// Client — presumably identifies the uploading client; confirm with writers.
	Client string `json:"client"`
}

// GetPreviewURLRequest is the request (query-string bound) for a pre-signed
// URL to download a file's preview object.
type GetPreviewURLRequest struct {
	FileID int64           `form:"fileID" binding:"required"`
	Type   ente.ObjectType `form:"type" binding:"required"`
}

// Validate accepts only the preview object types (video or image).
func (g *GetPreviewURLRequest) Validate() error {
	if g.Type == ente.PreviewVideo || g.Type == ente.PreviewImage {
		return nil
	}
	return ente.NewBadRequestWithMessage(fmt.Sprintf("unsupported object type %s", g.Type))
}

// PreviewUploadUrlRequest is the request (query-string bound) for a
// pre-signed URL to upload a file's preview object.
// NOTE(review): Go convention would name this PreviewUploadURLRequest
// ("URL" initialism); renaming would break existing callers, so left as is.
type PreviewUploadUrlRequest struct {
	FileID int64           `form:"fileID" binding:"required"`
	Type   ente.ObjectType `form:"type" binding:"required"`
}

// Validate accepts only the preview object types (video or image).
func (g *PreviewUploadUrlRequest) Validate() error {
	if g.Type == ente.PreviewVideo || g.Type == ente.PreviewImage {
		return nil
	}
	return ente.NewBadRequestWithMessage(fmt.Sprintf("unsupported object type %s", g.Type))
}

// Row represents the data that is stored in the file_data table.
type Row struct {
	FileID int64
	// UserID is the file owner; object keys are derived from it (see path.go).
	UserID int64
	Type   ente.ObjectType
	// If a file type has multiple objects, then the size is the sum of all the objects.
	Size int64
	// LatestBucket is the bucket holding the most recent write.
	LatestBucket string
	// ReplicatedBuckets lists buckets that already hold a replica.
	ReplicatedBuckets []string
	// DeleteFromBuckets lists buckets the data still needs removing from.
	DeleteFromBuckets []string
	// InflightReplicas — presumably buckets with replication in progress;
	// confirm against the replication controller.
	InflightReplicas []string
	// PendingSync marks rows awaiting processing by the sync/replication loop.
	PendingSync bool
	IsDeleted   bool
	// SyncLockedTill — epoch until which the row is locked for sync,
	// presumably to stop concurrent workers claiming it; TODO confirm.
	SyncLockedTill int64
	CreatedAt      int64
	UpdatedAt      int64
}

// S3FileMetadataObjectKey returns the object key for the metadata stored in
// the S3 bucket. It panics for object types that carry no S3 metadata object.
func (r *Row) S3FileMetadataObjectKey() string {
	switch r.Type {
	case ente.MlData:
		return derivedMetaPath(r.FileID, r.UserID)
	case ente.PreviewVideo:
		return previewVideoPlaylist(r.FileID, r.UserID)
	default:
		panic(fmt.Sprintf("S3FileMetadata should not be written for %s type", r.Type))
	}
}

// GetS3FileObjectKey returns the object key for the file data stored in the
// S3 bucket. It panics for object types that have no file object.
func (r *Row) GetS3FileObjectKey() string {
	switch r.Type {
	case ente.PreviewVideo:
		return previewVideoPath(r.FileID, r.UserID)
	case ente.PreviewImage:
		return previewImagePath(r.FileID, r.UserID)
	default:
		panic(fmt.Sprintf("unsupported object type %s", r.Type))
	}
}
53 changes: 53 additions & 0 deletions server/ente/filedata/path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package filedata

import (
"fmt"
"github.com/ente-io/museum/ente"
)

// BasePrefix returns the base prefix ("<ownerID>/file-data/<fileID>/") for
// all objects related to a file. To check if the file data is deleted, ensure
// that there's no object in the S3 bucket with this prefix.
func BasePrefix(fileID int64, ownerID int64) string {
	const layout = "%d/file-data/%d/"
	return fmt.Sprintf(layout, ownerID, fileID)
}

// AllObjects lists every S3 object key that can exist for the given file and
// object type. It panics for unsupported object types.
func AllObjects(fileID int64, ownerID int64, oType ente.ObjectType) []string {
	switch oType {
	case ente.PreviewImage:
		return []string{previewImagePath(fileID, ownerID)}
	case ente.PreviewVideo:
		// A video preview has two objects: the media file and its playlist.
		return []string{previewVideoPath(fileID, ownerID), previewVideoPlaylist(fileID, ownerID)}
	case ente.MlData:
		return []string{derivedMetaPath(fileID, ownerID)}
	}
	panic(fmt.Sprintf("object type %s is not supported", oType))
}

// PreviewUrl returns the S3 object key of the preview object for the given
// file. It panics for non-preview object types.
func PreviewUrl(fileID int64, ownerID int64, oType ente.ObjectType) string {
	if oType == ente.PreviewVideo {
		return previewVideoPath(fileID, ownerID)
	}
	if oType == ente.PreviewImage {
		return previewImagePath(fileID, ownerID)
	}
	panic(fmt.Sprintf("object type %s is not supported", oType))
}

func previewVideoPath(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewVideo))
}

func previewVideoPlaylist(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", previewVideoPath(fileID, ownerID), "_playlist.m3u8")
}

func previewImagePath(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewImage))
}

func derivedMetaPath(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.MlData))
}
66 changes: 66 additions & 0 deletions server/ente/filedata/putfiledata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package filedata

import (
"fmt"
"github.com/ente-io/museum/ente"
)

// PutFileDataRequest is the request for uploading/associating data with a
// file. Which optional fields must be set depends on Type — see Validate.
type PutFileDataRequest struct {
	FileID int64           `json:"fileID" binding:"required"`
	Type   ente.ObjectType `json:"type" binding:"required"`
	// EncryptedData and DecryptionHeader carry inline encrypted metadata
	// (e.g. a playlist or derived ML data).
	EncryptedData    *string `json:"encryptedData,omitempty"`
	DecryptionHeader *string `json:"decryptionHeader,omitempty"`
	// ObjectKey is the key of the object in the S3 bucket. This is needed while putting the object in the S3 bucket.
	ObjectKey *string `json:"objectKey,omitempty"`
	// size of the object that is being uploaded. This helps in checking the size of the object that is being uploaded.
	ObjectSize *int64 `json:"objectSize,omitempty"`
	Version    *int   `json:"version,omitempty"`
}

// isEncDataPresent reports whether both the encrypted payload and its
// decryption header were supplied and are non-empty.
func (r PutFileDataRequest) isEncDataPresent() bool {
	if r.EncryptedData == nil || r.DecryptionHeader == nil {
		return false
	}
	return *r.EncryptedData != "" && *r.DecryptionHeader != ""
}

// isObjectDataPresent reports whether an uploaded S3 object was described:
// a non-empty object key together with a positive object size.
func (r PutFileDataRequest) isObjectDataPresent() bool {
	if r.ObjectKey == nil || r.ObjectSize == nil {
		return false
	}
	return *r.ObjectKey != "" && *r.ObjectSize > 0
}

// Validate checks that exactly the inputs required by the object type were
// provided: preview videos need both the uploaded object and the encrypted
// playlist metadata, preview images need only the object, and ML data needs
// only the encrypted metadata.
func (r PutFileDataRequest) Validate() error {
	hasEnc := r.isEncDataPresent()
	hasObj := r.isObjectDataPresent()
	switch r.Type {
	case ente.PreviewVideo:
		if !hasEnc || !hasObj {
			return ente.NewBadRequestWithMessage("object and metadata are required")
		}
	case ente.PreviewImage:
		if !hasObj || hasEnc {
			return ente.NewBadRequestWithMessage("object (only) data is required for preview image")
		}
	case ente.MlData:
		if !hasEnc || hasObj {
			return ente.NewBadRequestWithMessage("encryptedData and decryptionHeader (only) are required for derived meta")
		}
	default:
		return ente.NewBadRequestWithMessage(fmt.Sprintf("invalid object type %s", r.Type))
	}
	return nil
}

// S3FileMetadataObjectKey returns the S3 key under which the request's
// encrypted metadata is stored. It panics for object types without metadata.
func (r PutFileDataRequest) S3FileMetadataObjectKey(ownerID int64) string {
	switch r.Type {
	case ente.MlData:
		return derivedMetaPath(r.FileID, ownerID)
	case ente.PreviewVideo:
		return previewVideoPlaylist(r.FileID, ownerID)
	default:
		panic(fmt.Sprintf("S3FileMetadata should not be written for %s type", r.Type))
	}
}

// S3FileObjectKey returns the S3 key of the uploaded preview object.
// It panics for object types that carry no standalone file object.
func (r PutFileDataRequest) S3FileObjectKey(ownerID int64) string {
	switch r.Type {
	case ente.PreviewVideo:
		return previewVideoPath(r.FileID, ownerID)
	case ente.PreviewImage:
		return previewImagePath(r.FileID, ownerID)
	default:
		panic(fmt.Sprintf("S3FileObjectKey should not be written for %s type", r.Type))
	}
}
Loading

0 comments on commit 6259a97

Please sign in to comment.