Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

HP-2536 Add source type to patient change audit events #762

Merged
merged 1 commit into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/tests/patient/change_events/change_events.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const patient1Resource = require('./fixtures/patient/patient1.json');
const person1withlinkResource = require('./fixtures/person/person1_withlink.json');
const person1nolinkResource = require('./fixtures/person/person1_nolink.json');
const observation1Resource = require('./fixtures/observation/observation1.json');
const observation2Resource = require('./fixtures/observation/observation2.json');

// expected
const {
Expand Down Expand Up @@ -253,5 +254,51 @@ describe('Patient Change Event Tests', () => {
expect(messageValue2.action).toBe('C');
expect(messageValue2.agent[0].who.reference).toBe('Observation/2354-InAgeCohort');
});
test('creating a new observation includes sourceType, if any', async () => {
const request = await createTestRequest();
/**
* @type {PostRequestProcessor}
*/
const postRequestProcessor = getTestContainer().postRequestProcessor;
/**
* @type {MockKafkaClient}
*/
const mockKafkaClient = getTestContainer().kafkaClient;

let resp = await request
.post('/4_0_0/Person/81236/$merge')
.send(person1withlinkResource)
.set(getHeaders());
// noinspection JSUnresolvedFunction
expect(resp).toHaveMergeResponse({created: true});

await postRequestProcessor.waitTillDoneAsync({requestId: getRequestId(resp)});
mockKafkaClient.clear();

resp = await request
.get('/4_0_0/Observation')
.set(getHeaders());
// noinspection JSUnresolvedFunction
expect(resp).toHaveResourceCount(0);

resp = await request
.post('/4_0_0/Observation/0/$merge')
.send(observation2Resource)
.set(getHeaders());
// noinspection JSUnresolvedFunction
expect(resp).toHaveMergeResponse({created: true});

// wait for post request processing to finish
await postRequestProcessor.waitTillDoneAsync({requestId: getRequestId(resp)});
/**
* @type {KafkaClientMessage[]}
*/
const messages = mockKafkaClient.getMessages();
expect(messages.length).toBe(3);
for (let message of messages) {
const messageValue = JSON.parse(message.value);
expect(messageValue.source.type[0].code).toBe('cql-engine');
}
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"resourceType": "Observation",
"id": "2354-InAgeCohort",
"meta": {
"versionId": "2",
"lastUpdated": "2021-12-15T15:30:30+00:00",
"source": "/patients",
"security": [
{
"system": "https://www.icanbwell.com/owner",
"code": "A"
},
{
"system": "https://www.icanbwell.com/access",
"code": "A"
},
{
"system": "https://www.icanbwell.com/vendor",
"code": "B"
},
{
"system": "https://www.icanbwell.com/access",
"code": "B"
}
]
},
"status": "final",
"code": {
"coding": [
{
"system": "http://www.icanbwell.com/cql/library",
"code": "BMI001"
},
{
"system": "http://www.icanbwell.com/cql/libraryVersion",
"code": "1.0.0"
},
{
"system": "http://www.icanbwell.com/cql/rule",
"code": "InAgeCohort"
}
]
},
"extension": [
{
"url": "https://www.icanbwell.com/sourceType",
"valueString": "cql-engine"
}
],
"subject": {
"reference": "Patient/2354"
},
"effectivePeriod": {
"start": "2021-01-01T00:00:00.000Z",
"end": "2021-12-31T00:00:00.000Z"
},
"issued": "2021-01-01T12:00:00Z",
"valueBoolean": false
}
69 changes: 46 additions & 23 deletions src/utils/changeEventProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class ChangeEventProducer {
* @param {boolean} isCreate
* @param {string} resourceType
* @param {string} eventName
* @param {string} sourceType
* @return {AuditEvent}
* @private
*/
Expand All @@ -122,11 +123,12 @@ class ChangeEventProducer {
timestamp,
isCreate,
resourceType,
eventName
eventName,
sourceType
}
) {
const currentDate = moment.utc().format('YYYY-MM-DD');
return new AuditEvent(
let auditEvent = new AuditEvent(
{
'id': generateUUID(),
'action': isCreate ? 'C' : 'U',
Expand Down Expand Up @@ -168,16 +170,21 @@ class ChangeEventProducer {
)
})
});
if (sourceType) {
auditEvent.source.type = new Coding({system: 'https://www.icanbwell.com/sourceType', code: sourceType });
}
return auditEvent;
}

/**
* Fire event for patient create
* @param {string} requestId
* @param {string} patientId
* @param {string} timestamp
* @param {string} sourceType
* @return {Promise<void>}
*/
async onPatientCreateAsync({requestId, patientId, timestamp}) {
async onPatientCreateAsync({requestId, patientId, timestamp, sourceType}) {
const isCreate = true;

const resourceType = 'Patient';
Expand All @@ -187,7 +194,8 @@ class ChangeEventProducer {
timestamp,
isCreate,
resourceType: resourceType,
eventName: 'Patient Create'
eventName: 'Patient Create',
sourceType
});
const key = `${patientId}`;
this.getPatientMessageMap({requestId}).set(key, messageJson);
Expand All @@ -198,16 +206,18 @@ class ChangeEventProducer {
* @param {string} requestId
* @param {string} patientId
* @param {string} timestamp
* @param {string} sourceType
* @return {Promise<void>}
*/
async onPatientChangeAsync({requestId, patientId, timestamp}) {
async onPatientChangeAsync({requestId, patientId, timestamp, sourceType}) {
const isCreate = false;

const resourceType = 'Patient';
const messageJson = this._createMessage({
requestId, id: patientId, timestamp, isCreate,
resourceType: resourceType,
eventName: 'Patient Change'
eventName: 'Patient Change',
sourceType
});

const key = `${patientId}`;
Expand Down Expand Up @@ -335,9 +345,10 @@ class ChangeEventProducer {
* @param {string} id
* @param {string} resourceType
* @param {string} timestamp
* @param {string} sourceType
* @return {Promise<void>}
*/
async onObservationCreateAsync({requestId, id, resourceType, timestamp}) {
async onObservationCreateAsync({requestId, id, resourceType, timestamp, sourceType}) {
const isCreate = true;

const messageJson = this._createMessage({
Expand All @@ -346,7 +357,8 @@ class ChangeEventProducer {
timestamp,
isCreate,
resourceType: resourceType,
eventName: 'Observation Create'
eventName: 'Observation Create',
sourceType
});
const key = `${id}`;
this.getObservationMessageMap({requestId}).set(key, messageJson);
Expand All @@ -358,9 +370,10 @@ class ChangeEventProducer {
* @param {string} id
* @param {string} resourceType
* @param {string} timestamp
* @param {string} sourceType
* @return {Promise<void>}
*/
async onObservationChangeAsync({requestId, id, resourceType, timestamp}) {
async onObservationChangeAsync({requestId, id, resourceType, timestamp, sourceType}) {
const isCreate = false;

const messageJson = this._createMessage({
Expand All @@ -369,7 +382,8 @@ class ChangeEventProducer {
timestamp,
isCreate,
resourceType: resourceType,
eventName: 'Observation Change'
eventName: 'Observation Change',
sourceType
});

const key = `${id}`;
Expand All @@ -387,9 +401,10 @@ class ChangeEventProducer {
* @param {string} id
* @param {string} resourceType
* @param {string} timestamp
* @param {string} sourceType
* @return {Promise<void>}
*/
async onObservationCompleteAsync({requestId, id, resourceType, timestamp}) {
async onObservationCompleteAsync({requestId, id, resourceType, timestamp, sourceType}) {
const isCreate = false;

const messageJson = this._createMessage({
Expand All @@ -398,7 +413,8 @@ class ChangeEventProducer {
timestamp,
isCreate,
resourceType: resourceType,
eventName: 'Observation Complete'
eventName: 'Observation Complete',
sourceType
});

const key = `${id}`;
Expand All @@ -416,9 +432,10 @@ class ChangeEventProducer {
* @param {string} id
* @param {string} resourceType
* @param {string} timestamp
* @param {string} sourceType
* @return {Promise<void>}
*/
async onObservationCanceledAsync({requestId, id, resourceType, timestamp}) {
async onObservationCanceledAsync({requestId, id, resourceType, timestamp, sourceType}) {
const isCreate = false;

const messageJson = this._createMessage({
Expand All @@ -427,7 +444,8 @@ class ChangeEventProducer {
timestamp,
isCreate,
resourceType: resourceType,
eventName: 'Observation Canceled'
eventName: 'Observation Canceled',
sourceType
});

const key = `${id}`;
Expand All @@ -453,6 +471,11 @@ class ChangeEventProducer {
*/
const currentDate = moment.utc().format('YYYY-MM-DD');

let sourceType;
if (doc.extension && doc.extension.some(x => x.url === 'https://www.icanbwell.com/sourceType')) {
sourceType = doc.extension.find(x => x.url === 'https://www.icanbwell.com/sourceType').valueString;
}

/**
* @type {string|null}
*/
Expand All @@ -461,19 +484,19 @@ class ChangeEventProducer {
if (eventType === 'C' && resourceType === 'Patient') {
await this.onPatientCreateAsync(
{
requestId, patientId, timestamp: currentDate
requestId, patientId, timestamp: currentDate, sourceType
});
} else {
await this.onPatientChangeAsync({
requestId, patientId, timestamp: currentDate
requestId, patientId, timestamp: currentDate, sourceType
}
);

let personId = await this.bwellPersonFinder.getBwellPersonIdAsync({patientId: patientId});
if (personId) {
const proxyPatientId = `person.${personId}`;
await this.onPatientChangeAsync({
requestId, patientId: proxyPatientId, timestamp: currentDate
requestId, patientId: proxyPatientId, timestamp: currentDate, sourceType
}
);
}
Expand All @@ -483,12 +506,12 @@ class ChangeEventProducer {
const proxyPatientId = `person.${doc.id}`;
if (eventType === 'C') {
await this.onPatientCreateAsync({
requestId, patientId: proxyPatientId, timestamp: currentDate
requestId, patientId: proxyPatientId, timestamp: currentDate, sourceType
}
);
} else {
await this.onPatientChangeAsync({
requestId, patientId: proxyPatientId, timestamp: currentDate
requestId, patientId: proxyPatientId, timestamp: currentDate, sourceType
}
);
}
Expand All @@ -515,19 +538,19 @@ class ChangeEventProducer {
if (resourceType === 'Observation') {
if (eventType === 'C') {
await this.onObservationCreateAsync({
requestId, id: doc.id, resourceType, timestamp: currentDate
requestId, id: doc.id, resourceType, timestamp: currentDate, sourceType
});
} else if (doc.status === 'final') {
await this.onObservationCompleteAsync({
requestId, id: doc.id, resourceType, timestamp: currentDate
requestId, id: doc.id, resourceType, timestamp: currentDate, sourceType
});
} else if (doc.status === 'cancelled') {
await this.onObservationCanceledAsync({
requestId, id: doc.id, resourceType, timestamp: currentDate
requestId, id: doc.id, resourceType, timestamp: currentDate, sourceType
});
} else {
await this.onObservationChangeAsync({
requestId, id: doc.id, resourceType, timestamp: currentDate
requestId, id: doc.id, resourceType, timestamp: currentDate, sourceType
});
}
}
Expand Down