diff --git a/mediator/config/index.ts b/mediator/config/index.ts index 892e710c..a3c6d1ea 100644 --- a/mediator/config/index.ts +++ b/mediator/config/index.ts @@ -2,6 +2,7 @@ import * as dotenv from 'dotenv'; dotenv.config(); export const PORT = process.env.PORT || 6000; +const REQUEST_TIMEOUT = Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000')); export const OPENHIM = { username: getEnvironmentVariable('OPENHIM_USERNAME', 'interop@openhim.org'), @@ -11,27 +12,30 @@ export const OPENHIM = { }; export const FHIR = { - url: getEnvironmentVariable('FHIR_URL', 'http://openhim-core:5001/fhir'), + url: getEnvironmentVariable('FHIR_URL', 'https://openhim-core:5001/fhir'), username: getEnvironmentVariable('FHIR_USERNAME', 'interop-client'), password: getEnvironmentVariable('FHIR_PASSWORD', 'interop-password'), - timeout: Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000')) + timeout: REQUEST_TIMEOUT }; export const CHT = { url: getEnvironmentVariable('CHT_URL', 'https://nginx'), username: getEnvironmentVariable('CHT_USERNAME', 'admin'), password: getEnvironmentVariable('CHT_PASSWORD', 'password'), - timeout: Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000')) + timeout: REQUEST_TIMEOUT }; export const OPENMRS = { - url: getEnvironmentVariable('OPENMRS_CHANNEL_URL', 'http://openhim-core:5001/openmrs'), + url: getEnvironmentVariable('OPENMRS_CHANNEL_URL', 'https://openhim-core:5001/openmrs'), username: getEnvironmentVariable('OPENMRS_CHANNEL_USERNAME', 'interop-client'), password: getEnvironmentVariable('OPENMRS_CHANNEL_PASSWORD', 'interop-password'), - timeout: Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000')) + timeout: REQUEST_TIMEOUT }; -export const SYNC_INTERVAL = getEnvironmentVariable('SYNC_INTERVAL', '60000'); +// how often in seconds the sync should run. hardcoded to 1 minute +export const SYNC_INTERVAL = '60'; +// how far back should the sync look for new resources. Defaults to one hour +export const SYNC_PERIOD = getEnvironmentVariable('SYNC_PERIOD', '3600'); function getEnvironmentVariable(env: string, def: string) { if (process.env.NODE_ENV === 'test') { diff --git a/mediator/config/openmrs_mediator.ts b/mediator/config/openmrs_mediator.ts index 968cced7..829878b3 100644 --- a/mediator/config/openmrs_mediator.ts +++ b/mediator/config/openmrs_mediator.ts @@ -3,11 +3,30 @@ export const openMRSMediatorConfig = { version: '1.0.0', name: 'OpenMRS Mediator', description: 'A mediator to sync CHT data with OpenMRS', + defaultChannelConfig: [ + { + name: 'OpenMRS Sync', + urlPattern: '^/trigger$', + routes: [ + { + name: 'OpenMRS polling Mediator', + host: 'mediator', + path: '/openmrs/sync', + port: 6000, + primary: true, + type: 'http', + }, + ], + allow: ['interop'], + type: 'polling', + pollingSchedule: '1 minute' + }, + ], endpoints: [ { name: 'OpenMRS Mediator', host: 'mediator', - path: '/', + path: '/openmrs/sync', port: '6000', primary: true, type: 'http', diff --git a/mediator/index.ts b/mediator/index.ts index 864582fc..4f9eb578 100644 --- a/mediator/index.ts +++ b/mediator/index.ts @@ -3,15 +3,15 @@ import { mediatorConfig } from './config/mediator'; import { openMRSMediatorConfig } from './config/openmrs_mediator'; import { logger } from './logger'; import bodyParser from 'body-parser'; -import {PORT, OPENHIM, SYNC_INTERVAL, OPENMRS} from './config'; +import {PORT, OPENHIM, OPENMRS} from './config'; import patientRoutes from './src/routes/patient'; import serviceRequestRoutes from './src/routes/service-request'; import encounterRoutes from './src/routes/encounter'; import organizationRoutes from './src/routes/organization'; import endpointRoutes from './src/routes/endpoint'; import chtRoutes from './src/routes/cht'; +import openMRSRoutes from './src/routes/openmrs'; import { registerMediatorCallback } from './src/utils/openhim'; -import { syncPatients, syncEncounters } from './src/utils/openmrs_sync' import os from 'os'; const {registerMediator} = require('openhim-mediator-utils'); @@ -21,7 +21,7 @@ const app = express(); app.use(bodyParser.json()); app.use(bodyParser.urlencoded({extended: true})); -app.get('*', (_: Request, res: Response) => { +app.get('/', (_: Request, res: Response) => { const osUptime = os.uptime(); const processUptime = process.uptime(); res.send({status: 'success', osuptime: osUptime, processuptime: processUptime}); @@ -34,9 +34,12 @@ app.use('/encounter', encounterRoutes); app.use('/organization', organizationRoutes); app.use('/endpoint', endpointRoutes); -// routes for cht docs +// routes for CHT docs app.use('/cht', chtRoutes); +// routes for OpenMRS +app.use('/openmrs', openMRSRoutes); + if (process.env.NODE_ENV !== 'test') { app.listen(PORT, () => logger.info(`Server listening on port ${PORT}`)); @@ -44,20 +47,8 @@ if (process.env.NODE_ENV !== 'test') { registerMediator(OPENHIM, mediatorConfig, registerMediatorCallback); // if OPENMRS is specified, register its mediator - // and start the sync background task if (OPENMRS.url) { registerMediator(OPENHIM, openMRSMediatorConfig, registerMediatorCallback); - // start patient and ecnounter sync in the background - setInterval(async () => { - try { - const startTime = new Date(); - startTime.setHours(startTime.getHours() - 1); - await syncPatients(startTime); - await syncEncounters(startTime); - } catch (error: any) { - logger.error(error); - } - }, Number(SYNC_INTERVAL)); } } diff --git a/mediator/src/controllers/openmrs.ts b/mediator/src/controllers/openmrs.ts new file mode 100644 index 00000000..9a05b831 --- /dev/null +++ b/mediator/src/controllers/openmrs.ts @@ -0,0 +1,18 @@ +import { logger } from '../../logger'; +import { syncPatients, syncEncounters } from '../utils/openmrs_sync' +import { SYNC_PERIOD } from '../../config' + +export async function sync() { + try { + let now = Date.now(); + let syncPeriod = parseInt(SYNC_PERIOD, 10); + let startTime = new Date(now - syncPeriod); + + await syncPatients(startTime); + await syncEncounters(startTime); + return { status: 200, data: { message: `OpenMRS sync completed successfully`} }; + } catch(error: any) { + logger.error(error); + return { status: 500, data: { message: `Error during OpenMRS Sync`} }; + } +} diff --git a/mediator/src/routes/openmrs.ts b/mediator/src/routes/openmrs.ts new file mode 100644 index 00000000..ac487654 --- /dev/null +++ b/mediator/src/routes/openmrs.ts @@ -0,0 +1,12 @@ +import { Router } from 'express'; +import { requestHandler } from '../utils/request'; +import { sync } from '../controllers/openmrs' + +const router = Router(); + +router.get( + '/sync', + requestHandler((req) => sync()) +); + +export default router; diff --git a/mediator/src/routes/tests/openmrs.spec.ts b/mediator/src/routes/tests/openmrs.spec.ts new file mode 100644 index 00000000..ee7e8b89 --- /dev/null +++ b/mediator/src/routes/tests/openmrs.spec.ts @@ -0,0 +1,37 @@ +import request from 'supertest'; +import app from '../../..'; +import * as openmrs_sync from '../../utils/openmrs_sync'; +import axios from 'axios'; +import { logger } from '../../../logger'; + +jest.mock('axios'); +jest.mock('../../../logger'); + +describe('GET /openmrs/sync', () => { + it('calls syncPatients and syncEncouners', async () => { + jest.spyOn(openmrs_sync, 'syncPatients').mockImplementation(async (startTime) => { + }); + + jest.spyOn(openmrs_sync, 'syncEncounters').mockImplementation(async (startTime) => { + }); + + const res = await request(app).get('/openmrs/sync').send(); + + expect(res.status).toBe(200); + + expect(openmrs_sync.syncPatients).toHaveBeenCalled(); + expect(openmrs_sync.syncEncounters).toHaveBeenCalled(); + }); + + it('returns 500 if syncPatients throws an error', async () => { + jest.spyOn(openmrs_sync, 'syncPatients').mockImplementation(async (startTime) => { + throw new Error('Sync Failed'); + }); + + const res = await request(app).get('/openmrs/sync').send(); + + expect(res.status).toBe(500); + + expect(openmrs_sync.syncPatients).toHaveBeenCalled(); + }); +}); diff --git a/mediator/src/utils/openmrs_sync.ts b/mediator/src/utils/openmrs_sync.ts index 585b3e03..af8efd6e 100644 --- a/mediator/src/utils/openmrs_sync.ts +++ b/mediator/src/utils/openmrs_sync.ts @@ -83,31 +83,34 @@ export async function compare( // get the key for each resource and create a Map const fhirIds = new Map(comparison.fhirResources.map(resource => [getKey(resource), resource])); + function isValidDate(resource: fhir4.Resource) { + // if lastUpdated is missing or invalid, cannot proceed, throw an error + if (!resource.meta?.lastUpdated) { + throw new Error("Last updated missing"); + } + const lastUpdated = new Date(resource.meta.lastUpdated); + if (isNaN(lastUpdated.getTime()) || isNaN(startTime.getTime())) { + throw new Error("Invalid date format"); + } + + // don't sync resources created with 2 * SYNC_INTERVAL of start time + const syncWindow = (Number(SYNC_INTERVAL) * 1000) * 2 + const diff = lastUpdated.getTime() - startTime.getTime(); + return diff > syncWindow; + } + comparison.openMRSResources.forEach((openMRSResource) => { const key = getKey(openMRSResource); if (fhirIds.has(key)) { - // ok so the fhir server already has it results.toupdate.push(openMRSResource); fhirIds.delete(key); - } else { - const lastUpdated = new Date(openMRSResource.meta?.lastUpdated!); - if (isNaN(lastUpdated.getTime()) || isNaN(startTime.getTime())) { - throw new Error("Invalid date format"); - } - const diff = lastUpdated.getTime() - startTime.getTime(); - if (diff > (Number(SYNC_INTERVAL) * 2)){ - results.incoming.push(openMRSResource); - } + } else if (isValidDate(openMRSResource)){ + results.incoming.push(openMRSResource); } }); fhirIds.forEach((resource, key) => { - const lastUpdated = new Date(resource.meta?.lastUpdated || ''); - if (isNaN(lastUpdated.getTime()) || isNaN(startTime.getTime())) { - throw new Error("Invalid date format"); - } - const diff = lastUpdated.getTime() - startTime.getTime(); - if (diff > (Number(SYNC_INTERVAL) * 2)){ + if (isValidDate(resource)) { results.outgoing.push(resource); } }); diff --git a/mediator/test/workflows.spec.ts b/mediator/test/workflows.spec.ts index c060ab33..c76d1600 100644 --- a/mediator/test/workflows.spec.ts +++ b/mediator/test/workflows.spec.ts @@ -166,18 +166,17 @@ describe('Workflows', () => { .auth(FHIR.username, FHIR.password); expect(checkMediatorResponse.status).toBe(200); - expect(checkMediatorResponse.body.status).toBe('success'); - //Create a patient using openMRS api + //TODO: Create a patient using openMRS api - /*const retrieveFhirPatientIdResponse = await request(FHIR.url) + const retrieveFhirPatientIdResponse = await request(FHIR.url) .get('/fhir/Patient/?identifier=' + patientId) .auth(FHIR.username, FHIR.password); expect(retrieveFhirPatientIdResponse.status).toBe(200); - expect(retrieveFhirPatientIdResponse.body.total).toBe(1);*/ + expect(retrieveFhirPatientIdResponse.body.total).toBe(1); - //retrieve and validate patient from CHT api + //TODO: retrieve and validate patient from CHT api //trigger openmrs sync //validate id });