Skip to content

Commit

Permalink
Add batch upload capability to concurrent fhirstore.Uploader and refa…
Browse files Browse the repository at this point in the history
…ctor NewUploader to take a config struct.

PiperOrigin-RevId: 462466349
  • Loading branch information
suyashkumar authored and copybara-github committed Jul 21, 2022
1 parent 6e2d9ae commit 03a40c6
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 25 deletions.
12 changes: 11 additions & 1 deletion cmd/bcda_fetch/bcda_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,17 @@ func rectifyAndWrite(r io.Reader, filePrefix string, cfg mainWrapperConfig) {
defer w.Close()
}

uploader, err := fhirstore.NewUploader(cfg.fhirStoreEndpoint, *fhirStoreGCPProject, *fhirStoreGCPLocation, *fhirStoreGCPDatasetID, *fhirStoreID, *maxFHIRStoreUploadWorkers, fhirStoreUploadErrorCounter, *fhirStoreUploadErrorFileDir)
uploader, err := fhirstore.NewUploader(fhirstore.UploaderConfig{
FHIRStoreEndpoint: cfg.fhirStoreEndpoint,
FHIRStoreID: *fhirStoreID,
FHIRProjectID: *fhirStoreGCPProject,
FHIRLocation: *fhirStoreGCPLocation,
FHIRDatasetID: *fhirStoreGCPDatasetID,
MaxWorkers: *maxFHIRStoreUploadWorkers,
ErrorCounter: fhirStoreUploadErrorCounter,
ErrorFileOutputPath: *fhirStoreUploadErrorFileDir,
})

if err != nil {
log.Exitf("unable to init uploader: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion fhirstore/fhirstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func TestUploadBatch(t *testing.T) {
t.Run("ValidResponse", func(t *testing.T) {
// FHIRStoreServerBatch will check that the uploaded bundle matches resources
// in inputJSONs.
serverURL := testhelpers.FHIRStoreServerBatch(t, inputJSONs, projectID, location, datasetID, fhirStoreID)
expectedFullBatchSize := 2
serverURL := testhelpers.FHIRStoreServerBatch(t, inputJSONs, expectedFullBatchSize, projectID, location, datasetID, fhirStoreID)

c, err := fhirstore.NewClient(context.Background(), serverURL)
if err != nil {
Expand Down
113 changes: 99 additions & 14 deletions fhirstore/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/google/medical_claims_tools/internal/counter"
)

// defaultBatchSize is the deafult batch size for FHIR store uploads in batch
// mode.
const defaultBatchSize = 5

// Uploader is a convenience wrapper for concurrent upload to FHIR store.
type Uploader struct {
fhirStoreEndpoint string
Expand All @@ -19,6 +23,11 @@ type Uploader struct {
fhirStoreDatasetID string
fhirStoreID string

// batchUpload indicates if fhirJSONs should be uploaded to FHIR store in
// batches using executeBundle in batch mode.
batchUpload bool
batchSize int

errorCounter *counter.Counter

fhirJSONs chan string
Expand All @@ -31,22 +40,46 @@ type Uploader struct {
errorNDJSONFile *os.File
}

// TODO(b/226586131): consider a config struct based parameter for NewUploader.
// UploaderConfig is a config struct used when creating a NewUploader.
type UploaderConfig struct {
FHIRStoreEndpoint string
FHIRStoreID string
FHIRProjectID string
FHIRLocation string
FHIRDatasetID string

BatchUpload bool
BatchSize int

MaxWorkers int

ErrorCounter *counter.Counter
ErrorFileOutputPath string
}

// NewUploader initializes and returns an Uploader.
func NewUploader(fhirStoreEndpoint, projectID, location, datasetID, fhirStoreID string, maxWorkers int, errorCounter *counter.Counter, errorFileOutputPath string) (*Uploader, error) {
func NewUploader(config UploaderConfig) (*Uploader, error) {

batchSize := defaultBatchSize
if config.BatchSize != 0 {
batchSize = config.BatchSize
}

u := &Uploader{
fhirStoreEndpoint: fhirStoreEndpoint,
fhirStoreProjectID: projectID,
fhirStoreLocation: location,
fhirStoreDatasetID: datasetID,
fhirStoreID: fhirStoreID,
errorCounter: errorCounter,
maxWorkers: maxWorkers,
errorFileOutputPath: errorFileOutputPath}

if errorFileOutputPath != "" {
f, err := os.OpenFile(path.Join(errorFileOutputPath, "resourcesWithErrors.ndjson"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
fhirStoreEndpoint: config.FHIRStoreEndpoint,
fhirStoreProjectID: config.FHIRProjectID,
fhirStoreLocation: config.FHIRLocation,
fhirStoreDatasetID: config.FHIRDatasetID,
fhirStoreID: config.FHIRStoreID,
errorCounter: config.ErrorCounter,
maxWorkers: config.MaxWorkers,
errorFileOutputPath: config.ErrorFileOutputPath,
batchUpload: config.BatchUpload,
batchSize: batchSize,
}

if config.ErrorFileOutputPath != "" {
f, err := os.OpenFile(path.Join(config.ErrorFileOutputPath, "resourcesWithErrors.ndjson"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
Expand All @@ -61,7 +94,11 @@ func (u *Uploader) init() {
u.wg = &sync.WaitGroup{}

for i := 0; i < u.maxWorkers; i++ {
go u.uploadWorker()
if u.batchUpload {
go u.uploadBatchWorker()
} else {
go u.uploadWorker()
}
}
}

Expand Down Expand Up @@ -112,6 +149,54 @@ func (u *Uploader) uploadWorker() {
}
}

func (u *Uploader) uploadBatchWorker() {
c, err := NewClient(context.Background(), u.fhirStoreEndpoint)
if err != nil {
log.Fatalf("error initializing FHIR store client: %v", err)
}

fhirBatchBuffer := make([][]byte, u.batchSize)
lastChannelReadOK := true
for lastChannelReadOK {
var fhirJSON string
numBufferItemsPopulated := 0
// Attempt to populate the fhirBatchBuffer. Note that this could populate
// from 0 up to u.batchSize elements before lastChannelReadOK is false.
for i := 0; i < u.batchSize; i++ {
fhirJSON, lastChannelReadOK = <-u.fhirJSONs
if !lastChannelReadOK {
break
}
fhirBatchBuffer[i] = []byte(fhirJSON)
numBufferItemsPopulated++
}

if numBufferItemsPopulated == 0 {
break
}

fhirBatch := fhirBatchBuffer[0:numBufferItemsPopulated]

// Upload batch
if err := c.UploadBatch(fhirBatch, u.fhirStoreProjectID, u.fhirStoreLocation, u.fhirStoreDatasetID, u.fhirStoreID); err != nil {
log.Errorf("error uploading batch: %v", err)
// TODO(b/225916126): in the future, try to unpack the error and only
// write out the resources within the bundle that failed. For now, we
// write out all resources in the bundle to be safe.
for _, errResource := range fhirBatch {
if u.errorCounter != nil {
u.errorCounter.Increment()
}
u.writeError(string(errResource), err)
}
}

for j := 0; j < numBufferItemsPopulated; j++ {
u.wg.Done()
}
}
}

func (u *Uploader) writeError(fhirJSON string, err error) {
if u.errorNDJSONFile != nil {
data, jsonErr := json.Marshal(errorNDJSONLine{Err: err.Error(), FHIRResource: fhirJSON})
Expand Down
Loading

0 comments on commit 03a40c6

Please sign in to comment.