From 68a7a2b31d426adab24bed6e066cd947f7db9a0e Mon Sep 17 00:00:00 2001 From: Daniil Zakhlystov <47750602+usernamedt@users.noreply.github.com> Date: Fri, 20 Aug 2021 21:00:14 +0500 Subject: [PATCH] Add get/put/rm to storage tools (#1069) --- .github/workflows/dockertests.yml | 1 + Makefile | 6 ++ cmd/st/delete_object.go | 27 +++++++ cmd/st/get_object.go | 40 ++++++++++ cmd/st/put_object.go | 45 +++++++++++ docker-compose.yml | 14 ++++ docker/st_tests/Dockerfile | 34 ++++++++ .../scripts/tests/storage_tool_tests.sh | 54 +++++++++++++ docs/README.md | 37 ++++++++- internal/compression/compression.go | 5 ++ internal/fetch_helper.go | 43 ++++++---- .../storagetools/delete_object_handler.go | 17 ++++ internal/storagetools/get_object_handler.go | 79 +++++++++++++++++++ internal/storagetools/put_object_handler.go | 69 ++++++++++++++++ 14 files changed, 451 insertions(+), 20 deletions(-) create mode 100644 cmd/st/delete_object.go create mode 100644 cmd/st/get_object.go create mode 100644 cmd/st/put_object.go create mode 100644 docker/st_tests/Dockerfile create mode 100755 docker/st_tests/scripts/tests/storage_tool_tests.sh create mode 100644 internal/storagetools/delete_object_handler.go create mode 100644 internal/storagetools/get_object_handler.go create mode 100644 internal/storagetools/put_object_handler.go diff --git a/.github/workflows/dockertests.yml b/.github/workflows/dockertests.yml index 7514c3dd9..8624c5c40 100644 --- a/.github/workflows/dockertests.yml +++ b/.github/workflows/dockertests.yml @@ -121,6 +121,7 @@ jobs: 'make MYSQL_TEST=mysql_delete_tests mysql_integration_test', 'make MYSQL_TEST=mysql_copy_tests mysql_integration_test', 'make gp_test', + 'make st_test', ] # do not cancel all tests if one failed fail-fast: false diff --git a/Makefile b/Makefile index ef4ef905d..7276c016c 100644 --- a/Makefile +++ b/Makefile @@ -179,6 +179,12 @@ gp_integration_test: load_docker_common docker-compose build gp gp_tests docker-compose up --exit-code-from gp_tests gp_tests +st_test: deps pg_build unlink_brotli st_integration_test + +st_integration_test: load_docker_common + docker-compose build st_tests + docker-compose up --exit-code-from st_tests st_tests + unittest: go list ./... | grep -Ev 'vendor|submodules|tmp' | xargs go vet go test -mod vendor -v $(TEST_MODIFIER) -tags "$(BUILD_TAGS)" ./internal/ diff --git a/cmd/st/delete_object.go b/cmd/st/delete_object.go new file mode 100644 index 000000000..107ba85e2 --- /dev/null +++ b/cmd/st/delete_object.go @@ -0,0 +1,27 @@ +package st + +import ( + "github.com/spf13/cobra" + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/storagetools" +) + +const deleteObjectShortDescription = "Delete the specified storage object" + +// deleteObjectCmd represents the deleteObject command +var deleteObjectCmd = &cobra.Command{ + Use: "rm relative_object_path", + Short: deleteObjectShortDescription, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + folder, err := internal.ConfigureFolder() + tracelog.ErrorLogger.FatalOnError(err) + + storagetools.HandleDeleteObject(args[0], folder) + }, +} + +func init() { + StorageToolsCmd.AddCommand(deleteObjectCmd) +} diff --git a/cmd/st/get_object.go b/cmd/st/get_object.go new file mode 100644 index 000000000..65d2eb7e8 --- /dev/null +++ b/cmd/st/get_object.go @@ -0,0 +1,40 @@ +package st + +import ( + "github.com/spf13/cobra" + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/storagetools" +) + +const ( + getObjectShortDescription = "Download the specified storage object" + + noDecryptFlag = "no-decrypt" + noDecompressFlag = "no-decompress" +) + +// getObjectCmd represents the getObject command +var getObjectCmd = &cobra.Command{ + Use: "get relative_object_path destination_path", + Short: getObjectShortDescription, + Args: cobra.ExactArgs(2), + Run: func(cmd *cobra.Command, args []string) { + objectPath := args[0] + dstPath := args[1] + + folder, err := internal.ConfigureFolder() + tracelog.ErrorLogger.FatalOnError(err) + + storagetools.HandleGetObject(objectPath, dstPath, folder, !noDecrypt, !noDecompress) + }, +} + +var noDecrypt bool +var noDecompress bool + +func init() { + StorageToolsCmd.AddCommand(getObjectCmd) + getObjectCmd.Flags().BoolVar(&noDecrypt, noDecryptFlag, false, "Do not decrypt the object") + getObjectCmd.Flags().BoolVar(&noDecompress, noDecompressFlag, false, "Do not decompress the object") +} diff --git a/cmd/st/put_object.go b/cmd/st/put_object.go new file mode 100644 index 000000000..22e406a13 --- /dev/null +++ b/cmd/st/put_object.go @@ -0,0 +1,45 @@ +package st + +import ( + "github.com/spf13/cobra" + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/storagetools" +) + +const ( + putObjectShortDescription = "Upload the specified file to the storage" + + noEncryptFlag = "no-encrypt" + noCompressFlag = "no-compress" + overwriteFlag = "force" + overwriteShorthand = "f" +) + +// putObjectCmd represents the putObject command +var putObjectCmd = &cobra.Command{ + Use: "put local_path destination_path", + Short: putObjectShortDescription, + Args: cobra.ExactArgs(2), + Run: func(cmd *cobra.Command, args []string) { + uploader, err := internal.ConfigureUploader() + tracelog.ErrorLogger.FatalOnError(err) + + localPath := args[0] + dstPath := args[1] + + storagetools.HandlePutObject(localPath, dstPath, uploader, overwrite, !noEncrypt, !noCompress) + }, +} + +var noEncrypt bool +var noCompress bool +var overwrite bool + +func init() { + StorageToolsCmd.AddCommand(putObjectCmd) + putObjectCmd.Flags().BoolVar(&noEncrypt, noEncryptFlag, false, "Do not encrypt the object") + putObjectCmd.Flags().BoolVar(&noCompress, noCompressFlag, false, "Do not compress the object") + putObjectCmd.Flags().BoolVarP(&overwrite, overwriteFlag, overwriteShorthand, + false, "Overwrite the existing object") +} diff --git a/docker-compose.yml b/docker-compose.yml index 7bbde6faa..c8fde8d96 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -73,6 +73,7 @@ services: && mkdir -p /export/mysqldeletebinlogs && mkdir -p /export/archivereadyrename && mkdir -p /export/gpfullbucket + && mkdir -p /export/storagetoolsbucket && /usr/bin/minio server /export' s3-another: @@ -635,3 +636,16 @@ services: - s3 links: - s3 + + st_tests: + build: + dockerfile: docker/st_tests/Dockerfile + context: . + image: wal-g/st_tests + container_name: wal-g_st_tests + env_file: + - docker/common/common_walg.env + depends_on: + - s3 + links: + - s3 \ No newline at end of file diff --git a/docker/st_tests/Dockerfile b/docker/st_tests/Dockerfile new file mode 100644 index 000000000..64fb741ef --- /dev/null +++ b/docker/st_tests/Dockerfile @@ -0,0 +1,34 @@ +FROM wal-g/golang:latest as build + +WORKDIR /go/src/github.com/wal-g/wal-g + +RUN apt-get update && \ + apt-get install --yes --no-install-recommends --no-install-suggests \ + liblzo2-dev + +RUN ls + +COPY go.mod go.mod +COPY vendor/ vendor/ +COPY internal/ internal/ +COPY pkg/ pkg/ +COPY cmd/ cmd/ +COPY main/ main/ +COPY utility/ utility/ + +RUN sed -i 's|#cgo LDFLAGS: -lbrotli.*|&-static -lbrotlicommon-static -lm|' \ + vendor/github.com/google/brotli/go/cbrotli/cgo.go && \ + sed -i 's|\(#cgo LDFLAGS:\) .*|\1 -Wl,-Bstatic -llzo2 -Wl,-Bdynamic|' \ + vendor/github.com/cyberdelia/lzo/lzo.go && \ + cd main/pg && \ + go build -mod vendor -race -o wal-g -tags "brotli lzo" -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S`" + +FROM wal-g/ubuntu:latest + +RUN apt-get update && apt-get install --yes --no-install-recommends --no-install-suggests brotli + +COPY --from=build /go/src/github.com/wal-g/wal-g/main/pg/wal-g /usr/bin + +COPY docker/st_tests/scripts/ /tmp + +CMD /tmp/tests/storage_tool_tests.sh diff --git a/docker/st_tests/scripts/tests/storage_tool_tests.sh b/docker/st_tests/scripts/tests/storage_tool_tests.sh new file mode 100755 index 000000000..9b721335b --- /dev/null +++ b/docker/st_tests/scripts/tests/storage_tool_tests.sh @@ -0,0 +1,54 @@ +#!/bin/sh +set -e -x + +export WALE_S3_PREFIX=s3://storagetoolsbucket + +# Empty list on empty storage +test "1" -eq "$(wal-g st ls | wc -l)" + +# Generate and upload some file to storage +head -c 100M testfile +wal-g st put testfile testfolder/testfile + +# Should not upload the duplicate file by default +wal-g st put testfile testfolder/testfile && EXIT_STATUS=$? || EXIT_STATUS=$? + +if [ "$EXIT_STATUS" -eq 0 ] ; then + echo "Error: Duplicate object was uploaded without the -f flag" + exit 1 +fi + +# Should upload the duplicate file if -f flag is present +wal-g st put testfile testfolder/testfile -f + +wal-g st ls +# WAL-G should show the uploaded file in the wal-g st ls output +test "2" -eq "$(wal-g st ls | wc -l)" + +# WAL-G should be able to download the uploaded file +wal-g st get testfolder/testfile.br fetched_testfile + +# Downloaded file should be identical to the original one +diff testfile fetched_testfile +rm fetched_testfile + +# WAL-G should be able to download the uploaded file without decompression +wal-g st get testfolder/testfile.br uncompressed_testfile.br --no-decompress + +brotli --decompress uncompressed_testfile.br +diff testfile uncompressed_testfile +rm uncompressed_testfile + +# WAL-G should be able to delete the uploaded file +wal-g st rm testfolder/testfile.br + +# Should get empty storage after file removal +test "1" -eq "$(wal-g st ls | wc -l)" + +# Should upload the file uncompressed without error +wal-g st put testfile testfolder/testfile --no-compress + +# Should download the file uncompressed without error +wal-g st get testfolder/testfile uncompressed_file --no-decompress + +diff testfile uncompressed_file \ No newline at end of file diff --git a/docs/README.md b/docs/README.md index 7d41a25d8..28b583ca1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -158,19 +158,48 @@ If `FIND_FULL` is specified, WAL-G will calculate minimum backup needed to keep ``target FIND_FULL base_0000000100000000000000C9_D_0000000100000000000000C4`` delete delta backup and all delta backups with the same base backup +**More commands are available for the chosen database engine. See it in [Databases](#databases)** + ## Storage tools (danger zone) -Storage tools allow interacting with the configured storage. Be aware that these commands can do potentially harmful operations and make sure that you know what you're doing. +`wal-g st` command series allows interacting with the configured storage. Be aware that these commands can do potentially harmful operations and make sure that you know what you're doing. ### ``ls`` Prints listing of the objects in the provided storage folder. +``wal-g st ls`` get listing with all objects in the configured storage. + +``wal-g st ls some_folder/some_subfolder`` get listing with all objects in the provided storage path. + +### ``get`` +Download the specified storage object. By default, the command will try to apply the decompression and decryption (if configured). + +Flags: +1. Add `--no-decompress` to download the remote object without decompression +2. Add `--no-decrypt` to download the remote object without decryption + Examples: -``wal-g dh ls`` get listing with all objects in the configured storage. +``wal-g st get path/to/remote_file path/to/local_file`` download the file from storage. -``wal-g dh ls some_folder/some_subfolder`` get listing with all objects in the provided storage path. +``wal-g st get path/to/remote_file path/to/local_file --no-decrypt`` download the file from storage without decryption. -**More commands are available for the chosen database engine. See it in [Databases](#databases)** +### ``rm`` +Remove the specified storage object. + +Example: + +``wal-g st rm path/to/remote_file`` remove the file from storage. + +### ``put`` +Upload the specified file to the storage. By default, the command will try to apply the compression and encryption (if configured). + +Flags: +1. Add `--no-compress` to upload the object without compression +2. Add `--no-encrypt` to upload the object without encryption + +Example: + +``wal-g st put path/to/local_file path/to/remote_file`` upload the local file to storage. Databases ----------- diff --git a/internal/compression/compression.go b/internal/compression/compression.go index fa9de3640..c6085563c 100644 --- a/internal/compression/compression.go +++ b/internal/compression/compression.go @@ -19,6 +19,11 @@ func GetDecompressorByCompressor(compressor Compressor) Decompressor { } func FindDecompressor(fileExtension string) Decompressor { + // cut the leading '.' (e.g. ".lz4" => "lz4") + if len(fileExtension) > 0 && fileExtension[0] == '.' { + fileExtension = fileExtension[1:] + } + for _, decompressor := range Decompressors { if decompressor.FileExtension() == fileExtension { return decompressor diff --git a/internal/fetch_helper.go b/internal/fetch_helper.go index a2ded4e1e..fdafade74 100644 --- a/internal/fetch_helper.go +++ b/internal/fetch_helper.go @@ -9,10 +9,11 @@ import ( "os/user" "path/filepath" + "github.com/wal-g/wal-g/internal/ioextensions" + "github.com/pkg/errors" "github.com/wal-g/tracelog" "github.com/wal-g/wal-g/internal/compression" - "github.com/wal-g/wal-g/internal/ioextensions" "github.com/wal-g/wal-g/pkg/storages/storage" "github.com/wal-g/wal-g/utility" ) @@ -66,29 +67,39 @@ func TryDownloadFile(folder storage.Folder, path string) (walFileReader io.ReadC // TODO : unit tests func DecompressDecryptBytes(dst io.Writer, archiveReader io.ReadCloser, decompressor compression.Decompressor) error { - crypter := ConfigureCrypter() - if crypter != nil { - tracelog.DebugLogger.Printf("Selected crypter: %s", crypter.Name()) - - reader, err := crypter.Decrypt(archiveReader) - if err != nil { - return fmt.Errorf("failed to init decrypt reader: %w", err) - } - archiveReader = ioextensions.ReadCascadeCloser{ - Reader: reader, - Closer: archiveReader, - } - } else { - tracelog.DebugLogger.Printf("No crypter has been selected") + decryptReadCloser, err := DecryptBytes(archiveReader) + if err != nil { + return err } - err := decompressor.Decompress(dst, archiveReader) + err = decompressor.Decompress(dst, decryptReadCloser) if err != nil { return fmt.Errorf("failed to decompress archive reader: %w", err) } return nil } +func DecryptBytes(archiveReader io.ReadCloser) (io.ReadCloser, error) { + crypter := ConfigureCrypter() + if crypter == nil { + tracelog.DebugLogger.Printf("No crypter has been selected") + return archiveReader, nil + } + + tracelog.DebugLogger.Printf("Selected crypter: %s", crypter.Name()) + + decryptReader, err := crypter.Decrypt(archiveReader) + if err != nil { + return nil, fmt.Errorf("failed to init decrypt reader: %w", err) + } + decryptReadCloser := ioextensions.ReadCascadeCloser{ + Reader: decryptReader, + Closer: archiveReader, + } + + return decryptReadCloser, nil +} + // CachedDecompressor is the file extension describing decompressor type CachedDecompressor struct { FileExtension string diff --git a/internal/storagetools/delete_object_handler.go b/internal/storagetools/delete_object_handler.go new file mode 100644 index 000000000..bbe25cf07 --- /dev/null +++ b/internal/storagetools/delete_object_handler.go @@ -0,0 +1,17 @@ +package storagetools + +import ( + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/pkg/storages/storage" +) + +func HandleDeleteObject(objectPath string, folder storage.Folder) { + // some storages may not produce an error on deleting the non-existing object + exists, err := folder.Exists(objectPath) + tracelog.ErrorLogger.FatalfOnError("Failed to check object existence: %v", err) + if !exists { + tracelog.ErrorLogger.Fatalf("Object %s does not exist", objectPath) + } + err = folder.DeleteObjects([]string{objectPath}) + tracelog.ErrorLogger.FatalfOnError("Failed to delete the specified object: %v", err) +} diff --git a/internal/storagetools/get_object_handler.go b/internal/storagetools/get_object_handler.go new file mode 100644 index 000000000..f56c5531f --- /dev/null +++ b/internal/storagetools/get_object_handler.go @@ -0,0 +1,79 @@ +package storagetools + +import ( + "errors" + "io" + "os" + "path" + + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/compression" + "github.com/wal-g/wal-g/pkg/storages/storage" + "github.com/wal-g/wal-g/utility" +) + +func HandleGetObject(objectPath, dstPath string, folder storage.Folder, decrypt, decompress bool) { + fileName := path.Base(objectPath) + targetPath, err := getTargetFilePath(dstPath, fileName) + tracelog.ErrorLogger.FatalfOnError("Failed to determine the destination path: %v", err) + + dstFile, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0640) + tracelog.ErrorLogger.FatalfOnError("Failed to open the destination file: %v", err) + + err = downloadObject(objectPath, folder, dstFile, decrypt, decompress) + dstFile.Close() + if err != nil { + os.Remove(targetPath) + tracelog.ErrorLogger.Fatalf("Failed to download the file: %v", err) + } +} + +func getTargetFilePath(dstPath string, fileName string) (string, error) { + info, err := os.Stat(dstPath) + if errors.Is(err, os.ErrNotExist) { + return dstPath, nil + } + + if err != nil { + return "", err + } + + if info.IsDir() { + return path.Join(dstPath, fileName), nil + } + + return dstPath, nil +} + +func downloadObject(objectPath string, folder storage.Folder, fileWriter io.Writer, decrypt, decompress bool) error { + objReadCloser, err := folder.ReadObject(objectPath) + if err != nil { + return err + } + origReadCloser := objReadCloser + defer origReadCloser.Close() + + if decrypt { + objReadCloser, err = internal.DecryptBytes(objReadCloser) + if err != nil { + return err + } + } + + if decompress { + fileName := path.Base(objectPath) + fileExt := path.Ext(fileName) + decompressor := compression.FindDecompressor(fileExt) + if decompressor != nil { + return decompressor.Decompress(fileWriter, objReadCloser) + } + + tracelog.WarningLogger.Printf( + "decompressor for extension '%s' was not found (supported methods: %v), will download uncompressed", + fileExt, compression.CompressingAlgorithms) + } + + _, err = utility.FastCopy(fileWriter, objReadCloser) + return err +} diff --git a/internal/storagetools/put_object_handler.go b/internal/storagetools/put_object_handler.go new file mode 100644 index 000000000..cdcef599c --- /dev/null +++ b/internal/storagetools/put_object_handler.go @@ -0,0 +1,69 @@ +package storagetools + +import ( + "io" + "os" + "path/filepath" + + "github.com/wal-g/wal-g/internal/compression" + "github.com/wal-g/wal-g/internal/crypto" + + "github.com/wal-g/wal-g/utility" + + "github.com/wal-g/tracelog" + "github.com/wal-g/wal-g/internal" +) + +func HandlePutObject(localPath, dstPath string, uploader *internal.Uploader, overwrite, encrypt, compress bool) { + checkOverwrite(dstPath, uploader, overwrite) + + fileReadCloser := openLocalFile(localPath) + defer fileReadCloser.Close() + + storageFolderPath := utility.SanitizePath(filepath.Dir(dstPath)) + if storageFolderPath != "" { + folder := uploader.UploadingFolder + uploader.UploadingFolder = folder.GetSubFolder(storageFolderPath) + } + + fileName := utility.SanitizePath(filepath.Base(dstPath)) + err := uploadFile(fileName, fileReadCloser, uploader, encrypt, compress) + tracelog.ErrorLogger.FatalfOnError("Failed to upload: %v", err) +} + +func checkOverwrite(dstPath string, uploader *internal.Uploader, overwrite bool) { + fullPath := dstPath + "." + uploader.Compressor.FileExtension() + exists, err := uploader.UploadingFolder.Exists(fullPath) + tracelog.ErrorLogger.FatalfOnError("Failed to check object existence: %v", err) + if exists && !overwrite { + tracelog.ErrorLogger.Fatalf("Object %s already exists. To overwrite it, add the -f flag.", fullPath) + } +} + +func openLocalFile(localPath string) io.ReadCloser { + localFile, err := os.Open(localPath) + tracelog.ErrorLogger.FatalfOnError("Could not open the local file: %v", err) + fileInfo, err := localFile.Stat() + tracelog.ErrorLogger.FatalfOnError("Could not Stat() the local file: %v", err) + if fileInfo.IsDir() { + tracelog.ErrorLogger.Fatalf("Provided local path (%s) points to a directory, exiting", localPath) + } + + return localFile +} + +func uploadFile(name string, content io.Reader, uploader *internal.Uploader, encrypt, compress bool) error { + var crypter crypto.Crypter + if encrypt { + crypter = internal.ConfigureCrypter() + } + + var compressor compression.Compressor + if compress && uploader.Compressor != nil { + compressor = uploader.Compressor + name += "." + uploader.Compressor.FileExtension() + } + + uploadContents := internal.CompressAndEncrypt(content, compressor, crypter) + return uploader.Upload(name, uploadContents) +}