Skip to content

Commit

Permalink
Sync commits
Browse files Browse the repository at this point in the history
- Fix mirror progress bug
- Fix table sync_versions version seq bug
- Add proxy config for multi sync
- Add url style config for duckdb reader
- Make minio bucket lookup configurable
- Make the server handle Git LFS pushes when the file is larger than the configured size
  • Loading branch information
pulltheflower committed Jan 22, 2025
1 parent 60937af commit 36cabcf
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 60 deletions.
9 changes: 9 additions & 0 deletions api/handler/git_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,17 @@ func (h *GitHTTPHandler) LfsUpload(ctx *gin.Context) {
uploadRequest.RepoType = types.RepositoryType(ctx.GetString("repo_type"))
uploadRequest.CurrentUser = httpbase.GetCurrentUser(ctx)

if uploadRequest.CurrentUser == "" {
httpbase.UnauthorizedError(ctx, component.ErrUnauthorized)
return
}

err = h.gitHttp.LfsUpload(ctx.Request.Context(), ctx.Request.Body, uploadRequest)
if err != nil {
slog.Error("Failed to upload lfs file", slog.Any("error", err), slog.String("namespace", uploadRequest.Namespace), slog.String("name", uploadRequest.Name), slog.String("oid", uploadRequest.Oid))
if errors.Is(err, component.ErrPermissionDenied) {
httpbase.UnauthorizedError(ctx, err)
}
httpbase.ServerError(ctx, err)
return
}
Expand Down
9 changes: 7 additions & 2 deletions builder/parquet/duckdb_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ type duckdbReader struct {

// NewS3Reader create a new reader to read from s3 compatible object storage service
func NewS3Reader(cfg *config.Config) (Reader, error) {
urlStyle := "vhost"
if cfg.S3.BucketLookup == "path" {
urlStyle = "path"
}
s3SetupSql := fmt.Sprintf(`
INSTALL httpfs;
LOAD httpfs;
SET s3_region = '%s';
SET s3_endpoint = '%s';
SET s3_url_style = 'vhost';
SET s3_access_key_id = '%s';
SET s3_secret_access_key = '%s';
`, cfg.S3.Region, cfg.S3.Endpoint, cfg.S3.AccessKeyID, cfg.S3.AccessKeySecret)
SET s3_use_ssl = %t;
SET s3_url_style = '%s';
`, cfg.S3.Region, cfg.S3.Endpoint, cfg.S3.AccessKeyID, cfg.S3.AccessKeySecret, cfg.S3.EnableSSL, urlStyle)
db, err := sql.Open("duckdb", "")
if err != nil {
return nil, fmt.Errorf("failed to connect to duckdb, cause:%w", err)
Expand Down
14 changes: 13 additions & 1 deletion builder/store/s3/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,23 @@ import (
"opencsg.com/csghub-server/common/config"
)

// bucketLookupMapping translates the S3.BucketLookup config value ("auto",
// "dns", or "path") into the corresponding minio bucket-lookup type.
// Unrecognized values are handled by the caller (NewMinio), which falls
// back to minio.BucketLookupAuto.
var bucketLookupMapping = map[string]minio.BucketLookupType{
"auto": minio.BucketLookupAuto,
"dns": minio.BucketLookupDNS,
"path": minio.BucketLookupPath,
}

func NewMinio(cfg *config.Config) (Client, error) {
var bucketLookupType minio.BucketLookupType
if val, ok := bucketLookupMapping[cfg.S3.BucketLookup]; ok {
bucketLookupType = val
} else {
bucketLookupType = minio.BucketLookupAuto
}
mClient, err := minio.New(cfg.S3.Endpoint, &minio.Options{
Creds: credentials.NewStaticV4(cfg.S3.AccessKeyID, cfg.S3.AccessKeySecret, ""),
Secure: cfg.S3.EnableSSL,
BucketLookup: minio.BucketLookupAuto,
BucketLookup: bucketLookupType,
Region: cfg.S3.Region,
})
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,12 @@ type Config struct {
Region string `env:"STARHUB_SERVER_S3_REGION"`
Endpoint string `env:"STARHUB_SERVER_S3_ENDPOINT, default=localhost:9000"`
// for better performance of LFS downloading from S3 (can be ignored if S3.Endpoint is already an internal domain or IP address)
InternalEndpoint string `env:"STARHUB_SERVER_S3_INTERNAL_ENDPOINT, default="`
Bucket string `env:"STARHUB_SERVER_S3_BUCKET, default=opencsg-test"`
EnableSSL bool `env:"STARHUB_SERVER_S3_ENABLE_SSL, default=false"`
InternalEndpoint string `env:"STARHUB_SERVER_S3_INTERNAL_ENDPOINT, default="`
Bucket string `env:"STARHUB_SERVER_S3_BUCKET, default=opencsg-test"`
EnableSSL bool `env:"STARHUB_SERVER_S3_ENABLE_SSL, default=false"`
URLUploadMaxFileSize int64 `env:"STARHUB_SERVER_S3_URL_UPLOAD_MAX_FILE_SIZE, default=5153960755"`
// BucketLookup type, can be "auto" "dns" or "path"
BucketLookup string `env:"STARHUB_SERVER_S3_BUCKET_LOOKUP, default=auto"`
}

SensitiveCheck struct {
Expand Down
1 change: 1 addition & 0 deletions common/config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ endpoint = "localhost:9000"
internal_endpoint = ""
bucket = "opencsg-test"
enable_ssl = false
url_upload_max_file_size = 5153960755

[sensitive_check]
enable = false
Expand Down
69 changes: 30 additions & 39 deletions component/git_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package component

import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -308,16 +306,24 @@ func (c *gitHTTPComponentImpl) buildObjectResponse(ctx context.Context, req type
}

func (c *gitHTTPComponentImpl) LfsUpload(ctx context.Context, body io.ReadCloser, req types.UploadRequest) error {
var exists bool
var exists, allowed bool
defer body.Close()
repo, err := c.repoStore.FindByPath(ctx, req.RepoType, req.Namespace, req.Name)
if err != nil {
return fmt.Errorf("failed to find repo, error: %w", err)
}

allowed, err = c.repoComponent.AllowWriteAccess(ctx, req.RepoType, req.Namespace, req.Name, req.CurrentUser)
if err != nil {
return err
}
if !allowed {
return ErrPermissionDenied
}

pointer := types.Pointer{Oid: req.Oid, Size: req.Size}

if !pointer.Valid() {
slog.Error("invalid lfs oid", slog.String("oid", req.Oid))
return errors.New("invalid lfs oid")
}

Expand All @@ -334,28 +340,7 @@ func (c *gitHTTPComponentImpl) LfsUpload(ctx context.Context, body io.ReadCloser
}
uploadOrVerify := func() error {
if exists {
allowed, err := c.repoComponent.AllowWriteAccess(ctx, req.RepoType, req.Namespace, req.Name, req.CurrentUser)
if err != nil {
slog.Error("Unable to check if LFS MetaObject [%s] is allowed. Error: %v", pointer.Oid, err)
return err
}
if !allowed {
// The file exists but the user has no access to it.
// The upload gets verified by hashing and size comparison to prove access to it.
hash := sha256.New()
written, err := io.Copy(hash, body)
if err != nil {
slog.Error("Error creating hash. Error", "error", err)
return err
}

if written != pointer.Size {
return types.ErrSizeMismatch
}
if hex.EncodeToString(hash.Sum(nil)) != pointer.Oid {
return types.ErrHashMismatch
}
}
return nil
} else {
var (
uploadErr error
Expand Down Expand Up @@ -399,7 +384,6 @@ func (c *gitHTTPComponentImpl) LfsUpload(ctx context.Context, body io.ReadCloser
})
return err
}
defer body.Close()
if err := uploadOrVerify(); err != nil {
if errors.Is(err, types.ErrSizeMismatch) || errors.Is(err, types.ErrHashMismatch) {
slog.Error("Upload does not match LFS MetaObject [%s]. Error: %v", pointer.Oid, err)
Expand Down Expand Up @@ -436,14 +420,17 @@ func (c *gitHTTPComponentImpl) LfsVerify(ctx context.Context, req types.VerifyRe
return types.ErrSizeMismatch
}

_, err = c.lfsMetaObjectStore.Create(ctx, database.LfsMetaObject{
Oid: p.Oid,
Size: p.Size,
RepositoryID: repo.ID,
Existing: true,
})
if err != nil {
return fmt.Errorf("failed to create lfs meta object in database: %w", err)
_, err = c.lfsMetaObjectStore.FindByOID(ctx, repo.ID, p.Oid)
if err != nil && errors.Is(err, sql.ErrNoRows) {
_, err = c.lfsMetaObjectStore.Create(ctx, database.LfsMetaObject{
Oid: p.Oid,
Size: p.Size,
RepositoryID: repo.ID,
Existing: true,
})
if err != nil {
return fmt.Errorf("failed to create lfs meta object in database: %w", err)
}
}

return nil
Expand Down Expand Up @@ -698,12 +685,16 @@ func (c *gitHTTPComponentImpl) buildDownloadLink(req types.BatchRequest, pointer
// }

func (c *gitHTTPComponentImpl) buildUploadLink(req types.BatchRequest, pointer types.Pointer) string {
objectKey := path.Join("lfs", pointer.RelativePath())
u, err := c.s3Client.PresignedPutObject(context.Background(), c.config.S3.Bucket, objectKey, time.Hour*24)
if err != nil {
if pointer.Size >= c.config.S3.URLUploadMaxFileSize {
return c.config.APIServer.PublicDomain + "/" + path.Join(fmt.Sprintf("%ss", req.RepoType), url.PathEscape(req.Namespace), url.PathEscape(req.Name+".git"), "info/lfs/objects", url.PathEscape(pointer.Oid), strconv.FormatInt(pointer.Size, 10))
} else {
objectKey := path.Join("lfs", pointer.RelativePath())
u, err := c.s3Client.PresignedPutObject(context.Background(), c.config.S3.Bucket, objectKey, time.Hour*24)
if err != nil {
return c.config.APIServer.PublicDomain + "/" + path.Join(fmt.Sprintf("%ss", req.RepoType), url.PathEscape(req.Namespace), url.PathEscape(req.Name+".git"), "info/lfs/objects", url.PathEscape(pointer.Oid), strconv.FormatInt(pointer.Size, 10))
}
return u.String()
}
return u.String()
}

func (c *gitHTTPComponentImpl) buildVerifyLink(req types.BatchRequest) string {
Expand Down
24 changes: 13 additions & 11 deletions component/git_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ func TestGitHTTPComponent_LfsUpload(t *testing.T) {
)
}

if exist {
gc.mocks.components.repo.EXPECT().AllowWriteAccess(
ctx, types.ModelRepo, "ns", "n", "user",
).Return(true, nil)
} else {
gc.mocks.components.repo.EXPECT().AllowWriteAccess(
ctx, types.ModelRepo, "ns", "n", "user",
).Return(true, nil)

if !exist {
gc.mocks.s3Client.EXPECT().PutObject(
ctx, "",
"lfs/a3/f8/e1b4f77bb24e508906c6972f81928f0d926e6daef1b29d12e348b8a3547e",
Expand All @@ -245,13 +245,14 @@ func TestGitHTTPComponent_LfsUpload(t *testing.T) {
ConcurrentStreamParts: true,
NumThreads: 5,
}).Return(minio.UploadInfo{Size: 100}, nil)

gc.mocks.stores.LfsMetaObjectMock().EXPECT().Create(ctx, database.LfsMetaObject{
Oid: oid,
Size: 100,
RepositoryID: 123,
Existing: true,
}).Return(nil, nil)
}
gc.mocks.stores.LfsMetaObjectMock().EXPECT().Create(ctx, database.LfsMetaObject{
Oid: oid,
Size: 100,
RepositoryID: 123,
Existing: true,
}).Return(nil, nil)

err := gc.LfsUpload(ctx, rc, types.UploadRequest{
Oid: oid,
Expand Down Expand Up @@ -281,6 +282,7 @@ func TestGitHTTPComponent_LfsVerify(t *testing.T) {
gc.mocks.s3Client.EXPECT().StatObject(ctx, "", "lfs/oid", minio.StatObjectOptions{}).Return(
minio.ObjectInfo{Size: 100}, nil,
)
gc.mocks.stores.LfsMetaObjectMock().EXPECT().FindByOID(ctx, int64(123), "oid").Return(nil, sql.ErrNoRows)
gc.mocks.stores.LfsMetaObjectMock().EXPECT().Create(ctx, database.LfsMetaObject{
Oid: "oid",
Size: 100,
Expand Down
18 changes: 14 additions & 4 deletions mirror/lfssyncer/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ func (w *MinioLFSSyncWorker) GetLFSDownloadURLs(ctx context.Context, mirror *dat
}

func (w *MinioLFSSyncWorker) DownloadAndUploadLFSFiles(ctx context.Context, mirror *database.Mirror, pointers []*types.Pointer) error {
var finishedLFSFileCount int
var (
finishedLFSFileCount int
success bool
)
lfsFilesCount := len(pointers)
for _, pointer := range pointers {
objectKey := filepath.Join("lfs", pointer.RelativePath())
Expand All @@ -245,6 +248,7 @@ func (w *MinioLFSSyncWorker) DownloadAndUploadLFSFiles(ctx context.Context, mirr
if (err != nil && err.Error() != "The specified key does not exist.") || fileInfo.Size != pointer.Size {
err = w.DownloadAndUploadLFSFile(ctx, mirror, pointer)
if err != nil {
success = false
slog.Error("failed to download and upload LFS file", slog.Any("error", err))
}
}
Expand All @@ -253,22 +257,28 @@ func (w *MinioLFSSyncWorker) DownloadAndUploadLFSFiles(ctx context.Context, mirr
Size: pointer.Size,
Oid: pointer.Oid,
RepositoryID: mirror.Repository.ID,
Existing: true,
Existing: success,
}
_, err = w.lfsMetaObjectStore.UpdateOrCreate(ctx, lfsMetaObject)
if err != nil {
slog.Error("failed to update or create LFS meta object", slog.Any("error", err))
return fmt.Errorf("failed to update or create LFS meta object: %w", err)
}
slog.Info("finish to download and upload LFS file", slog.Any("objectKey", objectKey))
finishedLFSFileCount += 1
if success {
finishedLFSFileCount += 1
}
mirror.Progress = int8(finishedLFSFileCount * 100 / lfsFilesCount)
err = w.mirrorStore.Update(ctx, mirror)
if err != nil {
return fmt.Errorf("failed to update mirror progress: %w", err)
}
}
mirror.Status = types.MirrorFinished
if mirror.Progress != 100 {
mirror.Status = types.MirrorIncomplete
} else {
mirror.Status = types.MirrorFinished
}
err := w.mirrorStore.Update(ctx, mirror)
if err != nil {
return fmt.Errorf("failed to update mirror status: %w", err)
Expand Down
1 change: 1 addition & 0 deletions mirror/reposyncer/local_woker.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func (w *LocalMirrorWoker) SyncRepo(ctx context.Context, task queue.MirrorTask)
})
} else {
mirror.Status = types.MirrorFinished
mirror.Progress = 100
mirror.Repository.SyncStatus = types.SyncStatusCompleted
_, err = w.repoStore.UpdateRepo(ctx, *mirror.Repository)
if err != nil {
Expand Down

0 comments on commit 36cabcf

Please sign in to comment.