diff --git a/server/cmd/museum/main.go b/server/cmd/museum/main.go index 0a2d815c99..1223bdd003 100644 --- a/server/cmd/museum/main.go +++ b/server/cmd/museum/main.go @@ -6,6 +6,7 @@ import ( b64 "encoding/base64" "fmt" "github.com/ente-io/museum/pkg/controller/file_copy" + "github.com/ente-io/museum/pkg/controller/filedata" "net/http" "os" "os/signal" @@ -49,6 +50,7 @@ import ( castRepo "github.com/ente-io/museum/pkg/repo/cast" "github.com/ente-io/museum/pkg/repo/datacleanup" "github.com/ente-io/museum/pkg/repo/embedding" + fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata" "github.com/ente-io/museum/pkg/repo/kex" "github.com/ente-io/museum/pkg/repo/passkey" "github.com/ente-io/museum/pkg/repo/remotestore" @@ -134,7 +136,6 @@ func main() { }, []string{"method"}) s3Config := s3config.NewS3Config() - passkeysRepo, err := passkey.NewRepository(db) if err != nil { panic(err) @@ -162,6 +163,7 @@ func main() { fileRepo := &repo.FileRepository{DB: db, S3Config: s3Config, QueueRepo: queueRepo, ObjectRepo: objectRepo, ObjectCleanupRepo: objectCleanupRepo, ObjectCopiesRepo: objectCopiesRepo, UsageRepo: usageRepo} + fileDataRepo := &fileDataRepo.Repository{DB: db} familyRepo := &repo.FamilyRepository{DB: db} trashRepo := &repo.TrashRepository{DB: db, ObjectRepo: objectRepo, FileRepo: fileRepo, QueueRepo: queueRepo} publicCollectionRepo := repo.NewPublicCollectionRepository(db, viper.GetString("apps.public-albums")) @@ -238,6 +240,9 @@ func main() { FileRepo: fileRepo, } + accessCtrl := access.NewAccessController(collectionRepo, fileRepo) + fileDataCtrl := filedata.New(fileDataRepo, accessCtrl, objectCleanupController, s3Config, fileRepo, collectionRepo) + fileController := &controller.FileController{ FileRepo: fileRepo, ObjectRepo: objectRepo, @@ -287,8 +292,6 @@ func main() { JwtSecret: jwtSecretBytes, } - accessCtrl := access.NewAccessController(collectionRepo, fileRepo) - collectionController := &controller.CollectionController{ CollectionRepo: collectionRepo, AccessCtrl: accessCtrl, @@ -401,6 +404,7 @@ func main() { fileHandler := &api.FileHandler{ Controller: fileController, FileCopyCtrl: fileCopyCtrl, + FileDataCtrl: fileDataCtrl, } privateAPI.GET("/files/upload-urls", fileHandler.GetUploadURLs) privateAPI.GET("/files/multipart-upload-urls", fileHandler.GetMultipartUploadURLs) @@ -408,6 +412,13 @@ func main() { privateAPI.GET("/files/download/v2/:fileID", fileHandler.Get) privateAPI.GET("/files/preview/:fileID", fileHandler.GetThumbnail) privateAPI.GET("/files/preview/v2/:fileID", fileHandler.GetThumbnail) + + privateAPI.PUT("/files/data/", fileHandler.PutFileData) + privateAPI.POST("files/data/fetch", fileHandler.GetFilesData) + privateAPI.GET("files/data/fetch", fileHandler.GetFileData) + privateAPI.GET("/files/data/preview-upload-url/", fileHandler.GetPreviewUploadURL) + privateAPI.GET("/files/data/preview/", fileHandler.GetPreviewURL) + privateAPI.POST("/files", fileHandler.CreateOrUpdate) privateAPI.POST("/files/copy", fileHandler.CopyFiles) privateAPI.PUT("/files/update", fileHandler.Update) @@ -694,7 +705,7 @@ func main() { publicAPI.GET("/offers/black-friday", offerHandler.GetBlackFridayOffers) setKnownAPIs(server.Routes()) - setupAndStartBackgroundJobs(objectCleanupController, replicationController3) + setupAndStartBackgroundJobs(objectCleanupController, replicationController3, fileDataCtrl) setupAndStartCrons( userAuthRepo, publicCollectionRepo, twoFactorRepo, passkeysRepo, fileController, taskLockingRepo, emailNotificationCtrl, trashController, pushController, 
objectController, dataCleanupController, storageBonusCtrl, @@ -804,6 +815,7 @@ func setupDatabase() *sql.DB { func setupAndStartBackgroundJobs( objectCleanupController *controller.ObjectCleanupController, replicationController3 *controller.ReplicationController3, + fileDataCtrl *filedata.Controller, ) { isReplicationEnabled := viper.GetBool("replication.enabled") if isReplicationEnabled { @@ -811,10 +823,15 @@ func setupAndStartBackgroundJobs( if err != nil { log.Warnf("Could not start replication v3: %s", err) } + err = fileDataCtrl.StartReplication() + if err != nil { + log.Warnf("Could not start fileData replication: %s", err) + } } else { log.Info("Skipping Replication as replication is disabled") } + fileDataCtrl.StartDataDeletion() // Start data deletion for file data; objectCleanupController.StartRemovingUnreportedObjects() objectCleanupController.StartClearingOrphanObjects() } diff --git a/server/configurations/local.yaml b/server/configurations/local.yaml index 15d841dcba..e89cdf38a3 100644 --- a/server/configurations/local.yaml +++ b/server/configurations/local.yaml @@ -172,6 +172,18 @@ s3: # resolved, e.g. when running a local instance, or when using MinIO as a # production store. #use_path_style_urls: true + # + # Warning: For file-storage, do not specify buckets with any lock or versioning enabled. + # The application does not handle these cases. By default, we will use the derived-storage or hot storage bucket + # as the primary bucket for file-data storage. + # file-data-storage: + # mldata: + # primaryBucket: + # replicaBuckets: [] + # img_preview: + # primaryBucket: + # replicaBuckets: [] + # Key used for encrypting customer emails before storing them in DB # diff --git a/server/ente/file.go b/server/ente/file.go index a0e67c71cf..117db44729 100644 --- a/server/ente/file.go +++ b/server/ente/file.go @@ -153,8 +153,11 @@ type MultipartUploadURLs struct { type ObjectType string const ( - FILE ObjectType = "file" - THUMBNAIL ObjectType = "thumbnail" + FILE ObjectType = "file" + THUMBNAIL ObjectType = "thumbnail" + PreviewImage ObjectType = "img_preview" + PreviewVideo ObjectType = "vid_preview" + MlData ObjectType = "mldata" ) // S3ObjectKey represents the s3 object key and corresponding fileID for it @@ -199,7 +202,7 @@ type TempObject struct { ObjectKey string IsMultipart bool UploadID string - DataCenter string + BucketId string } // DuplicateFiles represents duplicate files diff --git a/server/ente/filedata/filedata.go b/server/ente/filedata/filedata.go new file mode 100644 index 0000000000..3db26fefb0 --- /dev/null +++ b/server/ente/filedata/filedata.go @@ -0,0 +1,122 @@ +package filedata + +import ( + "fmt" + "github.com/ente-io/museum/ente" +) + +type Entity struct { + FileID int64 `json:"fileID"` + Type ente.ObjectType `json:"type"` + EncryptedData string `json:"encryptedData"` + DecryptionHeader string `json:"decryptionHeader"` +} + +// GetFilesData should only be used for getting the preview video playlist and derived metadata. 
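+// Preview image bytes are not served through this request type; clients fetch them via the preview
+// URL endpoints, since only vid_preview playlists and mldata are stored as encrypted metadata blobs.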
+type GetFilesData struct { + FileIDs []int64 `json:"fileIDs" binding:"required"` + Type ente.ObjectType `json:"type" binding:"required"` +} + +func (g *GetFilesData) Validate() error { + if g.Type != ente.PreviewVideo && g.Type != ente.MlData { + return ente.NewBadRequestWithMessage(fmt.Sprintf("unsupported object type %s", g.Type)) + } + if len(g.FileIDs) == 0 { + return ente.NewBadRequestWithMessage("fileIDs are required") + } + if len(g.FileIDs) > 200 { + return ente.NewBadRequestWithMessage("fileIDs should be less than or equal to 200") + } + return nil +} + +type GetFileData struct { + FileID int64 `form:"fileID" binding:"required"` + Type ente.ObjectType `form:"type" binding:"required"` +} + +func (g *GetFileData) Validate() error { + if g.Type != ente.PreviewVideo && g.Type != ente.MlData { + return ente.NewBadRequestWithMessage(fmt.Sprintf("unsupported object type %s", g.Type)) + } + return nil +} + +type GetFilesDataResponse struct { + Data []Entity `json:"data"` + PendingIndexFileIDs []int64 `json:"pendingIndexFileIDs"` + ErrFileIDs []int64 `json:"errFileIDs"` +} + +// S3FileMetadata stuck represents the metadata that is stored in the S3 bucket for non-file type metadata +// that is stored in the S3 bucket. +type S3FileMetadata struct { + Version int `json:"v"` + EncryptedData string `json:"encryptedData"` + DecryptionHeader string `json:"header"` + Client string `json:"client"` +} + +type GetPreviewURLRequest struct { + FileID int64 `form:"fileID" binding:"required"` + Type ente.ObjectType `form:"type" binding:"required"` +} + +func (g *GetPreviewURLRequest) Validate() error { + if g.Type != ente.PreviewVideo && g.Type != ente.PreviewImage { + return ente.NewBadRequestWithMessage(fmt.Sprintf("unsupported object type %s", g.Type)) + } + return nil +} + +type PreviewUploadUrlRequest struct { + FileID int64 `form:"fileID" binding:"required"` + Type ente.ObjectType `form:"type" binding:"required"` +} + +func (g *PreviewUploadUrlRequest) Validate() error { + if g.Type != ente.PreviewVideo && g.Type != ente.PreviewImage { + return ente.NewBadRequestWithMessage(fmt.Sprintf("unsupported object type %s", g.Type)) + } + return nil +} + +// Row represents the data that is stored in the file_data table. +type Row struct { + FileID int64 + UserID int64 + Type ente.ObjectType + // If a file type has multiple objects, then the size is the sum of all the objects. + Size int64 + LatestBucket string + ReplicatedBuckets []string + DeleteFromBuckets []string + InflightReplicas []string + PendingSync bool + IsDeleted bool + SyncLockedTill int64 + CreatedAt int64 + UpdatedAt int64 +} + +// S3FileMetadataObjectKey returns the object key for the metadata stored in the S3 bucket. +func (r *Row) S3FileMetadataObjectKey() string { + if r.Type == ente.MlData { + return derivedMetaPath(r.FileID, r.UserID) + } + if r.Type == ente.PreviewVideo { + return previewVideoPlaylist(r.FileID, r.UserID) + } + panic(fmt.Sprintf("S3FileMetadata should not be written for %s type", r.Type)) +} + +// GetS3FileObjectKey returns the object key for the file data stored in the S3 bucket. 
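+// For example, assuming ownerID 42 and fileID 7, a vid_preview row maps to "42/file-data/7/vid_preview"
+// and an img_preview row to "42/file-data/7/img_preview" (see path.go). MlData rows only have the
+// metadata blob above, so calling this for them panics.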
+func (r *Row) GetS3FileObjectKey() string { + if r.Type == ente.PreviewVideo { + return previewVideoPath(r.FileID, r.UserID) + } else if r.Type == ente.PreviewImage { + return previewImagePath(r.FileID, r.UserID) + } + panic(fmt.Sprintf("unsupported object type %s", r.Type)) +} diff --git a/server/ente/filedata/path.go b/server/ente/filedata/path.go new file mode 100644 index 0000000000..d0ea908800 --- /dev/null +++ b/server/ente/filedata/path.go @@ -0,0 +1,53 @@ +package filedata + +import ( + "fmt" + "github.com/ente-io/museum/ente" +) + +// BasePrefix returns the base prefix for all objects related to a file. To check if the file data is deleted, +// ensure that there's no file in the S3 bucket with this prefix. +func BasePrefix(fileID int64, ownerID int64) string { + return fmt.Sprintf("%d/file-data/%d/", ownerID, fileID) +} + +func AllObjects(fileID int64, ownerID int64, oType ente.ObjectType) []string { + switch oType { + case ente.PreviewVideo: + return []string{previewVideoPath(fileID, ownerID), previewVideoPlaylist(fileID, ownerID)} + case ente.MlData: + return []string{derivedMetaPath(fileID, ownerID)} + case ente.PreviewImage: + return []string{previewImagePath(fileID, ownerID)} + default: + // throw panic saying current object type is not supported + panic(fmt.Sprintf("object type %s is not supported", oType)) + } +} + +func PreviewUrl(fileID int64, ownerID int64, oType ente.ObjectType) string { + switch oType { + case ente.PreviewVideo: + return previewVideoPath(fileID, ownerID) + case ente.PreviewImage: + return previewImagePath(fileID, ownerID) + default: + panic(fmt.Sprintf("object type %s is not supported", oType)) + } +} + +func previewVideoPath(fileID int64, ownerID int64) string { + return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewVideo)) +} + +func previewVideoPlaylist(fileID int64, ownerID int64) string { + return fmt.Sprintf("%s%s", previewVideoPath(fileID, ownerID), "_playlist.m3u8") +} + +func previewImagePath(fileID int64, ownerID int64) string { + return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewImage)) +} + +func derivedMetaPath(fileID int64, ownerID int64) string { + return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.MlData)) +} diff --git a/server/ente/filedata/putfiledata.go b/server/ente/filedata/putfiledata.go new file mode 100644 index 0000000000..58394989da --- /dev/null +++ b/server/ente/filedata/putfiledata.go @@ -0,0 +1,66 @@ +package filedata + +import ( + "fmt" + "github.com/ente-io/museum/ente" +) + +type PutFileDataRequest struct { + FileID int64 `json:"fileID" binding:"required"` + Type ente.ObjectType `json:"type" binding:"required"` + EncryptedData *string `json:"encryptedData,omitempty"` + DecryptionHeader *string `json:"decryptionHeader,omitempty"` + // ObjectKey is the key of the object in the S3 bucket. This is needed while putting the object in the S3 bucket. + ObjectKey *string `json:"objectKey,omitempty"` + // size of the object that is being uploaded. This helps in checking the size of the object that is being uploaded. 
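+	// It is required (together with ObjectKey) for img_preview and vid_preview uploads, and must not
+	// be set for mldata (see Validate below).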
+ ObjectSize *int64 `json:"objectSize,omitempty"` + Version *int `json:"version,omitempty"` +} + +func (r PutFileDataRequest) isEncDataPresent() bool { + return r.EncryptedData != nil && r.DecryptionHeader != nil && *r.EncryptedData != "" && *r.DecryptionHeader != "" +} + +func (r PutFileDataRequest) isObjectDataPresent() bool { + return r.ObjectKey != nil && *r.ObjectKey != "" && r.ObjectSize != nil && *r.ObjectSize > 0 +} + +func (r PutFileDataRequest) Validate() error { + switch r.Type { + case ente.PreviewVideo: + if !r.isEncDataPresent() || !r.isObjectDataPresent() { + return ente.NewBadRequestWithMessage("object and metadata are required") + } + case ente.PreviewImage: + if !r.isObjectDataPresent() || r.isEncDataPresent() { + return ente.NewBadRequestWithMessage("object (only) data is required for preview image") + } + case ente.MlData: + if !r.isEncDataPresent() || r.isObjectDataPresent() { + return ente.NewBadRequestWithMessage("encryptedData and decryptionHeader (only) are required for derived meta") + } + default: + return ente.NewBadRequestWithMessage(fmt.Sprintf("invalid object type %s", r.Type)) + } + return nil +} + +func (r PutFileDataRequest) S3FileMetadataObjectKey(ownerID int64) string { + if r.Type == ente.MlData { + return derivedMetaPath(r.FileID, ownerID) + } + if r.Type == ente.PreviewVideo { + return previewVideoPlaylist(r.FileID, ownerID) + } + panic(fmt.Sprintf("S3FileMetadata should not be written for %s type", r.Type)) +} + +func (r PutFileDataRequest) S3FileObjectKey(ownerID int64) string { + if r.Type == ente.PreviewVideo { + return previewVideoPath(r.FileID, ownerID) + } + if r.Type == ente.PreviewImage { + return previewImagePath(r.FileID, ownerID) + } + panic(fmt.Sprintf("S3FileObjectKey should not be written for %s type", r.Type)) +} diff --git a/server/migrations/89_file_data_table.down.sql b/server/migrations/89_file_data_table.down.sql new file mode 100644 index 0000000000..bdf2a8717b --- /dev/null +++ b/server/migrations/89_file_data_table.down.sql @@ -0,0 +1,10 @@ + +DROP INDEX IF EXISTS idx_file_data_user_type_deleted; +DROP INDEX IF EXISTS idx_file_data_last_sync_time; + +DROP TABLE IF EXISTS file_data; + +DROP TYPE IF EXISTS file_data_type; + +-- Delete triggers +DROP TRIGGER IF EXISTS check_no_common_entries ON file_data; \ No newline at end of file diff --git a/server/migrations/89_file_data_table.up.sql b/server/migrations/89_file_data_table.up.sql new file mode 100644 index 0000000000..4ea314a997 --- /dev/null +++ b/server/migrations/89_file_data_table.up.sql @@ -0,0 +1,62 @@ +ALTER TABLE temp_objects ADD COLUMN IF NOT EXISTS bucket_id s3region; +ALTER TYPE OBJECT_TYPE ADD VALUE 'mldata'; +ALTER TYPE s3region ADD VALUE 'b5'; +ALTER TYPE s3region ADD VALUE 'b6'; +-- Create the file_data table +CREATE TABLE IF NOT EXISTS file_data +( + file_id BIGINT NOT NULL, + user_id BIGINT NOT NULL, + data_type OBJECT_TYPE NOT NULL, + size BIGINT NOT NULL, + latest_bucket s3region NOT NULL, + replicated_buckets s3region[] NOT NULL DEFAULT '{}', +-- following field contains list of buckets from where we need to delete the data as the given data_type will not longer be persisted in that dc + delete_from_buckets s3region[] NOT NULL DEFAULT '{}', + inflight_rep_buckets s3region[] NOT NULL DEFAULT '{}', + is_deleted BOOLEAN NOT NULL DEFAULT false, + pending_sync BOOLEAN NOT NULL DEFAULT true, + sync_locked_till BIGINT NOT NULL DEFAULT 0, + created_at BIGINT NOT NULL DEFAULT now_utc_micro_seconds(), + updated_at BIGINT NOT NULL DEFAULT now_utc_micro_seconds(), + 
PRIMARY KEY (file_id, data_type) +); + +-- Add index for user_id and data_type for efficient querying for size calculation +CREATE INDEX idx_file_data_user_type_deleted ON file_data (user_id, data_type, is_deleted) INCLUDE (size); +CREATE INDEX idx_file_data_pending_sync_locked_till ON file_data (is_deleted, sync_locked_till) where pending_sync = true; + +CREATE OR REPLACE FUNCTION ensure_no_common_entries() + RETURNS TRIGGER AS +$$ +DECLARE + all_buckets s3region[]; + duplicate_buckets s3region[]; +BEGIN + -- Combine all bucket IDs into a single array + all_buckets := ARRAY [NEW.latest_bucket] || NEW.replicated_buckets || NEW.delete_from_buckets || + NEW.inflight_rep_buckets; + + -- Find duplicate bucket IDs + SELECT ARRAY_AGG(DISTINCT bucket) + INTO duplicate_buckets + FROM unnest(all_buckets) bucket + GROUP BY bucket + HAVING COUNT(*) > 1; + + -- If duplicates exist, raise an exception with details + IF ARRAY_LENGTH(duplicate_buckets, 1) > 0 THEN + RAISE EXCEPTION 'Duplicate bucket IDs found: %. Latest: %, Replicated: %, To Delete: %, Inflight: %', + duplicate_buckets, NEW.latest_bucket, NEW.replicated_buckets, NEW.delete_from_buckets, NEW.inflight_rep_buckets; + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER check_no_common_entries + BEFORE INSERT OR UPDATE + ON file_data + FOR EACH ROW +EXECUTE FUNCTION ensure_no_common_entries(); + diff --git a/server/pkg/api/file.go b/server/pkg/api/file.go index 064bc3be08..2e15ade325 100644 --- a/server/pkg/api/file.go +++ b/server/pkg/api/file.go @@ -3,6 +3,7 @@ package api import ( "fmt" "github.com/ente-io/museum/pkg/controller/file_copy" + "github.com/ente-io/museum/pkg/controller/filedata" "net/http" "os" "strconv" @@ -24,6 +25,7 @@ import ( type FileHandler struct { Controller *controller.FileController FileCopyCtrl *file_copy.FileCopyController + FileDataCtrl *filedata.Controller } // DefaultMaxBatchSize is the default maximum API batch size unless specified otherwise diff --git a/server/pkg/api/file_data.go b/server/pkg/api/file_data.go new file mode 100644 index 0000000000..36c863e65c --- /dev/null +++ b/server/pkg/api/file_data.go @@ -0,0 +1,97 @@ +package api + +import ( + "fmt" + "github.com/ente-io/museum/ente" + fileData "github.com/ente-io/museum/ente/filedata" + "github.com/ente-io/museum/pkg/utils/handler" + "github.com/ente-io/stacktrace" + "github.com/gin-gonic/gin" + "net/http" +) + +func (h *FileHandler) PutFileData(ctx *gin.Context) { + var req fileData.PutFileDataRequest + if err := ctx.ShouldBindJSON(&req); err != nil { + ctx.JSON(http.StatusBadRequest, ente.NewBadRequestWithMessage(err.Error())) + return + } + if err := req.Validate(); err != nil { + ctx.JSON(http.StatusBadRequest, err) + return + } + reqInt := &req + if reqInt.Version == nil { + version := 1 + reqInt.Version = &version + } + err := h.FileDataCtrl.InsertOrUpdate(ctx, &req) + if err != nil { + handler.Error(ctx, err) + + return + } + ctx.JSON(http.StatusOK, gin.H{}) +} + +func (h *FileHandler) GetFilesData(ctx *gin.Context) { + var req fileData.GetFilesData + if err := ctx.ShouldBindJSON(&req); err != nil { + ctx.JSON(http.StatusBadRequest, ente.NewBadRequestWithMessage(err.Error())) + return + } + resp, err := h.FileDataCtrl.GetFilesData(ctx, req) + if err != nil { + handler.Error(ctx, err) + return + } + ctx.JSON(http.StatusOK, resp) +} + +func (h *FileHandler) GetFileData(ctx *gin.Context) { + var req fileData.GetFileData + if err := ctx.ShouldBindJSON(&req); err != nil { + ctx.JSON(http.StatusBadRequest, 
ente.NewBadRequestWithMessage(err.Error())) + return + } + resp, err := h.FileDataCtrl.GetFileData(ctx, req) + if err != nil { + handler.Error(ctx, err) + return + } + ctx.JSON(http.StatusOK, gin.H{ + "data": resp, + }) +} + +func (h *FileHandler) GetPreviewUploadURL(c *gin.Context) { + var request fileData.PreviewUploadUrlRequest + if err := c.ShouldBindJSON(&request); err != nil { + handler.Error(c, stacktrace.Propagate(ente.ErrBadRequest, fmt.Sprintf("Request binding failed %s", err))) + return + } + url, err := h.FileDataCtrl.PreviewUploadURL(c, request) + if err != nil { + handler.Error(c, stacktrace.Propagate(err, "")) + return + } + c.JSON(http.StatusOK, gin.H{ + "url": url, + }) +} + +func (h *FileHandler) GetPreviewURL(c *gin.Context) { + var request fileData.GetPreviewURLRequest + if err := c.ShouldBindJSON(&request); err != nil { + handler.Error(c, stacktrace.Propagate(ente.ErrBadRequest, fmt.Sprintf("Request binding failed %s", err))) + return + } + url, err := h.FileDataCtrl.GetPreviewUrl(c, request) + if err != nil { + handler.Error(c, stacktrace.Propagate(err, "")) + return + } + c.JSON(http.StatusOK, gin.H{ + "url": url, + }) +} diff --git a/server/pkg/controller/filedata/controller.go b/server/pkg/controller/filedata/controller.go new file mode 100644 index 0000000000..d5f400c4fe --- /dev/null +++ b/server/pkg/controller/filedata/controller.go @@ -0,0 +1,311 @@ +package filedata + +import ( + "context" + "errors" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/ente-io/museum/ente" + fileData "github.com/ente-io/museum/ente/filedata" + "github.com/ente-io/museum/pkg/controller" + "github.com/ente-io/museum/pkg/controller/access" + "github.com/ente-io/museum/pkg/repo" + fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata" + "github.com/ente-io/museum/pkg/utils/array" + "github.com/ente-io/museum/pkg/utils/auth" + "github.com/ente-io/museum/pkg/utils/network" + "github.com/ente-io/museum/pkg/utils/s3config" + "github.com/ente-io/stacktrace" + "github.com/gin-gonic/gin" + log "github.com/sirupsen/logrus" + "strings" + "sync" + gTime "time" +) + +// _fetchConfig is the configuration for the fetching objects from S3 +type _fetchConfig struct { + RetryCount int + InitialTimeout gTime.Duration + MaxTimeout gTime.Duration +} + +var _defaultFetchConfig = _fetchConfig{RetryCount: 3, InitialTimeout: 10 * gTime.Second, MaxTimeout: 30 * gTime.Second} +var globalFileFetchSemaphore = make(chan struct{}, 400) + +type bulkS3MetaFetchResult struct { + s3MetaObject fileData.S3FileMetadata + dbEntry fileData.Row + err error +} + +type Controller struct { + Repo *fileDataRepo.Repository + AccessCtrl access.Controller + ObjectCleanupController *controller.ObjectCleanupController + S3Config *s3config.S3Config + FileRepo *repo.FileRepository + CollectionRepo *repo.CollectionRepository + downloadManagerCache map[string]*s3manager.Downloader + // for downloading objects from s3 for replication + workerURL string +} + +func New(repo *fileDataRepo.Repository, + accessCtrl access.Controller, + objectCleanupController *controller.ObjectCleanupController, + s3Config *s3config.S3Config, + fileRepo *repo.FileRepository, + collectionRepo *repo.CollectionRepository) *Controller { + embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetWasabiDerivedDC(), s3Config.GetDerivedStorageDataCenter(), "b5"} + cache := make(map[string]*s3manager.Downloader, len(embeddingDcs)) 
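+	// Pre-build one download manager per bucket/DC so metadata fetches and replication reuse the
+	// same S3 clients instead of constructing them per request.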
+ for i := range embeddingDcs { + s3Client := s3Config.GetS3Client(embeddingDcs[i]) + cache[embeddingDcs[i]] = s3manager.NewDownloaderWithClient(&s3Client) + } + return &Controller{ + Repo: repo, + AccessCtrl: accessCtrl, + ObjectCleanupController: objectCleanupController, + S3Config: s3Config, + FileRepo: fileRepo, + CollectionRepo: collectionRepo, + downloadManagerCache: cache, + } +} + +func (c *Controller) InsertOrUpdate(ctx *gin.Context, req *fileData.PutFileDataRequest) error { + if err := req.Validate(); err != nil { + return stacktrace.Propagate(err, "validation failed") + } + userID := auth.GetUserID(ctx.Request.Header) + err := c._validatePermission(ctx, req.FileID, userID) + if err != nil { + return stacktrace.Propagate(err, "") + } + if req.Type != ente.MlData && req.Type != ente.PreviewVideo { + return stacktrace.Propagate(ente.NewBadRequestWithMessage("unsupported object type "+string(req.Type)), "") + } + fileOwnerID := userID + bucketID := c.S3Config.GetBucketID(req.Type) + if req.Type == ente.PreviewVideo { + fileObjectKey := req.S3FileObjectKey(fileOwnerID) + if !strings.Contains(*req.ObjectKey, fileObjectKey) { + return stacktrace.Propagate(ente.NewBadRequestWithMessage("objectKey should contain the file object key"), "") + } + err = c.copyObject(*req.ObjectKey, fileObjectKey, bucketID) + if err != nil { + return err + } + } + objectKey := req.S3FileMetadataObjectKey(fileOwnerID) + obj := fileData.S3FileMetadata{ + Version: *req.Version, + EncryptedData: *req.EncryptedData, + DecryptionHeader: *req.DecryptionHeader, + Client: network.GetClientInfo(ctx), + } + + size, uploadErr := c.uploadObject(obj, objectKey, bucketID) + if uploadErr != nil { + log.Error(uploadErr) + return stacktrace.Propagate(uploadErr, "") + } + + row := fileData.Row{ + FileID: req.FileID, + Type: req.Type, + UserID: fileOwnerID, + Size: size, + LatestBucket: bucketID, + } + err = c.Repo.InsertOrUpdate(ctx, row) + if err != nil { + return stacktrace.Propagate(err, "") + } + return nil +} + +func (c *Controller) GetFileData(ctx *gin.Context, req fileData.GetFileData) (*fileData.Entity, error) { + if err := req.Validate(); err != nil { + return nil, stacktrace.Propagate(err, "validation failed") + } + if err := c._validatePermission(ctx, req.FileID, auth.GetUserID(ctx.Request.Header)); err != nil { + return nil, stacktrace.Propagate(err, "") + } + doRows, err := c.Repo.GetFilesData(ctx, req.Type, []int64{req.FileID}) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + if len(doRows) == 0 || doRows[0].IsDeleted { + return nil, stacktrace.Propagate(ente.ErrNotFound, "") + } + s3MetaObject, err := c.fetchS3FileMetadata(context.Background(), doRows[0], doRows[0].LatestBucket) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + return &fileData.Entity{ + FileID: doRows[0].FileID, + Type: doRows[0].Type, + EncryptedData: s3MetaObject.EncryptedData, + DecryptionHeader: s3MetaObject.DecryptionHeader, + }, nil +} + +func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (*fileData.GetFilesDataResponse, error) { + userID := auth.GetUserID(ctx.Request.Header) + if err := c._validateGetFilesData(ctx, userID, req); err != nil { + return nil, stacktrace.Propagate(err, "") + } + + doRows, err := c.Repo.GetFilesData(ctx, req.Type, req.FileIDs) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + + activeRows := make([]fileData.Row, 0) + dbFileIds := make([]int64, 0) + errFileIds := make([]int64, 0) + for i := range doRows { + dbFileIds = 
append(dbFileIds, doRows[i].FileID) + if !doRows[i].IsDeleted { + activeRows = append(activeRows, doRows[i]) + } + } + pendingIndexFileIds := array.FindMissingElementsInSecondList(req.FileIDs, dbFileIds) + // Fetch missing doRows in parallel + s3MetaFetchResults, err := c.getS3FileMetadataParallel(activeRows) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + fetchedEmbeddings := make([]fileData.Entity, 0) + + // Populate missing data in doRows from fetched objects + for _, obj := range s3MetaFetchResults { + if obj.err != nil { + errFileIds = append(errFileIds, obj.dbEntry.FileID) + } else { + fetchedEmbeddings = append(fetchedEmbeddings, fileData.Entity{ + FileID: obj.dbEntry.FileID, + Type: obj.dbEntry.Type, + EncryptedData: obj.s3MetaObject.EncryptedData, + DecryptionHeader: obj.s3MetaObject.DecryptionHeader, + }) + } + } + + return &fileData.GetFilesDataResponse{ + Data: fetchedEmbeddings, + PendingIndexFileIDs: pendingIndexFileIds, + ErrFileIDs: errFileIds, + }, nil +} + +func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) { + var wg sync.WaitGroup + embeddingObjects := make([]bulkS3MetaFetchResult, len(dbRows)) + for i := range dbRows { + dbRow := dbRows[i] + wg.Add(1) + globalFileFetchSemaphore <- struct{}{} // Acquire from global semaphore + go func(i int, row fileData.Row) { + defer wg.Done() + defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore + dc := row.LatestBucket + s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, dc) + if err != nil { + log.WithField("bucket", dc). + Error("error fetching object: "+row.S3FileMetadataObjectKey(), err) + embeddingObjects[i] = bulkS3MetaFetchResult{ + err: err, + dbEntry: row, + } + + } else { + embeddingObjects[i] = bulkS3MetaFetchResult{ + s3MetaObject: *s3FileMetadata, + dbEntry: dbRow, + } + } + }(i, dbRow) + } + wg.Wait() + return embeddingObjects, nil +} + +func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, dc string) (*fileData.S3FileMetadata, error) { + opt := _defaultFetchConfig + objectKey := row.S3FileMetadataObjectKey() + ctxLogger := log.WithField("objectKey", objectKey).WithField("dc", row.LatestBucket) + totalAttempts := opt.RetryCount + 1 + timeout := opt.InitialTimeout + for i := 0; i < totalAttempts; i++ { + if i > 0 { + timeout = timeout * 2 + if timeout > opt.MaxTimeout { + timeout = opt.MaxTimeout + } + } + fetchCtx, cancel := context.WithTimeout(ctx, timeout) + select { + case <-ctx.Done(): + cancel() + return nil, stacktrace.Propagate(ctx.Err(), "") + default: + obj, err := c.downloadObject(fetchCtx, objectKey, dc) + cancel() // Ensure cancel is called to release resources + if err == nil { + if i > 0 { + ctxLogger.Infof("Fetched object after %d attempts", i) + } + return &obj, nil + } + // Check if the error is due to context timeout or cancellation + if err == nil && fetchCtx.Err() != nil { + ctxLogger.Error("Fetch timed out or cancelled: ", fetchCtx.Err()) + } else { + // check if the error is due to object not found + if s3Err, ok := err.(awserr.RequestFailure); ok { + if s3Err.Code() == s3.ErrCodeNoSuchKey { + return nil, stacktrace.Propagate(errors.New("object not found"), "") + } + } + ctxLogger.Error("Failed to fetch object: ", err) + } + } + } + return nil, stacktrace.Propagate(errors.New("failed to fetch object"), "") +} + +func (c *Controller) _validateGetFilesData(ctx *gin.Context, userID int64, req fileData.GetFilesData) error { + if err := req.Validate(); err 
!= nil { + return stacktrace.Propagate(err, "validation failed") + } + if err := c.AccessCtrl.VerifyFileOwnership(ctx, &access.VerifyFileOwnershipParams{ + ActorUserId: userID, + FileIDs: req.FileIDs, + }); err != nil { + return stacktrace.Propagate(err, "User does not own some file(s)") + } + + return nil +} + +func (c *Controller) _validatePermission(ctx *gin.Context, fileID int64, actorID int64) error { + err := c.AccessCtrl.VerifyFileOwnership(ctx, &access.VerifyFileOwnershipParams{ + ActorUserId: actorID, + FileIDs: []int64{fileID}, + }) + if err != nil { + return stacktrace.Propagate(err, "User does not own file") + } + count, err := c.CollectionRepo.GetCollectionCount(fileID) + if err != nil { + return stacktrace.Propagate(err, "") + } + if count < 1 { + return stacktrace.Propagate(ente.ErrNotFound, "") + } + return nil +} diff --git a/server/pkg/controller/filedata/delete.go b/server/pkg/controller/filedata/delete.go new file mode 100644 index 0000000000..a610282095 --- /dev/null +++ b/server/pkg/controller/filedata/delete.go @@ -0,0 +1,148 @@ +package filedata + +import ( + "context" + "database/sql" + "errors" + "fmt" + "github.com/ente-io/museum/ente/filedata" + fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata" + enteTime "github.com/ente-io/museum/pkg/utils/time" + + log "github.com/sirupsen/logrus" + "time" +) + +// StartDataDeletion clears associated file data from the object store +func (c *Controller) StartDataDeletion() { + go c.startDeleteWorkers(5) +} + +func (c *Controller) startDeleteWorkers(n int) { + log.Infof("Starting %d delete workers for fileData", n) + + for i := 0; i < n; i++ { + go c.delete(i) + // Stagger the workers + time.Sleep(time.Duration(2*i+1) * time.Second) + } +} + +// Entry point for the delete worker (goroutine) +// +// i is an arbitrary index of the current routine. +func (c *Controller) delete(i int) { + // This is just + // + // while (true) { delete() } + // + // but with an extra sleep for a bit if nothing got deleted - both when + // something's wrong, or there's nothing to do. + for { + err := c.tryDelete() + if err != nil { + // Sleep in proportion to the (arbitrary) index to space out the + // workers further. 
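+			// The sleep also acts as a crude backoff when there is nothing to delete or the row
+			// fetch keeps failing.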
+ time.Sleep(time.Duration(i+1) * time.Minute) + } + } +} + +func (c *Controller) tryDelete() error { + newLockTime := enteTime.MicrosecondsAfterMinutes(10) + row, err := c.Repo.GetPendingSyncDataAndExtendLock(context.Background(), newLockTime, true) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Errorf("Could not fetch row for deletion: %s", err) + } + return err + } + err = c.deleteFileRow(*row) + if err != nil { + log.Errorf("Could not delete file data: %s", err) + return err + } + return nil +} + +func (c *Controller) deleteFileRow(fileDataRow filedata.Row) error { + if !fileDataRow.IsDeleted { + return fmt.Errorf("file %d is not marked as deleted", fileDataRow.FileID) + } + fileID := fileDataRow.FileID + ownerID, err := c.FileRepo.GetOwnerID(fileID) + if err != nil { + return err + } + if fileDataRow.UserID != ownerID { + // this should never happen + panic(fmt.Sprintf("file %d does not belong to user %d", fileID, ownerID)) + } + ctxLogger := log.WithField("file_id", fileDataRow.DeleteFromBuckets).WithField("type", fileDataRow.Type).WithField("user_id", fileDataRow.UserID) + objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type) + bucketColumnMap, err := getMapOfBucketItToColumn(fileDataRow) + if err != nil { + ctxLogger.WithError(err).Error("Failed to get bucketColumnMap") + return err + } + // Delete objects and remove buckets + for bucketID, columnName := range bucketColumnMap { + for _, objectKey := range objectKeys { + err := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID) + if err != nil { + ctxLogger.WithError(err).WithFields(log.Fields{ + "bucketID": bucketID, + "column": columnName, + "objectKey": objectKey, + }).Error("Failed to delete object from datacenter") + return err + } + } + dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, columnName) + if dbErr != nil { + ctxLogger.WithError(dbErr).WithFields(log.Fields{ + "bucketID": bucketID, + "column": columnName, + }).Error("Failed to remove bucket from db") + return dbErr + + } + } + // Delete from Latest bucket + for k := range objectKeys { + err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], fileDataRow.LatestBucket) + if err != nil { + ctxLogger.WithError(err).Error("Failed to delete object from datacenter") + return err + } + } + dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow) + if dbErr != nil { + ctxLogger.WithError(dbErr).Error("Failed to remove from db") + return err + } + return nil +} + +func getMapOfBucketItToColumn(row filedata.Row) (map[string]string, error) { + bucketColumnMap := make(map[string]string) + for _, bucketID := range row.DeleteFromBuckets { + if existingColumn, exists := bucketColumnMap[bucketID]; exists { + return nil, fmt.Errorf("duplicate DeleteFromBuckets ID found: %s in column %s", bucketID, existingColumn) + } + bucketColumnMap[bucketID] = fileDataRepo.DeletionColumn + } + for _, bucketID := range row.ReplicatedBuckets { + if existingColumn, exists := bucketColumnMap[bucketID]; exists { + return nil, fmt.Errorf("duplicate ReplicatedBuckets ID found: %s in column %s", bucketID, existingColumn) + } + bucketColumnMap[bucketID] = fileDataRepo.ReplicationColumn + } + for _, bucketID := range row.InflightReplicas { + if existingColumn, exists := bucketColumnMap[bucketID]; exists { + return nil, fmt.Errorf("duplicate InFlightBucketID found: %s in column %s", bucketID, existingColumn) + } + bucketColumnMap[bucketID] = fileDataRepo.InflightRepColumn + } + return bucketColumnMap, nil +} diff --git 
a/server/pkg/controller/filedata/preview_files.go b/server/pkg/controller/filedata/preview_files.go new file mode 100644 index 0000000000..4d6b0113ba --- /dev/null +++ b/server/pkg/controller/filedata/preview_files.go @@ -0,0 +1,54 @@ +package filedata + +import ( + "fmt" + "github.com/ente-io/museum/ente" + "github.com/ente-io/museum/ente/filedata" + "github.com/ente-io/museum/pkg/utils/auth" + "github.com/ente-io/stacktrace" + "github.com/gin-gonic/gin" +) + +func (c *Controller) GetPreviewUrl(ctx *gin.Context, request filedata.GetPreviewURLRequest) (*string, error) { + if err := request.Validate(); err != nil { + return nil, err + } + actorUser := auth.GetUserID(ctx.Request.Header) + if err := c._validatePermission(ctx, request.FileID, actorUser); err != nil { + return nil, err + } + data, err := c.Repo.GetFilesData(ctx, request.Type, []int64{request.FileID}) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + if len(data) == 0 || data[0].IsDeleted { + return nil, stacktrace.Propagate(ente.ErrNotFound, "") + } + enteUrl, err := c.signedUrlGet(data[0].LatestBucket, data[0].GetS3FileObjectKey()) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + return &enteUrl.URL, nil +} + +func (c *Controller) PreviewUploadURL(ctx *gin.Context, request filedata.PreviewUploadUrlRequest) (*string, error) { + if err := request.Validate(); err != nil { + return nil, err + } + actorUser := auth.GetUserID(ctx.Request.Header) + if err := c._validatePermission(ctx, request.FileID, actorUser); err != nil { + return nil, err + } + fileOwnerID, err := c.FileRepo.GetOwnerID(request.FileID) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + // note: instead of the final url, give a temp url for upload purpose. + uploadUrl := fmt.Sprintf("%s_temp_upload", filedata.PreviewUrl(request.FileID, fileOwnerID, request.Type)) + bucketID := c.S3Config.GetBucketID(request.Type) + enteUrl, err := c.getUploadURL(bucketID, uploadUrl) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + return &enteUrl.URL, nil +} diff --git a/server/pkg/controller/filedata/replicate.go b/server/pkg/controller/filedata/replicate.go new file mode 100644 index 0000000000..1cf4488d70 --- /dev/null +++ b/server/pkg/controller/filedata/replicate.go @@ -0,0 +1,119 @@ +package filedata + +import ( + "context" + "database/sql" + "errors" + "fmt" + "github.com/ente-io/museum/ente/filedata" + fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata" + enteTime "github.com/ente-io/museum/pkg/utils/time" + "github.com/ente-io/stacktrace" + log "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "time" +) + +// StartReplication starts the replication process for file data. 
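+// Workers repeatedly pick up rows with pending_sync = true and is_deleted = false, copy the S3
+// metadata object to each configured bucket it is missing from, and then mark the row as replicated.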
+// If +func (c *Controller) StartReplication() error { + workerURL := viper.GetString("replication.worker-url") + if workerURL == "" { + log.Infof("replication.worker-url was not defined, file data will downloaded directly during replication") + } else { + log.Infof("Worker URL to download objects for replication v3 is: %s", workerURL) + } + c.workerURL = workerURL + + workerCount := viper.GetInt("replication.file-data.worker-count") + if workerCount == 0 { + workerCount = 6 + } + go c.startWorkers(workerCount) + return nil +} +func (c *Controller) startWorkers(n int) { + log.Infof("Starting %d workers for replication v3", n) + + for i := 0; i < n; i++ { + go c.replicate(i) + // Stagger the workers + time.Sleep(time.Duration(2*i+1) * time.Second) + } +} + +// Entry point for the replication worker (goroutine) +// +// i is an arbitrary index of the current routine. +func (c *Controller) replicate(i int) { + for { + err := c.tryReplicate() + if err != nil { + // Sleep in proportion to the (arbitrary) index to space out the + // workers further. + time.Sleep(time.Duration(i+1) * time.Minute) + } + } +} + +func (c *Controller) tryReplicate() error { + newLockTime := enteTime.MicrosecondsAfterMinutes(60) + ctx, cancelFun := context.WithTimeout(context.Background(), 50*time.Minute) + defer cancelFun() + row, err := c.Repo.GetPendingSyncDataAndExtendLock(ctx, newLockTime, false) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Errorf("Could not fetch row for deletion: %s", err) + } + return err + } + err = c.replicateRowData(ctx, *row) + if err != nil { + log.Errorf("Could not delete file data: %s", err) + return err + } else { + // If the replication was completed without any errors, we can reset the lock time + return c.Repo.ResetSyncLock(ctx, *row, newLockTime) + } +} + +func (c *Controller) replicateRowData(ctx context.Context, row filedata.Row) error { + wantInBucketIDs := map[string]bool{} + wantInBucketIDs[c.S3Config.GetBucketID(row.Type)] = true + rep := c.S3Config.GetReplicatedBuckets(row.Type) + for _, bucket := range rep { + wantInBucketIDs[bucket] = true + } + delete(wantInBucketIDs, row.LatestBucket) + for _, bucket := range row.ReplicatedBuckets { + delete(wantInBucketIDs, bucket) + } + if len(wantInBucketIDs) > 0 { + s3FileMetadata, err := c.downloadObject(ctx, row.S3FileMetadataObjectKey(), row.LatestBucket) + if err != nil { + return stacktrace.Propagate(err, "error fetching metadata object "+row.S3FileMetadataObjectKey()) + } + for bucketID := range wantInBucketIDs { + if err := c.uploadAndVerify(ctx, row, s3FileMetadata, bucketID); err != nil { + return stacktrace.Propagate(err, "error uploading and verifying metadata object") + } + } + } else { + log.Infof("No replication pending for file %d and type %s", row.FileID, string(row.Type)) + } + return c.Repo.MarkReplicationAsDone(ctx, row) +} + +func (c *Controller) uploadAndVerify(ctx context.Context, row filedata.Row, s3FileMetadata filedata.S3FileMetadata, dstBucketID string) error { + if err := c.Repo.RegisterReplicationAttempt(ctx, row, dstBucketID); err != nil { + return stacktrace.Propagate(err, "could not register replication attempt") + } + metadataSize, err := c.uploadObject(s3FileMetadata, row.S3FileMetadataObjectKey(), dstBucketID) + if err != nil { + return err + } + if metadataSize != row.Size { + return fmt.Errorf("uploaded metadata size %d does not match expected size %d", metadataSize, row.Size) + } + return c.Repo.MoveBetweenBuckets(row, dstBucketID, fileDataRepo.InflightRepColumn, 
fileDataRepo.ReplicationColumn) +} diff --git a/server/pkg/controller/filedata/s3.go b/server/pkg/controller/filedata/s3.go new file mode 100644 index 0000000000..e6f1200bf8 --- /dev/null +++ b/server/pkg/controller/filedata/s3.go @@ -0,0 +1,112 @@ +package filedata + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/ente-io/museum/ente" + fileData "github.com/ente-io/museum/ente/filedata" + "github.com/ente-io/stacktrace" + log "github.com/sirupsen/logrus" + stime "time" +) + +const PreSignedRequestValidityDuration = 7 * 24 * stime.Hour + +func (c *Controller) getUploadURL(dc string, objectKey string) (*ente.UploadURL, error) { + s3Client := c.S3Config.GetS3Client(dc) + r, _ := s3Client.PutObjectRequest(&s3.PutObjectInput{ + Bucket: c.S3Config.GetBucket(dc), + Key: &objectKey, + }) + url, err := r.Presign(PreSignedRequestValidityDuration) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + err = c.ObjectCleanupController.AddTempObjectKey(objectKey, dc) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + return &ente.UploadURL{ + ObjectKey: objectKey, + URL: url, + }, nil +} + +func (c *Controller) signedUrlGet(dc string, objectKey string) (*ente.UploadURL, error) { + s3Client := c.S3Config.GetS3Client(dc) + r, _ := s3Client.GetObjectRequest(&s3.GetObjectInput{ + Bucket: c.S3Config.GetBucket(dc), + Key: &objectKey, + }) + url, err := r.Presign(PreSignedRequestValidityDuration) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + return &ente.UploadURL{ObjectKey: objectKey, URL: url}, nil +} + +func (c *Controller) downloadObject(ctx context.Context, objectKey string, dc string) (fileData.S3FileMetadata, error) { + var obj fileData.S3FileMetadata + buff := &aws.WriteAtBuffer{} + bucket := c.S3Config.GetBucket(dc) + downloader := c.downloadManagerCache[dc] + _, err := downloader.DownloadWithContext(ctx, buff, &s3.GetObjectInput{ + Bucket: bucket, + Key: &objectKey, + }) + if err != nil { + return obj, err + } + err = json.Unmarshal(buff.Bytes(), &obj) + if err != nil { + return obj, stacktrace.Propagate(err, "unmarshal failed") + } + return obj, nil +} + +// uploadObject uploads the embedding object to the object store and returns the object size +func (c *Controller) uploadObject(obj fileData.S3FileMetadata, objectKey string, dc string) (int64, error) { + embeddingObj, _ := json.Marshal(obj) + s3Client := c.S3Config.GetS3Client(dc) + s3Bucket := c.S3Config.GetBucket(dc) + uploader := s3manager.NewUploaderWithClient(&s3Client) + up := s3manager.UploadInput{ + Bucket: s3Bucket, + Key: &objectKey, + Body: bytes.NewReader(embeddingObj), + } + result, err := uploader.Upload(&up) + if err != nil { + log.Error(err) + return -1, stacktrace.Propagate(err, "") + } + log.Infof("Uploaded to bucket %s", result.Location) + return int64(len(embeddingObj)), nil +} + +// copyObject copies the object from srcObjectKey to destObjectKey in the same bucket and returns the object size +func (c *Controller) copyObject(srcObjectKey string, destObjectKey string, bucketID string) error { + bucket := c.S3Config.GetBucket(bucketID) + s3Client := c.S3Config.GetS3Client(bucketID) + copySource := fmt.Sprintf("%s/%s", *bucket, srcObjectKey) + copyInput := &s3.CopyObjectInput{ + Bucket: bucket, + CopySource: ©Source, + Key: aws.String(destObjectKey), + } + + _, err 
:= s3Client.CopyObject(copyInput) + if err != nil { + return fmt.Errorf("failed to copy (%s) from %s to %s: %v", bucketID, srcObjectKey, destObjectKey, err) + } + log.Infof("Copied (%s) from %s to %s", bucketID, srcObjectKey, destObjectKey) + return nil +} diff --git a/server/pkg/controller/object_cleanup.go b/server/pkg/controller/object_cleanup.go index 91426cb56c..9a6a6055ce 100644 --- a/server/pkg/controller/object_cleanup.go +++ b/server/pkg/controller/object_cleanup.go @@ -166,8 +166,10 @@ func (c *ObjectCleanupController) removeUnreportedObjects() int { func (c *ObjectCleanupController) removeUnreportedObject(tx *sql.Tx, t ente.TempObject) error { // TODO: object_cleanup // This should use the DC from TempObject (once we start persisting it) - // dc := t.DataCenter - dc := c.S3Config.GetHotDataCenter() + dc := t.BucketId + if dc == "" { + dc = c.S3Config.GetHotDataCenter() + } logger := log.WithFields(log.Fields{ "task": "remove-unreported-objects", @@ -232,7 +234,7 @@ func (c *ObjectCleanupController) addCleanupEntryForObjectKey(objectKey string, err := c.Repo.AddTempObject(ente.TempObject{ ObjectKey: objectKey, IsMultipart: false, - DataCenter: dc, + BucketId: dc, }, expirationTime) return stacktrace.Propagate(err, "") } @@ -247,7 +249,7 @@ func (c *ObjectCleanupController) AddMultipartTempObjectKey(objectKey string, up ObjectKey: objectKey, IsMultipart: true, UploadID: uploadID, - DataCenter: dc, + BucketId: dc, }, expiry) return stacktrace.Propagate(err, "") } diff --git a/server/pkg/controller/replication3.go b/server/pkg/controller/replication3.go index 61062c0e0a..99c81837a6 100644 --- a/server/pkg/controller/replication3.go +++ b/server/pkg/controller/replication3.go @@ -280,7 +280,7 @@ func (c *ReplicationController3) tryReplicate() error { return done(nil) } - err = ensureSufficientSpace(ob.Size) + err = file.EnsureSufficientSpace(ob.Size) if err != nil { // We don't have free space right now, maybe because other big files are // being downloaded simultanously, but we might get space later, so mark @@ -331,25 +331,6 @@ func (c *ReplicationController3) tryReplicate() error { return done(err) } -// Return an error if we risk running out of disk space if we try to download -// and write a file of size. -// -// This function keeps a buffer of 1 GB free space in its calculations. -func ensureSufficientSpace(size int64) error { - free, err := file.FreeSpace("/") - if err != nil { - return stacktrace.Propagate(err, "Failed to fetch free space") - } - - gb := uint64(1024) * 1024 * 1024 - need := uint64(size) + (2 * gb) - if free < need { - return fmt.Errorf("insufficient space on disk (need %d bytes, free %d bytes)", size, free) - } - - return nil -} - // Create a temporary file for storing objectKey. Return both the path to the // file, and the handle to the file. // diff --git a/server/pkg/repo/filedata/repository.go b/server/pkg/repo/filedata/repository.go new file mode 100644 index 0000000000..9f5dc5cf3f --- /dev/null +++ b/server/pkg/repo/filedata/repository.go @@ -0,0 +1,259 @@ +package filedata + +import ( + "context" + "database/sql" + "fmt" + "github.com/ente-io/museum/ente" + "github.com/ente-io/museum/ente/filedata" + "github.com/ente-io/museum/pkg/utils/array" + "github.com/ente-io/stacktrace" + "github.com/lib/pq" + "time" +) + +// Repository defines the methods for inserting, updating, and retrieving file data. 
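+// A bucket ID should appear in at most one of latest_bucket, replicated_buckets, delete_from_buckets
+// and inflight_rep_buckets; the check_no_common_entries trigger (migration 89) rejects writes that
+// violate this, and MoveBetweenBuckets shifts an ID between columns in a single statement.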
+type Repository struct { + DB *sql.DB +} + +const ( + ReplicationColumn = "replicated_buckets" + DeletionColumn = "delete_from_buckets" + InflightRepColumn = "inflight_rep_buckets" +) + +func (r *Repository) InsertOrUpdate(ctx context.Context, data filedata.Row) error { + query := ` + INSERT INTO file_data + (file_id, user_id, data_type, size, latest_bucket) + VALUES + ($1, $2, $3, $4, $5) + ON CONFLICT (file_id, data_type) + DO UPDATE SET + size = EXCLUDED.size, + delete_from_buckets = array( + SELECT DISTINCT elem FROM unnest( + array_append( + array_cat(array_cat(file_data.replicated_buckets, file_data.delete_from_buckets), file_data.inflight_rep_buckets), + CASE WHEN file_data.latest_bucket != EXCLUDED.latest_bucket THEN file_data.latest_bucket END + ) + ) AS elem + WHERE elem IS NOT NULL AND elem != EXCLUDED.latest_bucket + ), + replicated_buckets = ARRAY[]::s3region[], + latest_bucket = EXCLUDED.latest_bucket, + updated_at = now_utc_micro_seconds() + WHERE file_data.is_deleted = false` + _, err := r.DB.ExecContext(ctx, query, + data.FileID, data.UserID, string(data.Type), data.Size, data.LatestBucket) + if err != nil { + return stacktrace.Propagate(err, "failed to insert file data") + } + return nil +} + +func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fileIDs []int64) ([]filedata.Row, error) { + rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at + FROM file_data + WHERE data_type = $1 AND file_id = ANY($2)`, string(oType), pq.Array(fileIDs)) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + return convertRowsToFilesData(rows) +} + +func (r *Repository) GetFileData(ctx context.Context, fileIDs int64) ([]filedata.Row, error) { + rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets,inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at + FROM file_data + WHERE file_id = $1`, fileIDs) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + return convertRowsToFilesData(rows) +} + +func (r *Repository) AddBucket(row filedata.Row, bucketID string, columnName string) error { + query := fmt.Sprintf(` + UPDATE file_data + SET %s = array( + SELECT DISTINCT elem FROM unnest( + array_append(file_data.%s, $1) + ) AS elem + ) + WHERE file_id = $2 AND data_type = $3 and user_id = $4`, columnName, columnName) + result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID) + if err != nil { + return stacktrace.Propagate(err, "failed to add bucket to "+columnName) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return stacktrace.Propagate(err, "") + } + if rowsAffected == 0 { + return stacktrace.NewError("bucket not added to " + columnName) + } + return nil +} + +func (r *Repository) RemoveBucket(row filedata.Row, bucketID string, columnName string) error { + query := fmt.Sprintf(` + UPDATE file_data + SET %s = array( + SELECT DISTINCT elem FROM unnest( + array_remove( + file_data.%s, + $1 + ) + ) AS elem + WHERE elem IS NOT NULL + ) + WHERE file_id = $2 AND data_type = $3 and user_id = $4`, columnName, columnName) + result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID) + if err != nil { + return stacktrace.Propagate(err, "failed to remove bucket from "+columnName) + } + rowsAffected, 
err := result.RowsAffected() + if err != nil { + return stacktrace.Propagate(err, "") + } + if rowsAffected == 0 { + return stacktrace.NewError("bucket not removed from " + columnName) + } + return nil +} + +func (r *Repository) MoveBetweenBuckets(row filedata.Row, bucketID string, sourceColumn string, destColumn string) error { + query := fmt.Sprintf(` + UPDATE file_data + SET %s = array( + SELECT DISTINCT elem FROM unnest( + array_append( + file_data.%s, + $1 + ) + ) AS elem + WHERE elem IS NOT NULL + ), + %s = array( + SELECT DISTINCT elem FROM unnest( + array_remove( + file_data.%s, + $1 + ) + ) AS elem + WHERE elem IS NOT NULL + ) + WHERE file_id = $2 AND data_type = $3 and user_id = $4`, destColumn, destColumn, sourceColumn, sourceColumn) + result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID) + if err != nil { + return stacktrace.Propagate(err, "failed to move bucket from "+sourceColumn+" to "+destColumn) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return stacktrace.Propagate(err, "") + } + if rowsAffected == 0 { + return stacktrace.NewError("bucket not moved from " + sourceColumn + " to " + destColumn) + } + return nil +} + +// GetPendingSyncDataAndExtendLock in a transaction gets single file data row that has been deleted and pending sync is true and sync_lock_till is less than now_utc_micro_seconds() and extends the lock till newSyncLockTime +// This is used to lock the file data row for deletion and extend +func (r *Repository) GetPendingSyncDataAndExtendLock(ctx context.Context, newSyncLockTime int64, forDeletion bool) (*filedata.Row, error) { + // ensure newSyncLockTime is in the future + if newSyncLockTime < time.Now().Add(5*time.Minute).UnixMicro() { + return nil, stacktrace.NewError("newSyncLockTime should be at least 5min in the future") + } + tx, err := r.DB.BeginTx(ctx, nil) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + defer tx.Rollback() + row := tx.QueryRow(`SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at + FROM file_data + where pending_sync = true and is_deleted = $1 and sync_locked_till < now_utc_micro_seconds() + LIMIT 1 + FOR UPDATE SKIP LOCKED`, forDeletion) + var fileData filedata.Row + err = row.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + if fileData.SyncLockedTill > newSyncLockTime { + return nil, stacktrace.NewError(fmt.Sprintf("newSyncLockTime (%d) is less than existing SyncLockedTill(%d), newSync", newSyncLockTime, fileData.SyncLockedTill)) + } + _, err = tx.Exec(`UPDATE file_data SET sync_locked_till = $1 WHERE file_id = $2 AND data_type = $3 AND user_id = $4`, newSyncLockTime, fileData.FileID, string(fileData.Type), fileData.UserID) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + err = tx.Commit() + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + return &fileData, nil +} + +// MarkReplicationAsDone marks the pending_sync as false for the file data row +func (r *Repository) MarkReplicationAsDone(ctx context.Context, row filedata.Row) error { + query := `UPDATE 
file_data SET pending_sync = false WHERE is_deleted=true and file_id = $1 AND data_type = $2 AND user_id = $3` + _, err := r.DB.ExecContext(ctx, query, row.FileID, string(row.Type), row.UserID) + if err != nil { + return stacktrace.Propagate(err, "") + } + return nil +} + +func (r *Repository) RegisterReplicationAttempt(ctx context.Context, row filedata.Row, dstBucketID string) error { + if array.StringInList(dstBucketID, row.DeleteFromBuckets) { + return r.MoveBetweenBuckets(row, dstBucketID, DeletionColumn, InflightRepColumn) + } + if !array.StringInList(dstBucketID, row.InflightReplicas) { + return r.AddBucket(row, dstBucketID, InflightRepColumn) + } + return nil +} + +// ResetSyncLock resets the sync_locked_till to now_utc_micro_seconds() for the file data row only if pending_sync is false and +// the input syncLockedTill is equal to the existing sync_locked_till. This is used to reset the lock after the replication is done +func (r *Repository) ResetSyncLock(ctx context.Context, row filedata.Row, syncLockedTill int64) error { + query := `UPDATE file_data SET sync_locked_till = now_utc_micro_seconds() WHERE pending_sync = false and file_id = $1 AND data_type = $2 AND user_id = $3 AND sync_locked_till = $4` + _, err := r.DB.ExecContext(ctx, query, row.FileID, string(row.Type), row.UserID, syncLockedTill) + if err != nil { + return stacktrace.Propagate(err, "") + } + return nil +} + +func (r *Repository) DeleteFileData(ctx context.Context, row filedata.Row) error { + query := ` + DELETE FROM file_data + WHERE file_id = $1 AND data_type = $2 AND latest_bucket = $3 AND user_id = $4 + AND replicated_buckets = ARRAY[]::s3region[] AND delete_from_buckets = ARRAY[]::s3region[] and inflight_rep_buckets = ARRAY[]::s3region[] and is_deleted=True` + res, err := r.DB.ExecContext(ctx, query, row.FileID, string(row.Type), row.LatestBucket, row.UserID) + if err != nil { + return stacktrace.Propagate(err, "") + } + rowsAffected, err := res.RowsAffected() + if err != nil { + return stacktrace.Propagate(err, "") + } + if rowsAffected == 0 { + return stacktrace.NewError("file data not deleted") + } + return nil +} + +func convertRowsToFilesData(rows *sql.Rows) ([]filedata.Row, error) { + var filesData []filedata.Row + for rows.Next() { + var fileData filedata.Row + err := rows.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + filesData = append(filesData, fileData) + } + return filesData, nil +} diff --git a/server/pkg/repo/object.go b/server/pkg/repo/object.go index 380a68126d..b84dd38c60 100644 --- a/server/pkg/repo/object.go +++ b/server/pkg/repo/object.go @@ -148,6 +148,10 @@ func (repo *ObjectRepository) MarkObjectsAsDeletedForFileIDs(ctx context.Context for _, fileID := range fileIDs { embeddingsToBeDeleted = append(embeddingsToBeDeleted, strconv.FormatInt(fileID, 10)) } + _, err = tx.ExecContext(ctx, `UPDATE file_data SET is_deleted = TRUE, pending_sync = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs)) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } err = repo.QueueRepo.AddItems(ctx, tx, DeleteEmbeddingsQueue, embeddingsToBeDeleted) if err != nil { diff --git a/server/pkg/repo/object_cleanup.go b/server/pkg/repo/object_cleanup.go index 
7074121381..b78910d052 100644 --- a/server/pkg/repo/object_cleanup.go +++ b/server/pkg/repo/object_cleanup.go @@ -25,8 +25,8 @@ type ObjectCleanupRepository struct { func (repo *ObjectCleanupRepository) AddTempObject(tempObject ente.TempObject, expirationTime int64) error { var err error if tempObject.IsMultipart { - _, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time,upload_id,is_multipart) - VALUES($1, $2, $3, $4)`, tempObject.ObjectKey, expirationTime, tempObject.UploadID, tempObject.IsMultipart) + _, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time,upload_id,is_multipart, bucket_id) + VALUES($1, $2, $3, $4, $5)`, tempObject.ObjectKey, expirationTime, tempObject.UploadID, tempObject.IsMultipart, tempObject.BucketId) } else { _, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time) VALUES($1, $2)`, tempObject.ObjectKey, expirationTime) @@ -62,7 +62,7 @@ func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente } rows, err := tx.Query(` - SELECT object_key, is_multipart, upload_id FROM temp_objects + SELECT object_key, is_multipart, upload_id, bucket_id FROM temp_objects WHERE expiration_time <= $1 LIMIT 1000 FOR UPDATE SKIP LOCKED @@ -83,7 +83,8 @@ func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente for rows.Next() { var tempObject ente.TempObject var uploadID sql.NullString - err := rows.Scan(&tempObject.ObjectKey, &tempObject.IsMultipart, &uploadID) + var bucketID sql.NullString + err := rows.Scan(&tempObject.ObjectKey, &tempObject.IsMultipart, &uploadID, &bucketID) if err != nil { rollback() return nil, nil, stacktrace.Propagate(err, "") @@ -91,6 +92,9 @@ func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente if tempObject.IsMultipart { tempObject.UploadID = uploadID.String } + if bucketID.Valid { + tempObject.BucketId = bucketID.String + } tempObjects = append(tempObjects, tempObject) } return tx, tempObjects, nil diff --git a/server/pkg/utils/file/file.go b/server/pkg/utils/file/file.go index db94347026..5e1872e59c 100644 --- a/server/pkg/utils/file/file.go +++ b/server/pkg/utils/file/file.go @@ -35,6 +35,24 @@ func FreeSpace(path string) (uint64, error) { return fs.Bfree * uint64(fs.Bsize), nil } +// EnsureSufficientSpace returns an error if we risk running out of disk space when we try to download +// and write a file of the given size. +// This function keeps a buffer of 2 GB free space in its calculations.
+func EnsureSufficientSpace(size int64) error { + free, err := FreeSpace("/") + if err != nil { + return stacktrace.Propagate(err, "Failed to fetch free space") + } + + gb := uint64(1024) * 1024 * 1024 + need := uint64(size) + (2 * gb) + if free < need { + return fmt.Errorf("insufficient space on disk (need %d bytes, free %d bytes)", size, free) + } + + return nil +} + func GetLockNameForObject(objectKey string) string { return fmt.Sprintf("Object:%s", objectKey) } diff --git a/server/pkg/utils/s3config/filedata.go b/server/pkg/utils/s3config/filedata.go new file mode 100644 index 0000000000..5f70cba043 --- /dev/null +++ b/server/pkg/utils/s3config/filedata.go @@ -0,0 +1,45 @@ +package s3config + +import ( + "fmt" + "github.com/ente-io/museum/ente" + "strings" +) + +type ObjectBucketConfig struct { + PrimaryBucket string `mapstructure:"primaryBucket"` + ReplicaBuckets []string `mapstructure:"replicaBuckets"` +} + +type FileDataConfig struct { + ObjectBucketConfig map[string]ObjectBucketConfig `mapstructure:"file-data-config"` +} + +func (f FileDataConfig) HasConfig(objectType ente.ObjectType) bool { + if objectType == "" || objectType == ente.FILE || objectType == ente.THUMBNAIL { + panic(fmt.Sprintf("Unsupported object type: %s", objectType)) + } + + _, ok := f.ObjectBucketConfig[key(objectType)] + return ok +} + +func (f FileDataConfig) GetPrimaryBucketID(objectType ente.ObjectType) string { + config, ok := f.ObjectBucketConfig[key(objectType)] + if !ok { + panic(fmt.Sprintf("No config for object type: %s, use HasConfig", key(objectType))) + } + return config.PrimaryBucket +} + +func (f FileDataConfig) GetReplicaBuckets(objectType ente.ObjectType) []string { + config, ok := f.ObjectBucketConfig[key(objectType)] + if !ok { + panic(fmt.Sprintf("No config for object type: %s, use HasConfig", key(objectType))) + } + return config.ReplicaBuckets +} + +func key(oType ente.ObjectType) string { + return strings.ToLower(string(oType)) +} diff --git a/server/pkg/utils/s3config/s3config.go b/server/pkg/utils/s3config/s3config.go index a562e51815..469d3fb632 100644 --- a/server/pkg/utils/s3config/s3config.go +++ b/server/pkg/utils/s3config/s3config.go @@ -1,10 +1,12 @@ package s3config import ( + "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + "github.com/ente-io/museum/ente" log "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -39,10 +41,18 @@ type S3Config struct { // Indicates if local minio buckets are being used. Enables various // debugging workarounds; not tested/intended for production. areLocalBuckets bool + + // fileDataConfig is the bucket configuration for the various file-data object types. + // If the bucket for a particular object type is not specified, it will + // default to the hot storage bucket (hotDC) with no replicas. Initially, this config won't support the + // existing object types (file, thumbnail) and will only be used for new object types. In the future, + // we can migrate the existing object types to this config. + fileDataConfig FileDataConfig } // # Datacenters -// +// Note: We are renaming datacenter names to bucketID. Until the migration is completed, you will see both terms +// in use. // Below are some high level details about the three replicas ("data centers") // that are in use. There are a few other legacy ones too.
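For reference, a minimal usage sketch of the bucket-resolution helpers introduced in this change, assuming a caller that needs to decide where to write a derived-data object. The package name example and the function bucketsFor are illustrative assumptions; only ente.ObjectType, S3Config.GetBucketID and S3Config.GetReplicatedBuckets come from the diff itself.

package example // illustrative sketch, not part of this patch

import (
	"github.com/ente-io/museum/ente"
	"github.com/ente-io/museum/pkg/utils/s3config"
)

// bucketsFor resolves the primary bucket and its replicas for a derived-data
// object type (ML data, image/video previews). When no file-data entry is
// present in the s3 config, GetBucketID falls back to the derived-storage
// bucket and GetReplicatedBuckets returns an empty slice; when an entry is
// present, the configured primaryBucket/replicaBuckets take precedence.
// Note: these helpers panic for the classic file and thumbnail object types.
func bucketsFor(cfg *s3config.S3Config, oType ente.ObjectType) (string, []string) {
	return cfg.GetBucketID(oType), cfg.GetReplicatedBuckets(oType)
}

The fallback keeps existing deployments working without adding any new s3 configuration.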
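Similarly, a rough sketch of how one pass of the file-data replication worker could combine the repository primitives added earlier in this change (GetPendingSyncDataAndExtendLock, RegisterReplicationAttempt, MoveBetweenBuckets, MarkReplicationAsDone). The function replicateOnce, the 30-minute lock window and the literal "replicated_buckets" column name are assumptions, the object copy itself is elided, and the production controller is more involved since it must handle every replica bucket and reset locks.

package example // illustrative sketch, not part of this patch

import (
	"context"
	"time"

	fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
)

// replicateOnce locks one pending, non-deleted file_data row, records the
// in-flight attempt for dstBucketID, copies the object (elided here), then
// moves the bucket from the in-flight column to replicated_buckets and
// finally clears pending_sync.
func replicateOnce(ctx context.Context, repo *fileDataRepo.Repository, dstBucketID string) error {
	// The lock must be at least 5 minutes in the future; 30 minutes is an
	// arbitrary illustrative window.
	lockTill := time.Now().Add(30 * time.Minute).UnixMicro()
	row, err := repo.GetPendingSyncDataAndExtendLock(ctx, lockTill, false /* forDeletion */)
	if err != nil {
		return err // also covers the "no pending row" case surfaced by Scan
	}
	if err := repo.RegisterReplicationAttempt(ctx, *row, dstBucketID); err != nil {
		return err
	}
	// ... copy the object from row.LatestBucket to dstBucketID here ...
	if err := repo.MoveBetweenBuckets(*row, dstBucketID, fileDataRepo.InflightRepColumn, "replicated_buckets"); err != nil {
		return err
	}
	// In the real controller this would only happen after every configured
	// replica bucket has been written; a single destination is assumed here.
	return repo.MarkReplicationAsDone(ctx, *row)
}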
// @@ -74,10 +84,11 @@ var ( dcWasabiEuropeCentral_v3 string = "wasabi-eu-central-2-v3" dcSCWEuropeFrance_v3 string = "scw-eu-fr-v3" dcWasabiEuropeCentralDerived string = "wasabi-eu-central-2-derived" + bucket5 string = "b5" + bucket6 string = "b6" ) // Number of days that the wasabi bucket is configured to retain objects. -// // We must wait at least these many days after removing the conditional hold // before we can delete the object. const WasabiObjectConditionalHoldDays = 21 @@ -89,9 +100,9 @@ func NewS3Config() *S3Config { } func (config *S3Config) initialize() { - dcs := [6]string{ + dcs := [8]string{ dcB2EuropeCentral, dcSCWEuropeFranceLockedDeprecated, dcWasabiEuropeCentralDeprecated, - dcWasabiEuropeCentral_v3, dcSCWEuropeFrance_v3, dcWasabiEuropeCentralDerived} + dcWasabiEuropeCentral_v3, dcSCWEuropeFrance_v3, dcWasabiEuropeCentralDerived, bucket5, bucket6} config.hotDC = dcB2EuropeCentral config.secondaryHotDC = dcWasabiEuropeCentral_v3 @@ -118,7 +129,6 @@ func (config *S3Config) initialize() { config.areLocalBuckets = areLocalBuckets for _, dc := range dcs { - config.buckets[dc] = viper.GetString("s3." + dc + ".bucket") config.buckets[dc] = viper.GetString("s3." + dc + ".bucket") s3Config := aws.Config{ Credentials: credentials.NewStaticCredentials(viper.GetString("s3."+dc+".key"), @@ -133,30 +143,60 @@ func (config *S3Config) initialize() { s3Config.DisableSSL = aws.Bool(true) s3Config.S3ForcePathStyle = aws.Bool(true) } - session, err := session.NewSession(&s3Config) + s3Session, err := session.NewSession(&s3Config) if err != nil { log.Fatal("Could not create session for " + dc) } - s3Client := *s3.New(session) + s3Client := *s3.New(s3Session) config.s3Configs[dc] = &s3Config config.s3Clients[dc] = s3Client if dc == dcWasabiEuropeCentral_v3 { config.isWasabiComplianceEnabled = viper.GetBool("s3." + dc + ".compliance") } } + + if err := viper.Sub("s3").Unmarshal(&config.fileDataConfig); err != nil { + log.Fatalf("Unable to decode into struct: %v\n", err) + return + } + } -func (config *S3Config) GetBucket(dc string) *string { - bucket := config.buckets[dc] +func (config *S3Config) GetBucket(dcOrBucketID string) *string { + bucket := config.buckets[dcOrBucketID] return &bucket } -func (config *S3Config) GetS3Config(dc string) *aws.Config { - return config.s3Configs[dc] +// GetBucketID returns the bucket ID for the given object type. 
Note: existing dc are renamed as bucketID +func (config *S3Config) GetBucketID(oType ente.ObjectType) string { + if config.fileDataConfig.HasConfig(oType) { + return config.fileDataConfig.GetPrimaryBucketID(oType) + } + if oType == ente.MlData || oType == ente.PreviewVideo || oType == ente.PreviewImage { + return config.derivedStorageDC + } + panic(fmt.Sprintf("ops not supported for type: %s", oType)) +} +func (config *S3Config) GetReplicatedBuckets(oType ente.ObjectType) []string { + if config.fileDataConfig.HasConfig(oType) { + return config.fileDataConfig.GetReplicaBuckets(oType) + } + if oType == ente.MlData || oType == ente.PreviewVideo || oType == ente.PreviewImage { + return []string{} + } + panic(fmt.Sprintf("ops not supported for object type: %s", oType)) +} + +func (config *S3Config) IsBucketActive(bucketID string) bool { + return config.buckets[bucketID] != "" +} + +func (config *S3Config) GetS3Config(dcOrBucketID string) *aws.Config { + return config.s3Configs[dcOrBucketID] } -func (config *S3Config) GetS3Client(dc string) s3.S3 { - return config.s3Clients[dc] +func (config *S3Config) GetS3Client(dcOrBucketID string) s3.S3 { + return config.s3Clients[dcOrBucketID] } func (config *S3Config) GetHotDataCenter() string {