Skip to content

Commit

Permalink
Add helper function checkBlockExist; use it as first part of bulblock…
Browse files Browse the repository at this point in the history
…s API logic
  • Loading branch information
vkuznet committed Nov 21, 2022
1 parent 000d8b9 commit 4b8fa39
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions dbs/bulkblocks2.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,21 @@ func getDatasetID(
return datasetID, nil
}

// helper function to check if block exist in DBS database
func checkBlockExist(bName, hash string) error {
tx, err := DB.Begin()
if err != nil {
return Error(err, TransactionErrorCode, hash, "dbs.bulkblocks.checkBlockExist")
}
defer tx.Rollback()
if rid, err := GetID(tx, "BLOCKS", "block_id", "block_name", bName); err == nil && rid != 0 {
err := errors.New(fmt.Sprintf("Block %s already exists", bName))
msg := "Data already exist in DBS"
return Error(err, BlockAlreadyExists, msg, "dbs.bulkblocks.checkBlockExist")
}
return nil
}

// InsertBulkBlocksConcurrently DBS API provides concurrent bulk blocks
// insertion. It inherits the same logic as BulkBlocks API but perform
// Files and FileLumis injection concurrently via chunk of record.
Expand Down Expand Up @@ -507,6 +522,12 @@ func (a *API) InsertBulkBlocksConcurrently() error {
var dataTierID, physicsGroupID, processedDatasetID, datasetAccessTypeID int64
creationDate := time.Now().Unix()

// check if give block name exist in DBS, if it does, we
// abort the entire process
if err = checkBlockExist(rec.Block.BlockName, hash); err != nil {
return err
}

// check if is_file_valid was present in request, if not set it to 1
if !strings.Contains(string(data), "is_file_valid") {
isFileValid = 1
Expand Down Expand Up @@ -666,11 +687,8 @@ func (a *API) InsertBulkBlocksConcurrently() error {
}
// check if give block name exist in DBS, if it does, we
// abort the entire process
bName := rec.Block.BlockName
if rid, err := GetID(tx, "BLOCKS", "block_id", "block_name", bName); err == nil && rid != 0 {
err := errors.New(fmt.Sprintf("Block %s already exists", bName))
msg := "Data already exist in DBS"
return Error(err, BlockAlreadyExists, msg, "dbs.bulkblocks.InsertBulkBlocksConcurrently")
if err = checkBlockExist(rec.Block.BlockName, hash); err != nil {
return err
}

// get blockID
Expand Down

0 comments on commit 4b8fa39

Please sign in to comment.