From 5deb42cbb8043a4c3a02e4fb3c047f5bf569889c Mon Sep 17 00:00:00 2001 From: Aman Mangal Date: Tue, 27 Jun 2023 12:58:24 +0530 Subject: [PATCH] chore(test): add support for Export Import upgrade strategy (#8883) --- dgraphtest/config.go | 2 +- dgraphtest/dgraph.go | 16 ++- dgraphtest/load.go | 238 ++++++++++++++++++++++++++++++++++++ dgraphtest/local_cluster.go | 54 ++++++-- dgraphtest/paths.go | 2 +- query/cloud_test.go | 2 +- query/upgrade_test.go | 6 +- 7 files changed, 306 insertions(+), 14 deletions(-) create mode 100644 dgraphtest/load.go diff --git a/dgraphtest/config.go b/dgraphtest/config.go index af50e8a1ae8..9b611b22d23 100644 --- a/dgraphtest/config.go +++ b/dgraphtest/config.go @@ -87,7 +87,7 @@ type ClusterConfig struct { } func NewClusterConfig() ClusterConfig { - prefix := fmt.Sprintf("test-%d", rand.NewSource(time.Now().UnixNano()).Int63()%1000000) + prefix := fmt.Sprintf("dgraphtest-%d", rand.NewSource(time.Now().UnixNano()).Int63()%1000000) defaultBackupVol := fmt.Sprintf("%v_backup", prefix) defaultExportVol := fmt.Sprintf("%v_export", prefix) return ClusterConfig{ diff --git a/dgraphtest/dgraph.go b/dgraphtest/dgraph.go index d9e79c95d0a..4931dc7d5db 100644 --- a/dgraphtest/dgraph.go +++ b/dgraphtest/dgraph.go @@ -87,6 +87,7 @@ type dnode interface { healthURL(*LocalCluster) (string, error) assignURL(*LocalCluster) (string, error) alphaURL(*LocalCluster) (string, error) + zeroURL(*LocalCluster) (string, error) } type zero struct { @@ -172,7 +173,15 @@ func (z *zero) assignURL(c *LocalCluster) (string, error) { } func (z *zero) alphaURL(c *LocalCluster) (string, error) { - return "", errors.New("no alpha URL for zero") + return "", errNotImplemented +} + +func (z *zero) zeroURL(c *LocalCluster) (string, error) { + publicPort, err := publicPort(c.dcli, z, zeroGrpcPort) + if err != nil { + return "", err + } + return "localhost:" + publicPort + "", nil } type alpha struct { @@ -274,7 +283,6 @@ func (a *alpha) mounts(c *LocalCluster) ([]mount.Mount, error) { ReadOnly: false, }) } - return mounts, nil } @@ -298,6 +306,10 @@ func (a *alpha) alphaURL(c *LocalCluster) (string, error) { return "localhost:" + publicPort + "", nil } +func (a *alpha) zeroURL(c *LocalCluster) (string, error) { + return "", errNotImplemented +} + func publicPort(dcli *docker.Client, dc dnode, privatePort string) (string, error) { // TODO(aman): we should cache the port information ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) diff --git a/dgraphtest/load.go b/dgraphtest/load.go new file mode 100644 index 00000000000..cf888b98701 --- /dev/null +++ b/dgraphtest/load.go @@ -0,0 +1,238 @@ +/* + * Copyright 2023 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package dgraphtest
+
+import (
+ "archive/tar"
+ "compress/gzip"
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+
+ "github.com/pkg/errors"
+
+ "github.com/dgraph-io/dgraph/x"
+)
+
+type LiveOpts struct {
+ RdfFiles []string
+ SchemaFiles []string
+ GqlSchemaFiles []string
+}
+
+func readGzFile(sf string) ([]byte, error) {
+ fd, err := os.Open(sf)
+ if err != nil {
+ return nil, errors.Wrapf(err, "error opening file [%v]", sf)
+ }
+ defer func() {
+ if err := fd.Close(); err != nil {
+ log.Printf("[WARNING] error closing file [%v]: %v", sf, err)
+ }
+ }()
+
+ gr, err := gzip.NewReader(fd)
+ if err != nil {
+ return nil, errors.Wrapf(err, "error creating a gzip reader for file [%v]", sf)
+ }
+ defer func() {
+ if err := gr.Close(); err != nil {
+ log.Printf("[WARNING] error closing gzip reader for file [%v]: %v", sf, err)
+ }
+ }()
+
+ data, err := io.ReadAll(gr)
+ if err != nil {
+ return nil, errors.Wrapf(err, "error reading file content [%v]", sf)
+ }
+ return data, nil
+}
+
+func setDQLSchema(c *LocalCluster, files []string) error {
+ gc, cleanup, err := c.Client()
+ if err != nil {
+ return errors.WithStack(err)
+ }
+ defer cleanup()
+ if err := gc.LoginIntoNamespace(context.Background(),
+ DefaultUser, DefaultPassword, x.GalaxyNamespace); err != nil {
+ return errors.WithStack(err)
+ }
+
+ for _, sf := range files {
+ data, err := readGzFile(sf)
+ if err != nil {
+ return err
+ }
+ if err := gc.SetupSchema(string(data)); err != nil {
+ return errors.WithStack(err)
+ }
+ }
+ return nil
+}
+
+func setGraphQLSchema(c *LocalCluster, files []string) error {
+ hc, err := c.HTTPClient()
+ if err != nil {
+ return errors.WithStack(err)
+ }
+ if err := hc.LoginIntoNamespace(DefaultUser, DefaultPassword, x.GalaxyNamespace); err != nil {
+ return errors.WithStack(err)
+ }
+
+ for _, sf := range files {
+ data, err := readGzFile(sf)
+ if err != nil {
+ return err
+ }
+ // If there is no GraphQL schema in the cluster, the exported
+ // GQL schema file contains only an empty list [].
+ if len(data) < 10 {
+ continue
+ }
+ if err := hc.UpdateGQLSchema(string(data)); err != nil {
+ return errors.WithStack(err)
+ }
+ }
+ return nil
+}
+
+// LiveLoad runs the live loader with the provided options.
+func (c *LocalCluster) LiveLoad(opts LiveOpts) error {
+ if err := setDQLSchema(c, opts.SchemaFiles); err != nil {
+ return err
+ }
+ if err := setGraphQLSchema(c, opts.GqlSchemaFiles); err != nil {
+ return err
+ }
+
+ var alphaURLs []string
+ for i, aa := range c.alphas {
+ url, err := aa.alphaURL(c)
+ if err != nil {
+ return errors.Wrapf(err, "error finding URL to %vth alpha", i)
+ }
+ alphaURLs = append(alphaURLs, url)
+ }
+ zeroURL, err := c.zeros[0].zeroURL(c)
+ if err != nil {
+ return errors.Wrap(err, "error finding URL to 0th zero")
+ }
+
+ args := []string{
+ "live",
+ "--files", strings.Join(opts.RdfFiles, ","),
+ "--alpha", strings.Join(alphaURLs, ","),
+ "--zero", zeroURL,
+ }
+ if c.conf.acl {
+ args = append(args, "--creds", fmt.Sprintf("user=%s;password=%s;namespace=%d",
+ DefaultUser, DefaultPassword, x.GalaxyNamespace))
+ }
+ if c.conf.encryption {
+ args = append(args, "--encryption", fmt.Sprintf("key-file=%v", encKeyPath))
+ }
+
+ log.Printf("[INFO] running live loader with args: [%v]", strings.Join(args, " "))
+ cmd := exec.Command(filepath.Join(c.tempBinDir, "dgraph"), args...)
+ if out, err := cmd.CombinedOutput(); err != nil {
+ return errors.Wrapf(err, "error running live loader: %v", string(out))
+ } else {
+ log.Printf("[INFO] ==== output for live loader ====")
+ log.Println(string(out))
+ }
+ return nil
+}
+
+// LiveLoadFromExport runs the live loader on the output of a dgraph export.
+// exportDir is the directory inside the container; this function first copies
+// all the exported files to the host and then runs the live loader.
+func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
+ ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+ defer cancel()
+ exportDirHost, err := os.MkdirTemp("", "dgraph-export")
+ if err != nil {
+ return errors.Wrap(err, "error creating temp dir for exported data")
+ }
+
+ // First, copy the exported data from the container to the host.
+ ts, _, err := c.dcli.CopyFromContainer(ctx, c.alphas[0].cid(), exportDir)
+ if err != nil {
+ return errors.Wrapf(err, "error copying export dir from container [%v]", c.alphas[0].cname())
+ }
+ defer func() {
+ if err := ts.Close(); err != nil {
+ log.Printf("[WARNING] error closing tarred stream from docker cp for [%v]", c.alphas[0].cname())
+ }
+ }()
+
+ // The export consists of .rdf.gz, .schema.gz and .gql_schema.gz files.
+ var rdfFiles, schemaFiles, gqlSchemaFiles []string
+ tr := tar.NewReader(ts)
+ for {
+ header, err := tr.Next()
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ return errors.Wrapf(err, "error reading file in tarred stream: [%+v]", header)
+ }
+ if header.Typeflag != tar.TypeReg {
+ continue
+ }
+
+ fileName := filepath.Base(header.Name)
+ hostFile := filepath.Join(exportDirHost, fileName)
+ switch {
+ case strings.HasSuffix(fileName, ".rdf.gz"):
+ rdfFiles = append(rdfFiles, hostFile)
+ case strings.HasSuffix(fileName, ".schema.gz"):
+ schemaFiles = append(schemaFiles, hostFile)
+ case strings.HasSuffix(fileName, ".gql_schema.gz"):
+ gqlSchemaFiles = append(gqlSchemaFiles, hostFile)
+ default:
+ return errors.Errorf("found unexpected file in export: %v", fileName)
+ }
+
+ fd, err := os.Create(hostFile) //nolint: G305
+ if err != nil {
+ return errors.Wrapf(err, "error creating file [%v]", hostFile)
+ }
+ defer func() {
+ if err := fd.Close(); err != nil {
+ log.Printf("[WARNING] error closing file copied from container: [%+v]", header)
+ }
+ }()
+ if _, err := io.Copy(fd, tr); err != nil {
+ return errors.Wrapf(err, "error writing to [%v] from: [%+v]", fd.Name(), header)
+ }
+ }
+
+ opts := LiveOpts{
+ RdfFiles: rdfFiles,
+ SchemaFiles: schemaFiles,
+ GqlSchemaFiles: gqlSchemaFiles,
+ }
+ if err := c.LiveLoad(opts); err != nil {
+ return errors.Wrap(err, "error running live loader")
+ }
+ return nil
+}
diff --git a/dgraphtest/local_cluster.go b/dgraphtest/local_cluster.go
index 5119a63b210..a246d5044cc 100644
--- a/dgraphtest/local_cluster.go
+++ b/dgraphtest/local_cluster.go
@@ -64,6 +64,7 @@ type UpgradeStrategy int
 const (
 BackupRestore UpgradeStrategy = iota
+ ExportImport
 StopStart
 )
@@ -73,6 +74,8 @@ func (u UpgradeStrategy) String() string {
 return "backup-restore"
 case StopStart:
 return "stop-start"
+ case ExportImport:
+ return "export-import"
 default:
 panic("unknown upgrade strategy")
 }
@@ -107,6 +110,7 @@ func (c *LocalCluster) init() error {
 if err != nil {
 return errors.Wrap(err, "error while creating temp dir")
 }
+ log.Printf("[INFO] tempBinDir: %v", c.tempBinDir)
 if err := os.Mkdir(binDir, os.ModePerm); err != nil && !os.IsExist(err) {
 return errors.Wrap(err, "error while making binDir")
 }
@@ -489,16 +493,18 @@ func (c
*LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error { if err != nil { return err } - if err := hc.LoginIntoNamespace(DefaultUser, DefaultPassword, x.GalaxyNamespace); err != nil { - return errors.Wrapf(err, "error during login before upgrade") + if c.conf.acl { + if err := hc.LoginIntoNamespace(DefaultUser, DefaultPassword, x.GalaxyNamespace); err != nil { + return errors.Wrapf(err, "error during login before upgrade") + } } if err := hc.Backup(c, true, DefaultBackupDir); err != nil { return errors.Wrap(err, "error taking backup during upgrade") } - if err := c.Stop(); err != nil { return err } + c.conf.version = version if err := c.setupBinary(); err != nil { return err @@ -509,17 +515,19 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error { if err := c.Start(); err != nil { return err } + var encPath string if c.conf.encryption { encPath = encKeyMountPath } - hc, err = c.HTTPClient() if err != nil { return errors.Wrapf(err, "error creating HTTP client after upgrade") } - if err := hc.LoginIntoNamespace(DefaultUser, DefaultPassword, x.GalaxyNamespace); err != nil { - return errors.Wrapf(err, "error during login after upgrade") + if c.conf.acl { + if err := hc.LoginIntoNamespace(DefaultUser, DefaultPassword, x.GalaxyNamespace); err != nil { + return errors.Wrapf(err, "error during login after upgrade") + } } if err := hc.Restore(c, DefaultBackupDir, "", 0, 1, encPath); err != nil { return errors.Wrap(err, "error doing restore during upgrade") @@ -529,6 +537,38 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error { } return nil + case ExportImport: + hc, err := c.HTTPClient() + if err != nil { + return err + } + if c.conf.acl { + if err := hc.LoginIntoNamespace(DefaultUser, DefaultPassword, x.GalaxyNamespace); err != nil { + return errors.Wrapf(err, "error during login before upgrade") + } + } + if err := hc.Export(DefaultExportDir); err != nil { + return errors.Wrap(err, "error taking export during upgrade") + } + if err := c.Stop(); err != nil { + return err + } + + c.conf.version = version + if err := c.setupBinary(); err != nil { + return err + } + if err := c.recreateContainers(); err != nil { + return err + } + if err := c.Start(); err != nil { + return err + } + if err := c.LiveLoadFromExport(DefaultExportDir); err != nil { + return errors.Wrap(err, "error doing import using live loader") + } + return nil + case StopStart: if err := c.Stop(); err != nil { return err @@ -733,7 +773,7 @@ func (c *LocalCluster) getLogs(containerID string) (string, error) { data, err := io.ReadAll(ro) if err != nil { - log.Printf("[WARN] error in reading logs for [%v]: %v", containerID, err) + log.Printf("[WARNING] error in reading logs for [%v]: %v", containerID, err) } return string(data), nil } diff --git a/dgraphtest/paths.go b/dgraphtest/paths.go index 88a7c176545..4dba9047aa7 100644 --- a/dgraphtest/paths.go +++ b/dgraphtest/paths.go @@ -39,7 +39,7 @@ const ( ) func init() { - // init log + // init logging log.SetFlags(log.LstdFlags | log.Lshortfile) // setup paths diff --git a/query/cloud_test.go b/query/cloud_test.go index 9f25517aee7..9aea8b04e3e 100644 --- a/query/cloud_test.go +++ b/query/cloud_test.go @@ -37,7 +37,7 @@ func TestMain(m *testing.M) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - x.Panic(dg.LoginIntoNamespace(ctx, dgraphtest.DefaultUser, dgraphtest.DefaultPassword, 0)) + x.Panic(dg.LoginIntoNamespace(ctx, dgraphtest.DefaultUser, dgraphtest.DefaultPassword, 
x.GalaxyNamespace)) dc = c client = dg.Dgraph diff --git a/query/upgrade_test.go b/query/upgrade_test.go index 40bd186759b..c9e561d12e7 100644 --- a/query/upgrade_test.go +++ b/query/upgrade_test.go @@ -34,7 +34,8 @@ func TestMain(m *testing.M) { dg, cleanup, err := c.Client() x.Panic(err) defer cleanup() - x.Panic(dg.LoginIntoNamespace(context.Background(), dgraphtest.DefaultUser, dgraphtest.DefaultPassword, 0)) + x.Panic(dg.LoginIntoNamespace(context.Background(), dgraphtest.DefaultUser, + dgraphtest.DefaultPassword, x.GalaxyNamespace)) client = dg.Dgraph dc = c @@ -45,7 +46,8 @@ func TestMain(m *testing.M) { dg, cleanup, err := c.Client() x.Panic(err) defer cleanup() - x.Panic(dg.LoginIntoNamespace(context.Background(), dgraphtest.DefaultUser, dgraphtest.DefaultPassword, 0)) + x.Panic(dg.LoginIntoNamespace(context.Background(), dgraphtest.DefaultUser, + dgraphtest.DefaultPassword, x.GalaxyNamespace)) client = dg.Dgraph dc = c
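
A minimal usage sketch (not part of the patch above) of how a test could drive the new export-import upgrade path end to end. It relies on the dgraphtest helpers visible in this diff (NewClusterConfig, Client, Start, Upgrade, and the ExportImport constant); the NewLocalCluster constructor signature, the default cluster settings, and the target version string are assumptions made only for illustration.

package upgradesketch // hypothetical test package, for illustration only

import (
	"context"
	"testing"

	"github.com/dgraph-io/dgraph/dgraphtest"
	"github.com/dgraph-io/dgraph/x"
)

func TestUpgradeViaExportImport(t *testing.T) {
	// Assumed constructor and defaults; the real config may need ACL and
	// encryption enabled for the login and --encryption paths to be exercised.
	c, err := dgraphtest.NewLocalCluster(dgraphtest.NewClusterConfig())
	if err != nil {
		t.Fatal(err)
	}
	if err := c.Start(); err != nil {
		t.Fatal(err)
	}

	dg, cleanup, err := c.Client()
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()
	// Log into the galaxy namespace, mirroring the updated query tests.
	if err := dg.LoginIntoNamespace(context.Background(),
		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace); err != nil {
		t.Fatal(err)
	}

	// ... set a schema and write some data here ...

	// "v23.0.0" is a placeholder target version, not taken from the patch.
	// With the ExportImport strategy, Upgrade exports on the old version,
	// swaps the binary, recreates the containers, and live-loads the
	// exported data into the upgraded cluster.
	if err := c.Upgrade("v23.0.0", dgraphtest.ExportImport); err != nil {
		t.Fatal(err)
	}
}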