Skip to content

Commit

Permalink
feat(#138): move polling to openhim channel config (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
witash authored Oct 25, 2024
1 parent ef558a2 commit d8cb295
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 44 deletions.
16 changes: 10 additions & 6 deletions mediator/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as dotenv from 'dotenv';
dotenv.config();

export const PORT = process.env.PORT || 6000;
const REQUEST_TIMEOUT = Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000'));

export const OPENHIM = {
username: getEnvironmentVariable('OPENHIM_USERNAME', '[email protected]'),
Expand All @@ -11,27 +12,30 @@ export const OPENHIM = {
};

export const FHIR = {
url: getEnvironmentVariable('FHIR_URL', 'http://openhim-core:5001/fhir'),
url: getEnvironmentVariable('FHIR_URL', 'https://openhim-core:5001/fhir'),
username: getEnvironmentVariable('FHIR_USERNAME', 'interop-client'),
password: getEnvironmentVariable('FHIR_PASSWORD', 'interop-password'),
timeout: Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000'))
timeout: REQUEST_TIMEOUT
};

export const CHT = {
url: getEnvironmentVariable('CHT_URL', 'https://nginx'),
username: getEnvironmentVariable('CHT_USERNAME', 'admin'),
password: getEnvironmentVariable('CHT_PASSWORD', 'password'),
timeout: Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000'))
timeout: REQUEST_TIMEOUT
};

export const OPENMRS = {
url: getEnvironmentVariable('OPENMRS_CHANNEL_URL', 'http://openhim-core:5001/openmrs'),
url: getEnvironmentVariable('OPENMRS_CHANNEL_URL', 'https://openhim-core:5001/openmrs'),
username: getEnvironmentVariable('OPENMRS_CHANNEL_USERNAME', 'interop-client'),
password: getEnvironmentVariable('OPENMRS_CHANNEL_PASSWORD', 'interop-password'),
timeout: Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000'))
timeout: REQUEST_TIMEOUT
};

export const SYNC_INTERVAL = getEnvironmentVariable('SYNC_INTERVAL', '60000');
// how often in seconds the sync should run. hardcoded to 1 minute
export const SYNC_INTERVAL = '60';
// how far back should the sync look for new resources. Defaults to one hour
export const SYNC_PERIOD = getEnvironmentVariable('SYNC_PERIOD', '3600');

function getEnvironmentVariable(env: string, def: string) {
if (process.env.NODE_ENV === 'test') {
Expand Down
21 changes: 20 additions & 1 deletion mediator/config/openmrs_mediator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,30 @@ export const openMRSMediatorConfig = {
version: '1.0.0',
name: 'OpenMRS Mediator',
description: 'A mediator to sync CHT data with OpenMRS',
defaultChannelConfig: [
{
name: 'OpenMRS Sync',
urlPattern: '^/trigger$',
routes: [
{
name: 'OpenMRS polling Mediator',
host: 'mediator',
path: '/openmrs/sync',
port: 6000,
primary: true,
type: 'http',
},
],
allow: ['interop'],
type: 'polling',
pollingSchedule: '1 minute'
},
],
endpoints: [
{
name: 'OpenMRS Mediator',
host: 'mediator',
path: '/',
path: '/openmrs/sync',
port: '6000',
primary: true,
type: 'http',
Expand Down
23 changes: 7 additions & 16 deletions mediator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ import { mediatorConfig } from './config/mediator';
import { openMRSMediatorConfig } from './config/openmrs_mediator';
import { logger } from './logger';
import bodyParser from 'body-parser';
import {PORT, OPENHIM, SYNC_INTERVAL, OPENMRS} from './config';
import {PORT, OPENHIM, OPENMRS} from './config';
import patientRoutes from './src/routes/patient';
import serviceRequestRoutes from './src/routes/service-request';
import encounterRoutes from './src/routes/encounter';
import organizationRoutes from './src/routes/organization';
import endpointRoutes from './src/routes/endpoint';
import chtRoutes from './src/routes/cht';
import openMRSRoutes from './src/routes/openmrs';
import { registerMediatorCallback } from './src/utils/openhim';
import { syncPatients, syncEncounters } from './src/utils/openmrs_sync'
import os from 'os';

const {registerMediator} = require('openhim-mediator-utils');
Expand All @@ -21,7 +21,7 @@ const app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({extended: true}));

app.get('*', (_: Request, res: Response) => {
app.get('/', (_: Request, res: Response) => {
const osUptime = os.uptime();
const processUptime = process.uptime();
res.send({status: 'success', osuptime: osUptime, processuptime: processUptime});
Expand All @@ -34,30 +34,21 @@ app.use('/encounter', encounterRoutes);
app.use('/organization', organizationRoutes);
app.use('/endpoint', endpointRoutes);

// routes for cht docs
// routes for CHT docs
app.use('/cht', chtRoutes);

// routes for OpenMRS
app.use('/openmrs', openMRSRoutes);

if (process.env.NODE_ENV !== 'test') {
app.listen(PORT, () => logger.info(`Server listening on port ${PORT}`));

// TODO => inject the 'port' and 'http scheme' into 'mediatorConfig'
registerMediator(OPENHIM, mediatorConfig, registerMediatorCallback);

// if OPENMRS is specified, register its mediator
// and start the sync background task
if (OPENMRS.url) {
registerMediator(OPENHIM, openMRSMediatorConfig, registerMediatorCallback);
// start patient and ecnounter sync in the background
setInterval(async () => {
try {
const startTime = new Date();
startTime.setHours(startTime.getHours() - 1);
await syncPatients(startTime);
await syncEncounters(startTime);
} catch (error: any) {
logger.error(error);
}
}, Number(SYNC_INTERVAL));
}
}

Expand Down
18 changes: 18 additions & 0 deletions mediator/src/controllers/openmrs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { logger } from '../../logger';
import { syncPatients, syncEncounters } from '../utils/openmrs_sync'
import { SYNC_PERIOD } from '../../config'

export async function sync() {
try {
let now = Date.now();
let syncPeriod = parseInt(SYNC_PERIOD, 10);
let startTime = new Date(now - syncPeriod);

await syncPatients(startTime);
await syncEncounters(startTime);
return { status: 200, data: { message: `OpenMRS sync completed successfully`} };
} catch(error: any) {
logger.error(error);
return { status: 500, data: { message: `Error during OpenMRS Sync`} };
}
}
12 changes: 12 additions & 0 deletions mediator/src/routes/openmrs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Router } from 'express';
import { requestHandler } from '../utils/request';
import { sync } from '../controllers/openmrs'

const router = Router();

router.get(
'/sync',
requestHandler((req) => sync())
);

export default router;
37 changes: 37 additions & 0 deletions mediator/src/routes/tests/openmrs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import request from 'supertest';
import app from '../../..';
import * as openmrs_sync from '../../utils/openmrs_sync';
import axios from 'axios';
import { logger } from '../../../logger';

jest.mock('axios');
jest.mock('../../../logger');

describe('GET /openmrs/sync', () => {
it('calls syncPatients and syncEncouners', async () => {
jest.spyOn(openmrs_sync, 'syncPatients').mockImplementation(async (startTime) => {
});

jest.spyOn(openmrs_sync, 'syncEncounters').mockImplementation(async (startTime) => {
});

const res = await request(app).get('/openmrs/sync').send();

expect(res.status).toBe(200);

expect(openmrs_sync.syncPatients).toHaveBeenCalled();
expect(openmrs_sync.syncEncounters).toHaveBeenCalled();
});

it('returns 500 if syncPatients throws an error', async () => {
jest.spyOn(openmrs_sync, 'syncPatients').mockImplementation(async (startTime) => {
throw new Error('Sync Failed');
});

const res = await request(app).get('/openmrs/sync').send();

expect(res.status).toBe(500);

expect(openmrs_sync.syncPatients).toHaveBeenCalled();
});
});
35 changes: 19 additions & 16 deletions mediator/src/utils/openmrs_sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,31 +83,34 @@ export async function compare(
// get the key for each resource and create a Map
const fhirIds = new Map(comparison.fhirResources.map(resource => [getKey(resource), resource]));

function isValidDate(resource: fhir4.Resource) {
// if lastUpdated is missing or invalid, cannot proceed, throw an error
if (!resource.meta?.lastUpdated) {
throw new Error("Last updated missing");
}
const lastUpdated = new Date(resource.meta.lastUpdated);
if (isNaN(lastUpdated.getTime()) || isNaN(startTime.getTime())) {
throw new Error("Invalid date format");
}

// don't sync resources created with 2 * SYNC_INTERVAL of start time
const syncWindow = (Number(SYNC_INTERVAL) * 1000) * 2
const diff = lastUpdated.getTime() - startTime.getTime();
return diff > syncWindow;
}

comparison.openMRSResources.forEach((openMRSResource) => {
const key = getKey(openMRSResource);
if (fhirIds.has(key)) {
// ok so the fhir server already has it
results.toupdate.push(openMRSResource);
fhirIds.delete(key);
} else {
const lastUpdated = new Date(openMRSResource.meta?.lastUpdated!);
if (isNaN(lastUpdated.getTime()) || isNaN(startTime.getTime())) {
throw new Error("Invalid date format");
}
const diff = lastUpdated.getTime() - startTime.getTime();
if (diff > (Number(SYNC_INTERVAL) * 2)){
results.incoming.push(openMRSResource);
}
} else if (isValidDate(openMRSResource)){
results.incoming.push(openMRSResource);
}
});

fhirIds.forEach((resource, key) => {
const lastUpdated = new Date(resource.meta?.lastUpdated || '');
if (isNaN(lastUpdated.getTime()) || isNaN(startTime.getTime())) {
throw new Error("Invalid date format");
}
const diff = lastUpdated.getTime() - startTime.getTime();
if (diff > (Number(SYNC_INTERVAL) * 2)){
if (isValidDate(resource)) {
results.outgoing.push(resource);
}
});
Expand Down
9 changes: 4 additions & 5 deletions mediator/test/workflows.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,18 +166,17 @@ describe('Workflows', () => {
.auth(FHIR.username, FHIR.password);

expect(checkMediatorResponse.status).toBe(200);
expect(checkMediatorResponse.body.status).toBe('success');

//Create a patient using openMRS api
//TODO: Create a patient using openMRS api

/*const retrieveFhirPatientIdResponse = await request(FHIR.url)
const retrieveFhirPatientIdResponse = await request(FHIR.url)
.get('/fhir/Patient/?identifier=' + patientId)
.auth(FHIR.username, FHIR.password);

expect(retrieveFhirPatientIdResponse.status).toBe(200);
expect(retrieveFhirPatientIdResponse.body.total).toBe(1);*/
expect(retrieveFhirPatientIdResponse.body.total).toBe(1);

//retrieve and validate patient from CHT api
//TODO: retrieve and validate patient from CHT api
//trigger openmrs sync
//validate id
});
Expand Down

0 comments on commit d8cb295

Please sign in to comment.