diff --git a/dbs/migrate.go b/dbs/migrate.go index d589cdef..24e4a97d 100644 --- a/dbs/migrate.go +++ b/dbs/migrate.go @@ -44,24 +44,28 @@ DBS Migration APIs: - totat reports number of migration requests - cleanup clean migration DB -DBS migration status codes: - migration_status: +DBS migration status codes (in sync with old python migration server): 0=PENDING 1=IN PROGRESS 2=COMPLETED 3=FAILED (will be retried) 4=EXIST_IN_DB + 5=QUEUED 9=Terminally FAILED status change: - 0 -> 1 - 1 -> 2 - 1 -> 3 - 1 -> 4 - 1 -> 9 + QUEUED -> PENDING (5 -> 0) + PENDING -> IN PROGRESS (0 -> 1) + IN PROGRESS -> COMPLETED (1 -> 2) + IN PROGRESS -> FAILED (1 -> 3) + IN PROGRESS -> EXIST_IN_DB (1 -> 4) + IN PROGRESS -> (Terminally FAILED) (1 -> 9) are only allowed changes for working through migration. - 3 -> 1 is allowed for retrying and retry count +1. + FAILED -> IN PROGRESS (3 -> 1) is allowed for retrying and retry count +1. */ +// MigrationAsyncTimeout defines timeout of asynchrounous migration request process +var MigrationAsyncTimeout int + // MigrationCodes represents all migration codes const ( PENDING = iota @@ -69,6 +73,7 @@ const ( COMPLETED FAILED EXIST_IN_DB + QUEUED TERM_FAILED = 9 ) @@ -536,7 +541,24 @@ func GetParentDatasetBlocks(rurl, dataset string, order int) ([]MigrationBlock, // helper function to check if migration input is already queued func alreadyQueued(input string) error { - // TODO: check if given migration input is already queued + stm := getSQL("check_migration_request") + var args []interface{} + args = append(args, input) + if utils.VERBOSE > 0 { + utils.PrintSQL(stm, args, "execute") + } + var mid int64 + err := DB.QueryRow(stm, args...).Scan(&mid) + if err != nil { + if err == sql.ErrNoRows { + return nil + } + return err + } + if mid != 0 { + msg := fmt.Sprintf("migration request %s is already in queue with id=%d", input, mid) + return errors.New(msg) + } return nil } @@ -593,6 +615,8 @@ func statusString(status int64) string { s = "TERMINATED" } else if status == EXIST_IN_DB { s = "EXIST_IN_DB" + } else if status == QUEUED { + s = "QUEUED" } return s } @@ -608,7 +632,7 @@ func (a *API) SubmitMigration() error { } tstamp := time.Now().Unix() rec := MigrationRequest{ - MIGRATION_STATUS: PENDING, + MIGRATION_STATUS: QUEUED, CREATE_BY: a.CreateBy, CREATION_DATE: tstamp, LAST_MODIFIED_BY: a.CreateBy, @@ -623,7 +647,7 @@ func (a *API) SubmitMigration() error { // check if migration input is already queued input := rec.MIGRATION_INPUT mid := rec.MIGRATION_REQUEST_ID - mstr := fmt.Sprintf("Migration request %d", mid) + mstr := fmt.Sprintf("Migration request %s, id=%d", input, mid) if err := alreadyQueued(input); err != nil { msg := fmt.Sprintf("%s already queued error %v", mstr, err) if utils.VERBOSE > 1 { @@ -636,12 +660,34 @@ func (a *API) SubmitMigration() error { return Error(err, MigrationErrorCode, "not allowed for migration", "dbs.migrate.SubmitMigration") } - // start migration request - reports, err := startMigrationRequest(rec) + // migration output + var reports []MigrationReport + msg := "Migration request is started" + + // insert migration request + tx, err := DB.Begin() + defer tx.Rollback() if err != nil { - log.Println("unable to start migration request", err) - return Error(err, MigrationErrorCode, "", "dbs.migrate.SubmitMigration") + msg = fmt.Sprintf("%s, DB connection error %v", mstr, err) + } else { + err = rec.Insert(tx) + if err != nil { + msg = fmt.Sprintf("%s, insert error %v", mstr, err) + } else { + // commit transaction + err = tx.Commit() + if err != nil { + msg = fmt.Sprintf("%s commit transaction error %v", mstr, err) + } + } } + + // start migration request + go StartMigrationRequest(rec) + + // create final report for our migration request + reports = append(reports, migrationReport(rec, msg, QUEUED, nil)) + data, err = json.Marshal(reports) if err != nil { return Error(err, MarshalErrorCode, "", "dbs.migrate.SubmitMigration") @@ -650,23 +696,53 @@ func (a *API) SubmitMigration() error { return nil } +// StartMigrationRequest starts asynchronously migration request process via +// goroutine with timeout context +// the code is based on the following example: +// https://medium.com/geekculture/timeout-context-in-go-e88af0abd08d +func StartMigrationRequest(rec MigrationRequest) { + // setup context with timeout + ctx, cancel := context.WithTimeout( + context.Background(), + time.Duration(MigrationAsyncTimeout)*time.Second) + defer cancel() + ch := make(chan string, 1) + go func(ctx context.Context, ch chan string) { + reports, err := startMigrationRequest(rec) + if err != nil { + ch <- fmt.Sprintf("fail to start migration request %v, error %v", rec, err) + } else { + ch <- fmt.Sprintf("finished %v with %d migration requests", rec, len(reports)) + } + }(ctx, ch) + select { + case <-ctx.Done(): + msg := fmt.Sprintf("Migration request %v with context is cancelled %v", rec, ctx.Err()) + log.Println(msg) + case response := <-ch: + log.Println(response) + // case <-time.After(50 * time.Millisecond): + // msg := fmt.Sprintf("Migration request %v timeout", rec) + // log.Println(msg) + } +} + // helper function to start migration request and return list of migration ids //gocyclo:ignore -func startMigrationRequest(rec MigrationRequest) ([]MigrationReport, error) { +func startMigrationRequest(req MigrationRequest) ([]MigrationReport, error) { var err error status := int64(PENDING) msg := "Migration request is started" - var req MigrationRequest var reports []MigrationReport - input := rec.MIGRATION_INPUT + input := req.MIGRATION_INPUT mstr := fmt.Sprintf("Migration request for %+v", input) if utils.VERBOSE > 0 { - log.Printf("%s %+v", mstr, rec) + log.Printf("%s %+v", mstr, req) } var dstParentBlocks, srcParentBlocks []string - rurl := rec.MIGRATION_URL + rurl := req.MIGRATION_URL localhost := fmt.Sprintf("%s%s", utils.Localhost, utils.BASE) // get parent blocks at destination DBS instance for given input time0 := time.Now() @@ -720,11 +796,11 @@ func startMigrationRequest(rec MigrationRequest) ([]MigrationReport, error) { // if no migration blocks found to process return immediately if len(migBlocks) == 0 { - rec.MIGRATION_STATUS = EXIST_IN_DB - updateMigrationStatus(rec, EXIST_IN_DB) + req.MIGRATION_STATUS = EXIST_IN_DB + updateMigrationStatus(req, EXIST_IN_DB) msg = fmt.Sprintf("%s is already fulfilled, no blocks found for migration", mstr) log.Println(msg) - return []MigrationReport{migrationReport(rec, msg, status, err)}, nil + return []MigrationReport{migrationReport(req, msg, status, err)}, nil } if utils.VERBOSE > 0 { log.Printf("%s will migrate %d blocks", mstr, len(migBlocks)) @@ -757,22 +833,27 @@ func startMigrationRequest(rec MigrationRequest) ([]MigrationReport, error) { var ids []int64 for idx, blk := range migBlocks { - // insert MigrationRequest object + // create and insert MigrationRequest object with migration blocks + rec := req.Copy() rec.MIGRATION_REQUEST_ID = 0 rec.MIGRATION_INPUT = blk + rec.MIGRATION_STATUS = int64(PENDING) if utils.VERBOSE > 0 { log.Printf("%s insert MigrationRequest record %+v", mstr, rec) } - err = rec.Insert(tx) - if err != nil { - msg = fmt.Sprintf("unable to insert MigrationRequest record %+v, error %v", rec, err) - log.Println(msg) - if strings.Contains(err.Error(), "unique") { - // we inserted the same block - continue + // we skip insert for migration request input since it is inserted upstream + if blk != input { + err = rec.Insert(tx) + if err != nil { + msg = fmt.Sprintf("unable to insert MigrationRequest record %+v, error %v", rec, err) + log.Println(msg) + if strings.Contains(err.Error(), "unique") { + // we inserted the same block + continue + } + return []MigrationReport{migrationReport(req, msg, status, err)}, + Error(err, InsertErrorCode, "", "dbs.migrate.SubmitMigration") } - return []MigrationReport{migrationReport(req, msg, status, err)}, - Error(err, InsertErrorCode, "", "dbs.migrate.SubmitMigration") } // get inserted migration ID @@ -825,6 +906,11 @@ func startMigrationRequest(rec MigrationRequest) ([]MigrationReport, error) { if utils.VERBOSE > 0 { log.Printf("%s finished, migration ids %v", mstr, ids) } + + // after we done with insertion of migration blocks + // we update original migration request status and set it to PENDING + updateMigrationStatus(req, PENDING) + return reports, nil } diff --git a/dbs/migration_requests.go b/dbs/migration_requests.go index b81049ab..f983e86d 100644 --- a/dbs/migration_requests.go +++ b/dbs/migration_requests.go @@ -26,6 +26,23 @@ type MigrationRequest struct { RETRY_COUNT int64 `json:"retry_count"` } +// Copy creates a new copy of migration request +func (r *MigrationRequest) Copy() MigrationRequest { + req := MigrationRequest{ + MIGRATION_REQUEST_ID: r.MIGRATION_REQUEST_ID, + MIGRATION_URL: r.MIGRATION_URL, + MIGRATION_INPUT: r.MIGRATION_INPUT, + MIGRATION_STATUS: r.MIGRATION_STATUS, + MIGRATION_SERVER: r.MIGRATION_SERVER, + CREATE_BY: r.CREATE_BY, + CREATION_DATE: r.CREATION_DATE, + LAST_MODIFIED_BY: r.LAST_MODIFIED_BY, + LAST_MODIFICATION_DATE: r.LAST_MODIFICATION_DATE, + RETRY_COUNT: r.RETRY_COUNT, + } + return req +} + // Insert implementation of MigrationRequest func (r *MigrationRequest) Insert(tx *sql.Tx) error { var tid int64 diff --git a/docs/MigrationServer.md b/docs/MigrationServer.md index cab0e248..d10a2d7c 100644 --- a/docs/MigrationServer.md +++ b/docs/MigrationServer.md @@ -34,18 +34,21 @@ from underlying DB backend on periodic basis migration server settings (by default 3 times) - 4 migration request is already exist in DB, i.e. the requested block or dataset is already found in database + - 5 migration request is queued, i.e. initially submitted by a client - 9 migration request termindated, this can happen in two scenarios - migration request has been cancelled explicitly by user - migration request failed N times and will no longer be retried automatically The migration request goes throught the followin cycle: +(using notations of Go-based server, see +[DBS Migrate code](https://github.com/dmwm/dbs2go/blob/master/dbs/migrate.go) ``` -status change: -0 -> 1, request in progress -1 -> 2, request is completed successfully -1 -> 3, and if failed 3->1 -1 -> 4, if request is alaready in DB -1 -> 9, request is terminated +QUEUED -> PENDING (5 -> 0), request is accepted for processing +PENDING -> IN PROGRESS (0 -> 1), request is in progress by DBS migration server +IN PROGRESS -> COMPLETED (1 -> 2), request is completed successfully +IN PROGRESS -> FAILED (1 -> 3), request failed but can be retried +IN PROGRESS -> EXIST_IN_DB (1 -> 4), request is alaready in DB +IN PROGRESS -> (Terminally FAILED) (1 -> 9), request is terminated after all retries ``` ### Examples diff --git a/static/sql/check_migration_request.sql b/static/sql/check_migration_request.sql new file mode 100644 index 00000000..6caf609d --- /dev/null +++ b/static/sql/check_migration_request.sql @@ -0,0 +1,4 @@ +SELECT MR.MIGRATION_REQUEST_ID +FROM {{.Owner}}.MIGRATION_REQUESTS MR +JOIN {{.Owner}}.MIGRATION_BLOCKS MB ON MB.MIGRATION_REQUEST_ID=MR.MIGRATION_REQUEST_ID +WHERE MR.MIGRATION_INPUT=:migration_input diff --git a/web/config.go b/web/config.go index 1aade0e6..df8f68d4 100644 --- a/web/config.go +++ b/web/config.go @@ -40,6 +40,7 @@ type Configuration struct { MigrationCleanupInterval int `json:"migration_cleanup_interval"` // migration cleanup interval MigrationCleanupOffset int64 `json:"migration_cleanup_offset"` // migration cleanup offset MigrationRetries int64 `json:"migration_retries"` // migration retries + MigrationAsyncTimeout int `json:"migration_async_timeout"` // timeout for aysnc migration request // db related configuration DBFile string `json:"dbfile"` // dbs db file with secrets @@ -106,6 +107,9 @@ func ParseConfig(configFile string) error { if Config.LimiterPeriod == "" { Config.LimiterPeriod = "100-S" } + if Config.MigrationAsyncTimeout == 0 { + Config.MigrationAsyncTimeout = 600 // in seconds + } if Config.MigrationProcessTimeout == 0 { Config.MigrationProcessTimeout = 300 // in seconds } diff --git a/web/server.go b/web/server.go index 91d0a8ed..3f7f6682 100644 --- a/web/server.go +++ b/web/server.go @@ -413,6 +413,7 @@ func Server(configFile string) { dbs.DBOWNER = dbowner // migration settings + dbs.MigrationAsyncTimeout = Config.MigrationAsyncTimeout dbs.MigrationProcessTimeout = Config.MigrationProcessTimeout dbs.MigrationServerInterval = Config.MigrationServerInterval dbs.MigrationCleanupInterval = Config.MigrationCleanupInterval