Skip to content

Commit

Permalink
Switch DBSMigrate server to async behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Aug 17, 2022
1 parent 69580d1 commit 134af01
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 39 deletions.
152 changes: 119 additions & 33 deletions dbs/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,36 @@ DBS Migration APIs:
- total reports number of migration requests
- cleanup clean migration DB
DBS migration status codes:
migration_status:
DBS migration status codes (in sync with old python migration server):
0=PENDING
1=IN PROGRESS
2=COMPLETED
3=FAILED (will be retried)
4=EXIST_IN_DB
5=QUEUED
9=Terminally FAILED
status change:
0 -> 1
1 -> 2
1 -> 3
1 -> 4
1 -> 9
QUEUED -> PENDING (5 -> 0)
PENDING -> IN PROGRESS (0 -> 1)
IN PROGRESS -> COMPLETED (1 -> 2)
IN PROGRESS -> FAILED (1 -> 3)
IN PROGRESS -> EXIST_IN_DB (1 -> 4)
IN PROGRESS -> (Terminally FAILED) (1 -> 9)
are only allowed changes for working through migration.
3 -> 1 is allowed for retrying and retry count +1.
FAILED -> IN PROGRESS (3 -> 1) is allowed for retrying and retry count +1.
*/

// MigrationAsyncTimeout defines the timeout, in seconds, of the asynchronous
// migration request process
var MigrationAsyncTimeout int

// Migration status codes. The numeric values are spelled out explicitly
// because they are kept in sync with the old python migration server.
const (
	PENDING     = 0 // request is pending
	IN_PROGRESS = 1 // request is in progress
	COMPLETED   = 2 // request is completed
	FAILED      = 3 // request failed (will be retried)
	EXIST_IN_DB = 4 // request already exists in DB
	QUEUED      = 5 // request is queued
	TERM_FAILED = 9 // request terminally failed
)

