From 059b256bb1fa8c3f3d87b3e1872a53ba766512c5 Mon Sep 17 00:00:00 2001 From: Dweb Fan Date: Sun, 19 May 2024 21:57:45 -0700 Subject: [PATCH 1/2] fix encrypting multipart upload issue Signed-off-by: Dweb Fan --- cmd/lomob/crypt.go | 111 ++++++++++++++++++++++++++++++++++---- cmd/lomob/iso.go | 5 ++ cmd/lomob/main.go | 4 ++ cmd/lomob/upload-files.go | 2 +- cmd/lomob/upload-iso.go | 60 +++++++++++++++++---- common/io/crypto.go | 2 +- common/io/crypto_test.go | 61 +++++++++++++++++++++ common/io/readseeker.go | 6 +++ 8 files changed, 230 insertions(+), 21 deletions(-) diff --git a/cmd/lomob/crypt.go b/cmd/lomob/crypt.go index c75d450..dbd8409 100644 --- a/cmd/lomob/crypt.go +++ b/cmd/lomob/crypt.go @@ -8,10 +8,13 @@ import ( "io" "os" "reflect" + "strconv" "syscall" "github.com/lomorage/lomo-backup/common/crypto" + "github.com/lomorage/lomo-backup/common/datasize" lomohash "github.com/lomorage/lomo-backup/common/hash" + lomoio "github.com/lomorage/lomo-backup/common/io" "github.com/urfave/cli" "golang.org/x/term" ) @@ -65,11 +68,7 @@ func encryptCmd(ctx *cli.Context) error { return errors.New("usage: [input filename] [[output filename]]. If output filename is not given, it will be .enc") } - salt, err := genSalt(ifilename) - if err != nil { - return err - } - + var err error masterKey := ctx.String("encrypt-key") if masterKey == "" { masterKey, err = getMasterKey() @@ -78,6 +77,11 @@ func encryptCmd(ctx *cli.Context) error { } } + salt, err := genSalt(ifilename) + if err != nil { + return err + } + src, err := os.Open(ifilename) if err != nil { return err @@ -91,30 +95,117 @@ func encryptCmd(ctx *cli.Context) error { defer dst.Close() fmt.Printf("Start encrypt '%s', and save output to '%s'\n", ifilename, ofilename) - _, _, err = encryptLocalFile(src, dst, []byte(masterKey), salt, true) + + ps := ctx.String("part-size") + if ps == "" { + _, err = encryptLocalFile(src, dst, []byte(masterKey), salt, true) + if err != nil { + return err + } + + fmt.Println("Finish encryption!") + + return nil + } + + // Derive key from passphrase using Argon2 + // TODO: Using IV as salt for simplicity, change to different salt? 
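+	//
+	// Layout of the parts written below: the first part is salt +
+	// ciphertext of (partSize - SaltLen) plaintext bytes, so every full
+	// part written to dst is exactly partSize bytes (the last part may be
+	// shorter). All parts share one Encryptor over a moving
+	// FilePartReadSeeker window, so the cipher stream stays continuous and
+	// the concatenated output in dst matches what a single-pass
+	// encryptLocalFile run would produce.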
+ encryptKey := crypto.DeriveKeyFromMasterKey([]byte(masterKey), salt) + + partSize, err := datasize.ParseString(ps) + if err != nil { + return err + } + + stat, err := src.Stat() if err != nil { return err } + index := 1 + remaining := stat.Size() + var ( + start, end, curr, partLength int64 + encryptor *crypto.Encryptor + prs *lomoio.FilePartReadSeeker + ) + for curr = 0; remaining != 0; curr += partLength { + if remaining < int64(partSize) { + partLength = remaining + } else { + partLength = int64(partSize) + } + + if curr == 0 { + end = int64(int(partLength) - crypto.SaltLen()) + } else { + start = end + end += partLength + } + + // create a local tmpfile and save intermittent part + pf, err := os.Create(ofilename + ".part" + strconv.Itoa(index)) + if err != nil { + return err + } + defer pf.Close() + + mw := io.MultiWriter(dst, pf) + + if prs == nil { + prs = lomoio.NewFilePartReadSeeker(src, start, end) + } else { + prs.SetStartEnd(start, end) + } + + if encryptor == nil { + encryptor, err = crypto.NewEncryptor(prs, encryptKey, salt, false) + if err != nil { + return err + } + n, err := mw.Write(salt) + if err != nil { + return err + } + if n != len(salt) { + return fmt.Errorf("write %d byte salt while expecting %d", n, len(salt)) + } + } + + n, err := io.Copy(mw, encryptor) + if err != nil { + return err + } + + if n != end-start { + return fmt.Errorf("write %d byte salt while expecting %d btw [%d, %d]", n, end-start, start, end) + } + + fmt.Printf("Created '%s'\n", pf.Name()) + + index++ + remaining -= end - start + } + fmt.Println("Finish encryption!") return nil } -func encryptLocalFile(src io.ReadSeeker, dst io.Writer, masterKey, iv []byte, hasHeader bool) ([]byte, []byte, error) { +func encryptLocalFile(src io.ReadSeeker, dst io.Writer, masterKey, iv []byte, hasHeader bool) ([]byte, error) { // Derive key from passphrase using Argon2 // TODO: Using IV as salt for simplicity, change to different salt? encryptKey := crypto.DeriveKeyFromMasterKey(masterKey, iv) encryptor, err := crypto.NewEncryptor(src, encryptKey, iv, hasHeader) if err != nil { - return nil, nil, err + return nil, err } _, err = io.Copy(dst, encryptor) if err != nil { - return nil, nil, err + return nil, err } - return encryptor.GetHashOrig(), encryptor.GetHashEncrypt(), nil + return encryptor.GetHashEncrypt(), nil } func decryptLocalFile(ctx *cli.Context) error { diff --git a/cmd/lomob/iso.go b/cmd/lomob/iso.go index 847bc01..e7ad548 100644 --- a/cmd/lomob/iso.go +++ b/cmd/lomob/iso.go @@ -63,6 +63,8 @@ func mkISO(ctx *cli.Context) error { isoFilename = ctx.Args()[0] } + logrus.Infof("Total %d files (%s)", len(files), datasize.ByteSize(currentSizeNotInISO).HR()) + for { if currentSizeNotInISO < isoSize.Bytes() { currSize := datasize.ByteSize(currentSizeNotInISO) @@ -90,6 +92,9 @@ func mkISO(ctx *cli.Context) error { len(files)-len(leftFiles), datasize.ByteSize(size).HR(), filename, len(leftFiles), datasize.ByteSize(currentSizeNotInISO-size).HR()) + if len(leftFiles) == 0 { + return nil + } if len(ctx.Args()) > 0 { fmt.Println("Please supply another filename") return nil diff --git a/cmd/lomob/main.go b/cmd/lomob/main.go index b59a561..7c23ecc 100644 --- a/cmd/lomob/main.go +++ b/cmd/lomob/main.go @@ -390,6 +390,10 @@ func main() { Usage: "Master key to encrypt current upload file", EnvVar: "LOMOB_MASTER_KEY", }, + cli.StringFlag{ + Name: "part-size,p", + Usage: "Size of each upload partition. KB=1000 Byte. 0 means no part. 
Mainly for local test purpose", + }, }, }, { diff --git a/cmd/lomob/upload-files.go b/cmd/lomob/upload-files.go index 46b8af7..931982e 100644 --- a/cmd/lomob/upload-files.go +++ b/cmd/lomob/upload-files.go @@ -327,7 +327,7 @@ func uploadEncryptFileToS3(cli *clients.AWSClient, bucket, storageClass, filenam tmpFileName := tmpFile.Name() defer tmpFile.Close() - _, hash, err := encryptLocalFile(src, tmpFile, []byte(masterKey), salt, true) + hash, err := encryptLocalFile(src, tmpFile, []byte(masterKey), salt, true) if err != nil { return "", err } diff --git a/cmd/lomob/upload-iso.go b/cmd/lomob/upload-iso.go index 31c65f3..a2135c2 100644 --- a/cmd/lomob/upload-iso.go +++ b/cmd/lomob/upload-iso.go @@ -341,6 +341,11 @@ func uploadEncryptParts(cli *clients.AWSClient, region, bucket, storageClass, is } salt := decoded[:crypto.SaltLen()] + + // Derive key from passphrase using Argon2 + // TODO: Using IV as salt for simplicity, change to different salt? + encryptKey := crypto.DeriveKeyFromMasterKey([]byte(masterKey), salt) + // iso size need add salt block size so as to compare with remote size isoInfo.Size += crypto.SaltLen() isoInfo.HashRemote = "" @@ -356,8 +361,12 @@ func uploadEncryptParts(cli *clients.AWSClient, region, bucket, storageClass, is partsHash := [][]byte{} - var start, end int64 - var failParts []int + var ( + start, end int64 + failParts []int + encryptor *crypto.Encryptor + prs *lomoio.FilePartReadSeeker + ) for i, p := range parts { // add salt len for the last part if i == len(parts)-1 { @@ -392,18 +401,47 @@ func uploadEncryptParts(cli *clients.AWSClient, region, bucket, storageClass, is defer os.Remove(tmpFilename) defer tmpFile.Close() - prs := lomoio.NewFilePartReadSeeker(isoFile, start, end) - hl, hr, err := encryptLocalFile(prs, tmpFile, []byte(masterKey), salt, i == 0) + if prs == nil { + prs = lomoio.NewFilePartReadSeeker(isoFile, start, end) + } else { + prs.SetStartEnd(start, end) + } + + hr := sha256.New() + mw := io.MultiWriter(hr, tmpFile) + + if encryptor == nil { + encryptor, err = crypto.NewEncryptor(prs, encryptKey, salt, false) + if err != nil { + return err + } + n, err := mw.Write(salt) + if err != nil { + return err + } + if n != len(salt) { + return fmt.Errorf("write %d byte salt while expecting %d", n, len(salt)) + } + } + + n, err := io.Copy(mw, encryptor) if err != nil { return err } - p.SetHashLocal(hl) - p.SetHashRemote(hr) + if n != end-start { + return fmt.Errorf("write %d byte salt while expecting %d btw [%d, %d]", n, end-start, start, end) + } + + hrData := hr.Sum(nil) + p.SetHashRemote(hrData) + + // seek to beginning for upload _, err = tmpFile.Seek(0, io.SeekStart) if err != nil { return err } + p.Etag, err = cli.Upload(int64(p.PartNo), int64(p.Size), request, tmpFile, p.HashRemote) if err != nil { failParts = append(failParts, p.PartNo) @@ -415,7 +453,7 @@ func uploadEncryptParts(cli *clients.AWSClient, region, bucket, storageClass, is } continue } - partsHash = append(partsHash, hr) + partsHash = append(partsHash, hrData) err = db.UpdatePartEtagAndStatusHash(p.IsoID, p.PartNo, p.Etag, p.HashLocal, p.HashRemote, types.PartUploaded) if err != nil { logrus.Infof("Update %s's part number %d status %s:%s", isoFilename, p.PartNo, @@ -479,13 +517,17 @@ func uploadISO(accessKeyID, accessKey, region, bucket, storageClass, isoFilename } func uploadISOs(ctx *cli.Context) error { - partSize, err := datasize.ParseString(ctx.String("part-size")) + ps, err := datasize.ParseString(ctx.String("part-size")) if err != nil { return err } + partSize := int(ps) if 
partSize < 5*1024*1024 { return errors.New("part size must be larger than 5*1024*1024=5242880") } + if partSize%crypto.SaltLen() != 0 || (partSize-crypto.SaltLen())%crypto.SaltLen() != 0 { + return errors.Errorf("part size must be able to divided by salt length '%d'", crypto.SaltLen()) + } err = initDB(ctx.GlobalString("db")) if err != nil { @@ -525,7 +567,7 @@ func uploadISOs(ctx *cli.Context) error { for _, isoFilename := range ctx.Args() { err = uploadISO(accessKeyID, secretAccessKey, region, bucket, storageClass, - isoFilename, masterKey, int(partSize), saveParts, force) + isoFilename, masterKey, partSize, saveParts, force) if err != nil { return err } diff --git a/common/io/crypto.go b/common/io/crypto.go index ffde474..1ad6fa3 100644 --- a/common/io/crypto.go +++ b/common/io/crypto.go @@ -96,7 +96,7 @@ func (r *CryptoStreamReader) Read(p []byte) (n int, err error) { r.offset += n - r.stream.XORKeyStream(p, buf) + r.stream.XORKeyStream(p[:n], buf[:n]) _, err = r.hashEncrypt.Write(p[:n]) if err != nil { diff --git a/common/io/crypto_test.go b/common/io/crypto_test.go index 58cee57..ec61235 100644 --- a/common/io/crypto_test.go +++ b/common/io/crypto_test.go @@ -468,6 +468,67 @@ func TestCryptoStreamReaderSeekReadEncrypt(t *testing.T) { verifyCryptoReadSeek(t, expectFile, r, 101, -1, -1, io.SeekCurrent, expectStream) } +func TestCryptoStreamReaderEncryptLargeBuffer(t *testing.T) { + // use large buffer to read multiple times, and value should be same + nl := 16 + nonce := make([]byte, nl) + for i := 0; i < nl; i++ { + nonce[i] = byte(i) + } + + f, err := os.Open(testFilename) + require.Nil(t, err) + defer f.Close() + + key, _ := hex.DecodeString("6368616e676520746869732070617373") + + stream := getCryptoStream(t, key, nonce) + + prs := NewFilePartReadSeeker(f, 0, 100) + r, err := NewCryptoStreamReader(prs, nonce, stream) + require.Nil(t, err) + + expectFile, err := os.Open(testFilename) + require.Nil(t, err) + defer expectFile.Close() + + // initial read will return nonce + buf := make([]byte, 200) + n, err := r.Read(buf) + require.Nil(t, err) + require.EqualValues(t, len(nonce), n) + require.EqualValues(t, nonce, buf[:n]) + + expectStream := getCryptoStream(t, key, nonce) + + verifyCryptoLargeBuffer(t, expectFile, expectStream, 100, r) + + prs.SetStartEnd(100, 200) + verifyCryptoLargeBuffer(t, expectFile, expectStream, 100, r) + + prs.SetStartEnd(200, 201) + verifyCryptoLargeBuffer(t, expectFile, expectStream, 1, r) + + prs.SetStartEnd(201, 300) + verifyCryptoLargeBuffer(t, expectFile, expectStream, 99, r) +} + +func verifyCryptoLargeBuffer(t *testing.T, expectReader io.Reader, expectStream cipher.Stream, + expectLen int, stream *CryptoStreamReader) { + expectReadBuffer := make([]byte, expectLen) + expectBuffer := make([]byte, expectLen) + expectSize, err := expectReader.Read(expectReadBuffer) + require.Nil(t, err) + expectStream.XORKeyStream(expectBuffer, expectReadBuffer) + + buffer := make([]byte, expectLen+100) + size, err := stream.Read(buffer) + require.Nil(t, err, "read lengh: %d", size) + + require.Equal(t, expectSize, size) + require.Equal(t, expectBuffer, buffer[:size], "expect len: %d", expectLen) +} + func TestCryptoStreamReaderSeekReadEncryptNoNonce(t *testing.T) { nl := 16 nonce := make([]byte, nl) diff --git a/common/io/readseeker.go b/common/io/readseeker.go index 5b51cfa..4d3666b 100644 --- a/common/io/readseeker.go +++ b/common/io/readseeker.go @@ -23,6 +23,12 @@ func (prs *FilePartReadSeeker) Size() int64 { return prs.end - prs.start } +func (prs *FilePartReadSeeker) 
SetStartEnd(start, end int64) { + prs.start = start + prs.end = end + prs.current = start +} + func (prs *FilePartReadSeeker) Read(p []byte) (n int, err error) { currBegin := prs.current defer func() { From 3f6b7b869fdf2cf50aab6f7762d6903af6e5035c Mon Sep 17 00:00:00 2001 From: Dweb Fan Date: Sun, 19 May 2024 22:57:35 -0700 Subject: [PATCH 2/2] fix iso creation if files are not exist any more Signed-off-by: Dweb Fan --- cmd/lomob/iso.go | 55 ++++++++++++++++++++++++++--------- cmd/lomob/script/create_db.sh | 1 + common/dbx/iso.go | 17 +++++++++++ 3 files changed, 59 insertions(+), 14 deletions(-) diff --git a/cmd/lomob/iso.go b/cmd/lomob/iso.go index e7ad548..b24363d 100644 --- a/cmd/lomob/iso.go +++ b/cmd/lomob/iso.go @@ -84,13 +84,34 @@ func mkISO(ctx *cli.Context) error { datasize.ByteSize(iso.Size).HR()) } - size, filename, leftFiles, err := createIso(isoSize.Bytes(), isoFilename, scanRootDirs, files) + size, filename, leftFiles, notExistFiles, err := createIso(isoSize.Bytes(), isoFilename, scanRootDirs, files) if err != nil { return err } - logrus.Infof("%d files (%s) are added into %s, and %d files (%s) need to be added", - len(files)-len(leftFiles), datasize.ByteSize(size).HR(), filename, - len(leftFiles), datasize.ByteSize(currentSizeNotInISO-size).HR()) + if len(notExistFiles) == 0 { + logrus.Infof("%d files (%s) are added into %s, and %d files (%s) need to be added", + len(files)-len(leftFiles), datasize.ByteSize(size).HR(), filename, + len(leftFiles), datasize.ByteSize(currentSizeNotInISO-size).HR()) + } else { + fileIDs := bytes.Buffer{} + notExistSizes := 0 + for _, f := range notExistFiles { + notExistSizes += f.Size + fileIDs.WriteString(strconv.Itoa(f.ID)) + fileIDs.WriteString(",") + } + logrus.Infof("%d files (%s) are added into %s, %d files (%s) need to be added, %d files (%s) not exist", + len(files)-len(leftFiles), datasize.ByteSize(size).HR(), filename, + len(leftFiles), datasize.ByteSize(currentSizeNotInISO-size).HR(), + len(notExistFiles), datasize.ByteSize(notExistSizes).HR()) + + ids := fileIDs.String() + _, err = db.DeleteBatchFiles(strings.Trim(ids, ",")) + if err != nil { + return err + } + size += uint64(notExistSizes) + } if len(leftFiles) == 0 { return nil @@ -130,18 +151,19 @@ func createFileInStaging(srcFile, dstFile string) error { } func createIso(maxSize uint64, isoFilename string, scanRootDirs map[int]string, - files []*types.FileInfo) (uint64, string, []*types.FileInfo, error) { + files []*types.FileInfo) (uint64, string, []*types.FileInfo, []*types.FileInfo, error) { stagingDir, err := os.MkdirTemp("", "lomobackup-") if err != nil { - return 0, "", nil, err + return 0, "", nil, nil, err } defer os.RemoveAll(stagingDir) const seperater = ',' var ( - fileCount int - filesSize uint64 - end time.Time + fileCount int + filesSize uint64 + end time.Time + notExistFiles []*types.FileInfo ) start := futuretime fileIDs := bytes.Buffer{} @@ -169,6 +191,11 @@ func createIso(maxSize uint64, isoFilename string, scanRootDirs map[int]string, err = createFileInStaging(srcFile, dstFile) if err != nil { + if os.IsNotExist(err) { + notExistFiles = append(notExistFiles, f) + logrus.Warnf("'%s' not exist anymore", srcFile) + continue + } logrus.Warnf("Add %s into %s:%s: %s", srcFile, isoFilename, dstFile, err) continue } @@ -212,18 +239,18 @@ func createIso(maxSize uint64, isoFilename string, scanRootDirs map[int]string, stagingDir).CombinedOutput() if err != nil { fmt.Println(string(out)) - return 0, "", nil, err + return 0, "", nil, nil, err } fileInfo, err := 
os.Stat(isoFilename) if err != nil { - return 0, "", nil, err + return 0, "", nil, nil, err } isoInfo := &types.ISOInfo{Name: isoFilename, Size: int(fileInfo.Size())} hash, err := lomohash.CalculateHashFile(isoFilename) if err != nil { - return 0, "", nil, err + return 0, "", nil, nil, err } isoInfo.SetHashLocal(hash) // create db entry and update file info @@ -235,10 +262,10 @@ func createIso(maxSize uint64, isoFilename string, scanRootDirs map[int]string, } logrus.Infof("Takes %s to update iso_id for %d files in DB", time.Since(start).Truncate(time.Second).String(), count) - return filesSize, isoFilename, files[idx+1:], err + return filesSize, isoFilename, files[idx+1:], notExistFiles, err } - return filesSize, isoFilename, nil, nil + return filesSize, isoFilename, nil, nil, nil } func listISO(ctx *cli.Context) error { diff --git a/cmd/lomob/script/create_db.sh b/cmd/lomob/script/create_db.sh index b050e6c..c1d777a 100755 --- a/cmd/lomob/script/create_db.sh +++ b/cmd/lomob/script/create_db.sh @@ -1 +1,2 @@ +rm ../lomob.db sqlite3 ../lomob.db < ../../../common/dbx/schema/1.sql diff --git a/common/dbx/iso.go b/common/dbx/iso.go index 190bda6..549e19a 100644 --- a/common/dbx/iso.go +++ b/common/dbx/iso.go @@ -23,6 +23,8 @@ const ( updateBatchFilesIsoIDStmt = "update files set iso_id=%d where id in (%s)" updateFileIsoIDAndRemoteHashStmt = "update files set iso_id=?, hash_remote=? where id=?" + deleteBatchFilesStmt = "delete from files where id in (%s)" + getIsoByNameStmt = "select id, size, hash_local, hash_remote, region, bucket, upload_id, upload_key," + " create_time from isos where name=?" listIsosStmt = "select id, name, size, status, region, bucket, hash_local, hash_remote, create_time from isos" @@ -202,6 +204,21 @@ func (db *DB) CreateIsoWithFileIDs(iso *types.ISOInfo, fileIDs string) (int, int return int(isoID), int(updatedFiles), err } +func (db *DB) DeleteBatchFiles(fileIDs string) (int, error) { + var updatedFiles int64 + err := db.retryIfLocked(fmt.Sprintf("delete files %s", fileIDs), + func(tx *sql.Tx) error { + res, err := tx.Exec(fmt.Sprintf(deleteBatchFilesStmt, fileIDs)) + if err != nil { + return err + } + updatedFiles, err = res.RowsAffected() + return err + }, + ) + return int(updatedFiles), err +} + func (db *DB) ResetISOUploadInfo(isoFilename string) error { return db.retryIfLocked(fmt.Sprintf("reset iso %s upload info", isoFilename), func(tx *sql.Tx) error {
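
The part-size branch added to encryptCmd and the rewritten loop in uploadEncryptParts rely on the same property: a single crypto.Encryptor fed through a moving FilePartReadSeeker window emits the same byte stream as one pass over the whole file, with the salt written once at the head of the first part. The sketch below is an illustration of that property, not part of the change; it uses only APIs that appear in this patch (crypto.DeriveKeyFromMasterKey, crypto.NewEncryptor, crypto.SaltLen, lomoio.NewFilePartReadSeeker and SetStartEnd), while the input path, dummy salt, master key, and part size are made-up values.

// encrypt_parts_sketch.go (illustration only)
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"os"

	"github.com/lomorage/lomo-backup/common/crypto"
	lomoio "github.com/lomorage/lomo-backup/common/io"
)

// encryptWhole encrypts the full file in one pass: salt header followed by
// the cipher stream, the salt-then-ciphertext layout encryptLocalFile writes.
func encryptWhole(f *os.File, masterKey string, salt []byte) ([]byte, error) {
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}
	key := crypto.DeriveKeyFromMasterKey([]byte(masterKey), salt)
	enc, err := crypto.NewEncryptor(f, key, salt, false)
	if err != nil {
		return nil, err
	}
	out := &bytes.Buffer{}
	out.Write(salt)
	_, err = io.Copy(out, enc)
	return out.Bytes(), err
}

// encryptInParts reuses one Encryptor across consecutive windows, as the new
// multipart code does; the first window is shortened by SaltLen so the first
// emitted part (salt + ciphertext) is exactly partSize bytes.
func encryptInParts(f *os.File, masterKey string, salt []byte, partSize int64) ([]byte, error) {
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}
	stat, err := f.Stat()
	if err != nil {
		return nil, err
	}
	key := crypto.DeriveKeyFromMasterKey([]byte(masterKey), salt)
	prs := lomoio.NewFilePartReadSeeker(f, 0, 0)
	enc, err := crypto.NewEncryptor(prs, key, salt, false)
	if err != nil {
		return nil, err
	}
	out := &bytes.Buffer{}
	out.Write(salt) // salt header goes at the head of the first part
	var start, end int64
	for end < stat.Size() {
		length := partSize
		if start == 0 {
			length -= int64(crypto.SaltLen())
		}
		if stat.Size()-start < length {
			length = stat.Size() - start
		}
		end = start + length
		prs.SetStartEnd(start, end)
		if _, err := io.Copy(out, enc); err != nil {
			return nil, err
		}
		start = end
	}
	return out.Bytes(), nil
}

func main() {
	f, err := os.Open("testdata/sample.bin") // hypothetical input file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	salt := bytes.Repeat([]byte{0x01}, crypto.SaltLen()) // dummy salt
	const masterKey = "not-a-real-master-key"

	whole, err := encryptWhole(f, masterKey, salt)
	if err != nil {
		log.Fatal(err)
	}
	parts, err := encryptInParts(f, masterKey, salt, 16*1024*1024)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("part-wise output identical to whole-file output:", bytes.Equal(whole, parts))
}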