New import (#153)
* Initial import workflow complete

* New  workflow

* Remove bulk-data-utilities as dependency

* Error handling reworked following the most recent DEQM IG

* Add headers check for

* Add better error messages

* Conditional requestIdentity, as written

* Additional error handling for ndjson worker

* Update src/server/ndjsonWorker.js

Co-authored-by: hossenlopp <[email protected]>

* Remove 'new bulk import' phrasing

* Readme updates

---------

Co-authored-by: hossenlopp <[email protected]>
elsaperelli and hossenlopp authored Jun 28, 2024
1 parent dc89180 commit 1e0a3e1
Showing 27 changed files with 412 additions and 712 deletions.
17 changes: 3 additions & 14 deletions README.md
@@ -217,22 +217,11 @@ Check out the [Patient-everything operation spec](https://www.hl7.org/fhir/opera

### Bulk Data Access

The server contains functionality for the FHIR Bulk Data Import operation using the [Ping and Pull Approach](https://github.com/smart-on-fhir/bulk-import/blob/master/import-pnp.md).
The server contains functionality for the bulk $import operation as defined by the [Data Exchange for Quality Measures Implementation Guide](https://build.fhir.org/ig/HL7/davinci-deqm/branches/bulk-import-draft/bulk-import.html#data-exchange-using-bulk-import).

To implement a bulk data import operation of all the resources on a FHIR Bulk Data Export server, POST a valid FHIR parameters object to `http://localhost:3000/$import`. Use the parameter format below to specify a bulk export server.
The first step in the bulk $import operation workflow is to gather data for submission and organize that data into a set of inputs that this server will retrieve. This is outlined in detail in the [DEQM IG Data Exchange Using Bulk $import Section](https://build.fhir.org/ig/HL7/davinci-deqm/branches/bulk-import-draft/bulk-import.html#data-exchange-using-bulk-import), and _by type_ inputs can be gathered using the [bulk-export-server](https://github.com/projecttacoma/bulk-export-server) $export operation. _by subject_ and hybrid _by type_ and _by subject_ inputs are not yet implemented in this server.

To implement the bulk data import operation from the data requirements for a specific measure, first POST a valid transaction bundle. Then, POST a valid FHIR parameters object to `http://localhost:3000/4_0_1/Measure/$bulk-submit-data` or `http://localhost:3000/4_0_1/Measure/<your-measure-id>/$bulk-submit-data` with the `"prefer": "respond-async"` header populated. This will kick off the "ping and pull" bulk import.

For the bulk data import operation to be successful, the user must specify an export URL to a FHIR Bulk Data Export server in the request body of the FHIR parameters object. For example, in the `parameter` array of the FHIR parameters object, the user can include

```bash
{
"name": "exportUrl",
"valueString": "https://example-export.com"
}
```

with a valid kickoff endpoint URL for the `valueString`.
To kick off a bulk data import operation, POST a valid [Import Manifest](https://build.fhir.org/ig/HL7/davinci-deqm/branches/bulk-import-draft/StructureDefinition-ImportManifest.html) object to `http://localhost:3000/$import`.

The user can check the status of an $import or $bulk-submit-data request by copying the content-location header in the response, and sending a GET request to `http://localhost:3000/<content-location-header>`.
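
Below is a minimal end-to-end sketch of the kickoff and status check, assuming a Node 18+ environment with the global `fetch` API. The manifest field names (`inputFormat`, `inputSource`, `input[].type`/`url`) and the example ndjson URLs are illustrative placeholders based on the common bulk-import manifest pattern, not the normative ImportManifest shape; consult the StructureDefinition linked above for the authoritative structure.

```javascript
// Sketch: kick off $import with a manifest, then poll the bulk status endpoint.
// Manifest fields and URLs below are illustrative assumptions, not normative.
const baseUrl = 'http://localhost:3000';

const manifest = {
  inputFormat: 'application/fhir+ndjson',
  inputSource: 'https://example-export.com',
  input: [
    { type: 'Patient', url: 'https://example-export.com/output/Patient.ndjson' },
    { type: 'Observation', url: 'https://example-export.com/output/Observation.ndjson' }
  ]
};

const runImport = async () => {
  const kickoff = await fetch(`${baseUrl}/$import`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(manifest)
  });

  // Per the README, GET http://localhost:3000/<content-location-header> to check status
  const statusUrl = `${baseUrl}/${kickoff.headers.get('content-location')}`;
  const statusResponse = await fetch(statusUrl);
  console.log(statusResponse.status === 202 ? 'import still in progress' : await statusResponse.text());
};

runImport();
```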

97 changes: 0 additions & 97 deletions package-lock.json

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions package.json
@@ -29,7 +29,6 @@
"@projecttacoma/node-fhir-server-core": "^2.2.7",
"axios": "^0.28.0",
"bee-queue": "^1.4.0",
"bulk-data-utilities": "git+https://[email protected]/projecttacoma/bulk-data-utilities",
"cors": "^2.8.5",
"cql-exec-fhir-mongo": "git+https://[email protected]/projecttacoma/cql-exec-fhir-mongo",
"dotenv": "^10.0.0",
@@ -54,4 +53,4 @@
"./test/globalSetup.js"
]
}
}
}
33 changes: 30 additions & 3 deletions src/database/dbOperations.js
@@ -131,7 +131,7 @@ const findResourcesWithAggregation = async (query, resourceType) => {
* which can be queried to get updates on the status of the bulk import
* @returns {string} the id of the inserted client
*/
const addPendingBulkImportRequest = async () => {
const addPendingBulkImportRequest = async body => {
const collection = db.collection('bulkImportStatuses');
const clientId = uuidv4();
const bulkImportClient = {
@@ -146,7 +146,8 @@ const addPendingBulkImportRequest = async () => {
totalFileCount: -1,
exportedResourceCount: -1,
totalResourceCount: -1,
failedOutcomes: []
failedOutcomes: [],
importManifest: body
};
logger.debug(`Adding a bulkImportStatus for clientId: ${clientId}`);
await collection.insertOne(bulkImportClient);
@@ -195,6 +196,30 @@ const pushBulkFailedOutcomes = async (clientId, failedOutcomes) => {
await collection.findOneAndUpdate({ id: clientId }, { $push: { failedOutcomes: { $each: failedOutcomes } } });
};

/**
 * Pushes an array of error messages to an ndjson status entry to later be converted to
 * OperationOutcomes and made accessible to the requestor via an ndjson file
* @param {String} clientId The id associated with the bulkImport request
* @param {String} fileUrl The url for the resource ndjson
* @param {Array} failedOutcomes An array of strings with messages detailing why the resource failed import
*/
const pushNdjsonFailedOutcomes = async (clientId, fileUrl, failedOutcomes) => {
const collection = db.collection('ndjsonStatuses');
await collection.insertOne({ id: clientId + fileUrl, failedOutcomes: failedOutcomes });
return clientId;
};

/**
* Wrapper for the findResourceById function that only searches ndjsonStatuses db
* @param {string} clientId The id signifying the bulk status request
* @param {string} fileUrl The ndjson fileUrl
* @returns {Object} The ndjson status entry for the passed in clientId and fileUrl
*/
const getNdjsonFileStatus = async (clientId, fileUrl) => {
const status = await findResourceById(clientId + fileUrl, 'ndjsonStatuses');
return status;
};

/**
* Wrapper for the findResourceById function that only searches bulkImportStatuses db
* @param {string} clientId The id signifying the bulk status request
@@ -318,5 +343,7 @@ module.exports = {
removeResource,
updateResource,
updateSuccessfulImportCount,
getCountOfCollection
getCountOfCollection,
pushNdjsonFailedOutcomes,
getNdjsonFileStatus
};
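
As a hedged illustration of how the new ndjson status helpers exported above might be used (this sketch is not part of the commit): the require path and the clientId/fileUrl values are assumed for the example.

```javascript
// Illustrative usage of the new ndjson status helpers; the path and ids are placeholders.
const { pushNdjsonFailedOutcomes, getNdjsonFileStatus } = require('./dbOperations');

const recordFileFailures = async (clientId, fileUrl, errors) => {
  // Persist one status entry keyed by clientId + fileUrl
  await pushNdjsonFailedOutcomes(clientId, fileUrl, errors);
};

const reportFileFailures = async (clientId, fileUrl) => {
  const status = await getNdjsonFileStatus(clientId, fileUrl);
  // Each message can later be wrapped in an OperationOutcome returned to the requestor
  return status ? status.failedOutcomes : [];
};
```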
49 changes: 8 additions & 41 deletions src/server/importWorker.js
@@ -1,7 +1,6 @@
// Sets up queue which processes the jobs pushed to Redis
// This queue is run in a child process when the server is started
const Queue = require('bee-queue');
const { BulkImportWrappers } = require('bulk-data-utilities');
const { failBulkImportRequest, initializeBulkFileCount } = require('../database/dbOperations');
const mongoUtil = require('../database/connection');
const ndjsonQueue = require('../queue/ndjsonProcessQueue');
@@ -16,16 +15,14 @@ const importQueue = new Queue('import', {
removeOnSuccess: true
});

// This handler pulls down the jobs on Redis to handle
importQueue.process(async job => {
// Payload of createJob exists on job.data
const { clientEntry, exportURL, exportType, measureBundle, useTypeFilters } = job.data;
const { clientEntry, inputUrls } = job.data;
logger.info(`import-worker-${process.pid}: Processing Request: ${clientEntry}`);

await mongoUtil.client.connect();
// Call the existing export ndjson function that writes the files
logger.info(`import-worker-${process.pid}: Kicking off export request: ${exportURL}`);
const result = await executePingAndPull(clientEntry, exportURL, exportType, measureBundle, useTypeFilters);
// Call function to get the ndjson files
const result = await executeImportWorkflow(clientEntry, inputUrls);
if (result) {
logger.info(`import-worker-${process.pid}: Enqueued jobs for: ${clientEntry}`);
} else {
@@ -34,50 +31,20 @@ importQueue.process(async job => {
await mongoUtil.client.close();
});

/**
* Calls the bulk-data-utilities wrapper function to get data requirements for the passed in measure, convert those to
* export requests from a bulk export server, then retrieve ndjson from that server and parse it into valid transaction bundles.
* Finally, uploads the resulting transaction bundles to the server and updates the bulkstatus endpoint
* @param {string} clientEntryId The unique identifier which corresponds to the bulkstatus content location for update
* @param {string} exportUrl The url of the bulk export fhir server
* @param {string} exportType The code of the exportType
* @param {Object} measureBundle The measure bundle for which to retrieve data requirements
* @param {boolean} useTypeFilters optional boolean for whether to use type filters for bulk submit data
*/
const executePingAndPull = async (clientEntryId, exportUrl, exportType, measureBundle, useTypeFilters) => {
const executeImportWorkflow = async (clientEntryId, inputUrls) => {
try {
// Default to not use typeFilters for measure specific import
const output = await BulkImportWrappers.executeBulkImport(
exportUrl,
exportType,
measureBundle,
useTypeFilters || false
);

if (output.length === 0) {
throw new Error('Export server failed to export any resources');
}
// Calculate number of resources to export, if available. Otherwise, set to -1.
const resourceCount = output.reduce((resources, fileInfo) => {
if (resources === -1 || fileInfo.count === undefined) {
return -1;
}
return resources + fileInfo.count;
}, 0);

await initializeBulkFileCount(clientEntryId, output.length, resourceCount);
await initializeBulkFileCount(clientEntryId, inputUrls.length, -1);

// Enqueue a parsing job for each ndjson file
await ndjsonQueue.saveAll(
output.map(locationInfo =>
inputUrls.map(url =>
ndjsonQueue.createJob({
fileUrl: locationInfo.url,
fileUrl: url.url,
clientId: clientEntryId,
resourceCount: resourceCount === -1 ? -1 : locationInfo.count
resourceCount: -1
})
)
);

return true;
} catch (e) {
await failBulkImportRequest(clientEntryId, e);
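
For context, here is a hedged sketch of the payload this worker now expects on the import queue. The enqueue side is not shown in this diff, so the queue construction and values below are assumptions; only the `{ clientEntry, inputUrls }` shape and the `url.url` access come from the worker above.

```javascript
// Illustrative enqueue from the $import route (not part of this diff).
const Queue = require('bee-queue');

// Assumed to match the 'import' queue the worker listens on; redis settings omitted.
const importQueue = new Queue('import', { isWorker: false });

const enqueueImport = async (clientEntry, inputUrls) => {
  // inputUrls: [{ type, url }] entries taken from the POSTed import manifest
  await importQueue.createJob({ clientEntry, inputUrls }).save();
};

// e.g. enqueueImport('client-id-from-addPendingBulkImportRequest', [
//   { type: 'Patient', url: 'https://example.com/Patient.ndjson' }
// ]);
```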