Expand Down Expand Up @@ -536,7 +541,24 @@ func GetParentDatasetBlocks(rurl, dataset string, order int) ([]MigrationBlock,

// alreadyQueued checks whether a migration request with the given input is
// already registered. It runs the "check_migration_request" SQL statement
// with input as its only bind argument and scans a single migration request
// id from the result.
//
// It returns nil when the query yields no rows or a zero id, a descriptive
// error when a non-zero id is found, and the underlying database error for
// any other failure.
func alreadyQueued(input string) error {
	stm := getSQL("check_migration_request")
	args := []interface{}{input}
	if utils.VERBOSE > 0 {
		utils.PrintSQL(stm, args, "execute")
	}
	var mid int64
	if err := DB.QueryRow(stm, args...).Scan(&mid); err != nil {
		// no matching row means this input was not found by the query;
		// errors.Is also recognizes sql.ErrNoRows when it arrives wrapped
		if errors.Is(err, sql.ErrNoRows) {
			return nil
		}
		return err
	}
	if mid != 0 {
		return fmt.Errorf("migration request %s is already in queue with id=%d", input, mid)
	}
	return nil
}

Expand Down Expand Up @@ -593,6 +615,8 @@ func statusString(status int64) string {
s = "TERMINATED"
} else if status == EXIST_IN_DB {
s = "EXIST_IN_DB"
} else if status == QUEUED {
s = "QUEUED"
}
return s
}
Expand All @@ -608,7 +632,7 @@ func (a *API) SubmitMigration() error {
}
tstamp := time.Now().Unix()
rec := MigrationRequest{
MIGRATION_STATUS: PENDING,
MIGRATION_STATUS: QUEUED,
CREATE_BY: a.CreateBy,
CREATION_DATE: tstamp,
LAST_MODIFIED_BY: a.CreateBy,
Expand All @@ -623,7 +647,7 @@ func (a *API) SubmitMigration() error {
// check if migration input is already queued
input := rec.MIGRATION_INPUT
mid := rec.MIGRATION_REQUEST_ID
mstr := fmt.Sprintf("Migration request %d", mid)
mstr := fmt.Sprintf("Migration request %s, id=%d", input, mid)
if err := alreadyQueued(input); err != nil {
msg := fmt.Sprintf("%s already queued error %v", mstr, err)
if utils.VERBOSE > 1 {
Expand All @@ -636,12 +660,34 @@ func (a *API) SubmitMigration() error {
return Error(err, MigrationErrorCode, "not allowed for migration", "dbs.migrate.SubmitMigration")
}

// start migration request
reports, err := startMigrationRequest(rec)
// migration output
var reports []MigrationReport
msg := "Migration request is started"

// insert migration request
tx, err := DB.Begin()
defer tx.Rollback()
if err != nil {
log.Println("unable to start migration request", err)
return Error(err, MigrationErrorCode, "", "dbs.migrate.SubmitMigration")
msg = fmt.Sprintf("%s, DB connection error %v", mstr, err)
} else {
err = rec.Insert(tx)
if err != nil {
msg = fmt.Sprintf("%s, insert error %v", mstr, err)
} else {
// commit transaction
err = tx.Commit()
if err != nil {
msg = fmt.Sprintf("%s commit transaction error %v", mstr, err)
}
}
}

// start migration request
go StartMigrationRequest(rec)

// create final report for our migration request
reports = append(reports, migrationReport(rec, msg, QUEUED, nil))

data, err = json.Marshal(reports)
if err != nil {
return Error(err, MarshalErrorCode, "", "dbs.migrate.SubmitMigration")
Expand All @@ -650,23 +696,53 @@ func (a *API) SubmitMigration() error {
return nil
}

// StartMigrationRequest processes the given migration request in a separate
// goroutine and waits for whichever comes first: the outcome reported by that
// goroutine or the expiration of a context whose timeout is
// MigrationAsyncTimeout seconds. In both cases one message is written to the log.
// Note that the timeout only ends the wait, the worker goroutine itself
// does not observe the context.
// The approach is based on the following example:
// https://medium.com/geekculture/timeout-context-in-go-e88af0abd08d
func StartMigrationRequest(rec MigrationRequest) {
	timeout := time.Duration(MigrationAsyncTimeout) * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// buffer of one lets the worker deliver its single message even when
	// nobody receives it anymore, i.e. after the timeout has fired
	done := make(chan string, 1)
	go func() {
		reports, err := startMigrationRequest(rec)
		if err != nil {
			done <- fmt.Sprintf("fail to start migration request %v, error %v", rec, err)
			return
		}
		done <- fmt.Sprintf("finished %v with %d migration requests", rec, len(reports))
	}()

	select {
	case outcome := <-done:
		log.Println(outcome)
	case <-ctx.Done():
		log.Printf("Migration request %v with context is cancelled %v", rec, ctx.Err())
	}
}

// helper function to start migration request and return list of migration ids
//gocyclo:ignore
func startMigrationRequest(rec MigrationRequest) ([]MigrationReport, error) {
func startMigrationRequest(req MigrationRequest) ([]MigrationReport, error) {
var err error
status := int64(PENDING)
msg := "Migration request is started"
var req MigrationRequest
var reports []MigrationReport

input := rec.MIGRATION_INPUT
input := req.MIGRATION_INPUT
mstr := fmt.Sprintf("Migration request for %+v", input)
if utils.VERBOSE > 0 {
log.Printf("%s %+v", mstr, rec)
log.Printf("%s %+v", mstr, req)
}

var dstParentBlocks, srcParentBlocks []string
rurl := rec.MIGRATION_URL
rurl := req.MIGRATION_URL
localhost := fmt.Sprintf("%s%s", utils.Localhost, utils.BASE)
// get parent blocks at destination DBS instance for given input
time0 := time.Now()
Expand Down Expand Up @@ -720,11 +796,11 @@ func startMigrationRequest(rec MigrationRequest) ([]MigrationReport, error) {

// if no migration blocks found to process return immediately
if len(migBlocks) == 0 {
rec.MIGRATION_STATUS = EXIST_IN_DB
updateMigrationStatus(rec, EXIST_IN_DB)
req.MIGRATION_STATUS = EXIST_IN_DB
updateMigrationStatus(req, EXIST_IN_DB)
msg = fmt.Sprintf("%s is already fulfilled, no blocks found for migration", mstr)
log.Println(msg)
return []MigrationReport{migrationReport(rec, msg, status, err)}, nil
return []MigrationReport{migrationReport(req, msg, status, err)}, nil
}
if utils.VERBOSE > 0 {
log.Printf("%s will migrate %d blocks", mstr, len(migBlocks))
Expand Down Expand Up @@ -757,22 +833,27 @@ func startMigrationRequest(rec MigrationRequest) ([]MigrationReport, error) {
var ids []int64
for idx, blk := range migBlocks {

// insert MigrationRequest object
// create and insert MigrationRequest object with migration blocks
rec := req.Copy()
rec.MIGRATION_REQUEST_ID = 0
rec.MIGRATION_INPUT = blk
rec.MIGRATION_STATUS = int64(PENDING)
if utils.VERBOSE > 0 {
log.Printf("%s insert MigrationRequest record %+v", mstr, rec)
}
err = rec.Insert(tx)
if err != nil {
msg = fmt.Sprintf("unable to insert MigrationRequest record %+v, error %v", rec, err)
log.Println(msg)
if strings.Contains(err.Error(), "unique") {
// we inserted the same block
continue
// we skip insert for migration request input since it is inserted upstream
if blk != input {
err = rec.Insert(tx)
if err != nil {
msg = fmt.Sprintf("unable to insert MigrationRequest record %+v, error %v", rec, err)
log.Println(msg)
if strings.Contains(err.Error(), "unique") {
// we inserted the same block
continue
}
return []MigrationReport{migrationReport(req, msg, status, err)},
Error(err, InsertErrorCode, "", "dbs.migrate.SubmitMigration")
}
return []MigrationReport{migrationReport(req, msg, status, err)},
Error(err, InsertErrorCode, "", "dbs.migrate.SubmitMigration")
}

// get inserted migration ID
Expand Down Expand Up @@ -825,6 +906,11 @@ func startMigrationRequest(rec MigrationRequest) ([]MigrationReport, error) {
if utils.VERBOSE > 0 {
log.Printf("%s finished, migration ids %v", mstr, ids)
}

// after we done with insertion of migration blocks
// we update original migration request status and set it to PENDING
updateMigrationStatus(req, PENDING)

return reports, nil
}

Expand Down
17 changes: 17 additions & 0 deletions dbs/migration_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ type MigrationRequest struct {
RETRY_COUNT int64 `json:"retry_count"`
}

// Copy returns a new MigrationRequest value whose fields are populated
// one by one from the receiver
func (r *MigrationRequest) Copy() MigrationRequest {
	var dup MigrationRequest
	dup.MIGRATION_REQUEST_ID = r.MIGRATION_REQUEST_ID
	dup.MIGRATION_URL = r.MIGRATION_URL
	dup.MIGRATION_INPUT = r.MIGRATION_INPUT
	dup.MIGRATION_STATUS = r.MIGRATION_STATUS
	dup.MIGRATION_SERVER = r.MIGRATION_SERVER
	dup.CREATE_BY = r.CREATE_BY
	dup.CREATION_DATE = r.CREATION_DATE
	dup.LAST_MODIFIED_BY = r.LAST_MODIFIED_BY
	dup.LAST_MODIFICATION_DATE = r.LAST_MODIFICATION_DATE
	dup.RETRY_COUNT = r.RETRY_COUNT
	return dup
}

// Insert implementation of MigrationRequest
func (r *MigrationRequest) Insert(tx *sql.Tx) error {
var tid int64
Expand Down
15 changes: 9 additions & 6 deletions docs/MigrationServer.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,21 @@ from underlying DB backend on periodic basis
migration server settings (by default 3 times)
- 4 migration request already exists in DB, i.e. the requested block or
dataset is already found in database
- 5 migration request is queued, i.e. initially submitted by a client
- 9 migration request terminated, this can happen in two scenarios
- migration request has been cancelled explicitly by user
- migration request failed N times and will no longer be retried
automatically
The migration request goes through the following cycle:
(using notations of Go-based server, see
[DBS Migrate code](https://github.com/dmwm/dbs2go/blob/master/dbs/migrate.go))
```
status change:
0 -> 1, request in progress
1 -> 2, request is completed successfully
1 -> 3, and if failed 3->1
1 -> 4, if request is alaready in DB
1 -> 9, request is terminated
QUEUED -> PENDING (5 -> 0), request is accepted for processing
PENDING -> IN PROGRESS (0 -> 1), request is in progress by DBS migration server
IN PROGRESS -> COMPLETED (1 -> 2), request is completed successfully
IN PROGRESS -> FAILED (1 -> 3), request failed but can be retried
IN PROGRESS -> EXIST_IN_DB (1 -> 4), request is already in DB
IN PROGRESS -> (Terminally FAILED) (1 -> 9), request is terminated after all retries
```

### Examples
Expand Down
4 changes: 4 additions & 0 deletions static/sql/check_migration_request.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
SELECT MR.MIGRATION_REQUEST_ID
FROM {{.Owner}}.MIGRATION_REQUESTS MR
JOIN {{.Owner}}.MIGRATION_BLOCKS MB ON MB.MIGRATION_REQUEST_ID=MR.MIGRATION_REQUEST_ID
WHERE MR.MIGRATION_INPUT=:migration_input
4 changes: 4 additions & 0 deletions web/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Configuration struct {
MigrationCleanupInterval int `json:"migration_cleanup_interval"` // migration cleanup interval
MigrationCleanupOffset int64 `json:"migration_cleanup_offset"` // migration cleanup offset
MigrationRetries int64 `json:"migration_retries"` // migration retries
MigrationAsyncTimeout int `json:"migration_async_timeout"` // timeout for aysnc migration request

// db related configuration
DBFile string `json:"dbfile"` // dbs db file with secrets
Expand Down Expand Up @@ -106,6 +107,9 @@ func ParseConfig(configFile string) error {
if Config.LimiterPeriod == "" {
Config.LimiterPeriod = "100-S"
}
if Config.MigrationAsyncTimeout == 0 {
Config.MigrationAsyncTimeout = 600 // in seconds
}
if Config.MigrationProcessTimeout == 0 {
Config.MigrationProcessTimeout = 300 // in seconds
}
Expand Down
1 change: 1 addition & 0 deletions web/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ func Server(configFile string) {
dbs.DBOWNER = dbowner

// migration settings
dbs.MigrationAsyncTimeout = Config.MigrationAsyncTimeout
dbs.MigrationProcessTimeout = Config.MigrationProcessTimeout
dbs.MigrationServerInterval = Config.MigrationServerInterval
dbs.MigrationCleanupInterval = Config.MigrationCleanupInterval
Expand Down

0 comments on commit 134af01

Please sign in to comment.