From 4b8fa39bad91d103502523568ffae4dee892fdc3 Mon Sep 17 00:00:00 2001 From: Valentin Kuznetsov Date: Mon, 21 Nov 2022 09:12:19 -0500 Subject: [PATCH] Add helper function checkBlockExist; use it as first part of bulblocks API logic --- dbs/bulkblocks2.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/dbs/bulkblocks2.go b/dbs/bulkblocks2.go index c8f49a8..69ea084 100644 --- a/dbs/bulkblocks2.go +++ b/dbs/bulkblocks2.go @@ -445,6 +445,21 @@ func getDatasetID( return datasetID, nil } +// helper function to check if block exist in DBS database +func checkBlockExist(bName, hash string) error { + tx, err := DB.Begin() + if err != nil { + return Error(err, TransactionErrorCode, hash, "dbs.bulkblocks.checkBlockExist") + } + defer tx.Rollback() + if rid, err := GetID(tx, "BLOCKS", "block_id", "block_name", bName); err == nil && rid != 0 { + err := errors.New(fmt.Sprintf("Block %s already exists", bName)) + msg := "Data already exist in DBS" + return Error(err, BlockAlreadyExists, msg, "dbs.bulkblocks.checkBlockExist") + } + return nil +} + // InsertBulkBlocksConcurrently DBS API provides concurrent bulk blocks // insertion. It inherits the same logic as BulkBlocks API but perform // Files and FileLumis injection concurrently via chunk of record. @@ -507,6 +522,12 @@ func (a *API) InsertBulkBlocksConcurrently() error { var dataTierID, physicsGroupID, processedDatasetID, datasetAccessTypeID int64 creationDate := time.Now().Unix() + // check if give block name exist in DBS, if it does, we + // abort the entire process + if err = checkBlockExist(rec.Block.BlockName, hash); err != nil { + return err + } + // check if is_file_valid was present in request, if not set it to 1 if !strings.Contains(string(data), "is_file_valid") { isFileValid = 1 @@ -666,11 +687,8 @@ func (a *API) InsertBulkBlocksConcurrently() error { } // check if give block name exist in DBS, if it does, we // abort the entire process - bName := rec.Block.BlockName - if rid, err := GetID(tx, "BLOCKS", "block_id", "block_name", bName); err == nil && rid != 0 { - err := errors.New(fmt.Sprintf("Block %s already exists", bName)) - msg := "Data already exist in DBS" - return Error(err, BlockAlreadyExists, msg, "dbs.bulkblocks.InsertBulkBlocksConcurrently") + if err = checkBlockExist(rec.Block.BlockName, hash); err != nil { + return err } // get blockID