Skip to content

Commit

Permalink
clean up some logging and add more details
Browse files Browse the repository at this point in the history
  • Loading branch information
kris-watts-gravwell committed May 9, 2023
1 parent da0ff40 commit c5b6a32
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.swp
54 changes: 50 additions & 4 deletions pkg/ftpstore/ftpstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gravwell/cloudarchive/pkg/shardpacker"
"github.com/gravwell/cloudarchive/pkg/tags"
"github.com/gravwell/cloudarchive/pkg/util"
"github.com/gravwell/gravwell/v3/ingest/log"
"github.com/jlaffaye/ftp"

"github.com/google/uuid"
Expand All @@ -32,6 +33,8 @@ import (
var (
ErrMissingBaseDir = errors.New("Empty base directory for file store")

errNotImplemented = `502 Command not implemented.`

ftpSync sync.Mutex
)

Expand All @@ -46,9 +49,13 @@ type FtpStoreConfig struct {
BaseDir string // base directory *on the server*
Username string
Password string
Lgr *log.Logger
}

func NewFtpStoreHandler(cfg FtpStoreConfig) (*ftpstore, error) {
if cfg.Lgr == nil {
cfg.Lgr = log.New(os.Stderr)
}
return &ftpstore{
cfg: cfg,
UploadTracker: util.NewUploadTracker(),
Expand All @@ -59,9 +66,12 @@ func (f *ftpstore) getFtpClient() (*ftp.ServerConn, error) {
do := ftp.DialWithTimeout(10 * time.Second)
c, err := ftp.Dial(f.cfg.FtpServer, do)
if err != nil {
f.cfg.Lgr.Error("Failed to dial server", log.KV("address", f.cfg.FtpServer), log.KVErr(err))
return nil, err
}
err = c.Login(f.cfg.Username, f.cfg.Password)
if err = c.Login(f.cfg.Username, f.cfg.Password); err != nil {
f.cfg.Lgr.Error("Failed to log in", log.KV("address", f.cfg.FtpServer), log.KVErr(err))
}
return c, err
}

Expand Down Expand Up @@ -99,6 +109,9 @@ func (f *ftpstore) ListIndexerWells(cid uint64, guid uuid.UUID) ([]string, error
}
idxDir := filepath.Join(f.cfg.BaseDir, strconv.FormatUint(cid, 10), guid.String())
if ents, err = c.List(idxDir); err != nil {
f.cfg.Lgr.Error("Failed to list index directory",
log.KV("directory", idxDir),
log.KVErr(err))
return wells, err
}
for _, info := range ents {
Expand All @@ -121,6 +134,9 @@ func (f *ftpstore) GetWellTimeframe(cid uint64, guid uuid.UUID, well string) (t
var ents []*ftp.Entry
ents, err = c.List(wellDir)
if err != nil {
f.cfg.Lgr.Error("Failed to list well directory",
log.KV("directory", wellDir),
log.KVErr(err))
return
}
for _, info := range ents {
Expand Down Expand Up @@ -149,6 +165,9 @@ func (f *ftpstore) GetShardsInTimeframe(cid uint64, guid uuid.UUID, well string,
var ents []*ftp.Entry
ents, err = c.List(wellDir)
if err != nil {
f.cfg.Lgr.Error("Failed to list well directory",
log.KV("directory", wellDir),
log.KVErr(err))
return
}
for _, info := range ents {
Expand Down Expand Up @@ -185,6 +204,7 @@ func (f *ftpstore) UnpackShard(cid uint64, idxUUID uuid.UUID, well, shard string
}

if err = f.EnterUpload(uid); err != nil {
f.cfg.Lgr.Error("Failed to enter upload", log.KVErr(err))
return
}

Expand Down Expand Up @@ -212,6 +232,9 @@ func (f *ftpstore) UnpackShard(cid uint64, idxUUID uuid.UUID, well, shard string
}
if err = ftpMkdirAll(c, shardDir); err != nil {
f.ExitUpload(uid)
f.cfg.Lgr.Error("Failed to make shard directory",
log.KV("directory", shardDir),
log.KVErr(err))
return
}

Expand All @@ -228,12 +251,22 @@ func (f *ftpstore) UnpackShard(cid uint64, idxUUID uuid.UUID, well, shard string
if up, err = shardpacker.NewUnpacker(shard, rdr); err != nil {
c.RemoveDirRecur(shardDir)
f.ExitUpload(uid)
f.cfg.Lgr.Error("Failed to create new shard unpacker",
log.KV("client-id", cid),
log.KV("uuid", idxUUID),
log.KV("shard", shardDir),
log.KVErr(err))
return
}
//perform the actual unpack
if err = up.Unpack(h); err != nil {
c.RemoveDirRecur(shardDir)
f.ExitUpload(uid)
f.cfg.Lgr.Error("Failed to unpack shard",
log.KV("client-id", cid),
log.KV("uuid", idxUUID),
log.KV("shard", shardDir),
log.KVErr(err))
return
}

Expand Down Expand Up @@ -379,6 +412,7 @@ func (f *ftpstore) GetTags(cid uint64, guid uuid.UUID) (tgs []tags.TagPair, err
localBaseDir := filepath.Join(h.localStore, indexerDir)
var tm tags.TagManager
if tm, err = tags.GetTagMan(cid, guid, localBaseDir); err != nil {
f.cfg.Lgr.Error("Failed enumerate tags", log.KVErr(err))
return
}
tgs, err = tm.TagSet()
Expand Down Expand Up @@ -408,12 +442,14 @@ func (f *ftpstore) SyncTags(cid uint64, guid uuid.UUID, idxTags []tags.TagPair)
localBaseDir := filepath.Join(h.localStore, indexerDir)
var tm tags.TagManager
if tm, err = tags.GetTagMan(cid, guid, localBaseDir); err != nil {
f.cfg.Lgr.Error("Failed enumerate tags", log.KVErr(err))
return
}
// Now merge
_, err = tm.Merge(idxTags)
if err != nil {
tags.ReleaseTagMan(cid, guid)
f.cfg.Lgr.Error("Failed merge tags", log.KVErr(err))
return
}
// Fetch the updated tagset to return
Expand All @@ -427,9 +463,20 @@ func (f *ftpstore) SyncTags(cid uint64, guid uuid.UUID, idxTags []tags.TagPair)
}

func ftpDirExists(c *ftp.ServerConn, path string) bool {
_, err := c.GetEntry(path)
if err == nil {
//first we try GetEntry, but if that is not implemented we will use List which is more expensive
//check if its a 502 response of not-implemented which apparently is a thing
if _, err := c.GetEntry(path); err == nil {
return true
} else if err.Error() == errNotImplemented {
//ok, do the more expensive change directory command
if cdir, err := c.CurrentDir(); err == nil {
if err = c.ChangeDir(path); err == nil {
if err = c.ChangeDir(cdir); err == nil {
return true
}
}
}
return false
}
return false
}
Expand Down Expand Up @@ -465,7 +512,6 @@ func (h handler) HandleFile(pth string, rdr io.Reader) error {
if dir != `` {
err := ftpMkdirAll(h.client, filepath.Join(h.sdir, dir))
if err != nil {
fmt.Printf("error when creating file: %v\n", err)
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func main() {
FtpServer: cfg.Global.FTP_Server,
Username: cfg.Global.FTP_Username,
Password: cfg.Global.FTP_Password,
Lgr: lgr,
}
handler, err = ftpstore.NewFtpStoreHandler(fcfg)
if err != nil {
Expand Down

0 comments on commit c5b6a32

Please sign in to comment.