Skip to content

Commit

Permalink
Automatically include "SQL" with the original query in context
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Apr 11, 2024
1 parent b769576 commit 91d8c71
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 27 deletions.
22 changes: 10 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,18 @@ app.use(
);
```

Make database changes and make sure they're all stored in a table called `changes`
Make database changes and make sure they're all stored in a table called `changes` in the destination DB

```
psql -h us-west-1-prod-destination-pool.ctbxbtz4ojdc.us-west-1.rds.amazonaws.com -p 5432 -U u_9adb30103a55 -d db_9adb30103a55 -c \
'SELECT "primary_key", "table", "operation", "before", "after", "context", "committed_at" FROM changes;'
Password for user u_9adb30103a55:
primary_key | table | operation | before | after | context | committed_at
-------------+-------+-----------+---------------------------------------------------+----------------------------------------------------+---------------------------------------------------------------------------------------------+------------------------
26 | todo | CREATE | {} | {"id": 26, "task": "Sleep", "isCompleted": false} | {"userId": 187234, "endpoint": "/todo", "params": {"task": "Sleep", "isCompleted": false}} | 2023-12-11 17:09:09+00
27 | todo | CREATE | {} | {"id": 27, "task": "Eat", "isCompleted": false} | {"userId": 187234, "endpoint": "/todo", "params": {"task": "Eat", "isCompleted": false}} | 2023-12-11 17:09:11+00
28 | todo | CREATE | {} | {"id": 28, "task": "Repeat", "isCompleted": false} | {"userId": 187234, "endpoint": "/todo", "params": {"task": "Repeat", "isCompleted": false}} | 2023-12-11 17:09:13+00
26 | todo | UPDATE | {"id": 26, "task": "Sleep", "isCompleted": false} | {"id": 26, "task": "Sleep", "isCompleted": true} | {"userId": 187234, "endpoint": "/todo/complete", "params": {"id": 26}} | 2023-12-11 17:09:15+00
27 | todo | DELETE | {"id": 27, "task": "Eat", "isCompleted": false} | {} | {"userId": 187234, "endpoint": "/todo/27", "params": {"id": 27}} | 2023-12-11 17:09:18+00
psql -h [HOSTNAME] -U [USERNAME] -d [DATABASE] -c 'SELECT "primary_key", "table", "operation", "before", "after", "context", "committed_at" FROM changes;'
primary_key | table | operation | before | after | context | committed_at
-------------+-------+-----------+---------------------------------------------------+----------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+------------------------
26 | todo | CREATE | {} | {"id": 26, "task": "Sleep", "isCompleted": false} | {"userId": 187234, "endpoint": "/todo", "params": {"task": "Sleep", "isCompleted": false}, "SQL": "INSERT INTO ..."} | 2023-12-11 17:09:09+00
27 | todo | CREATE | {} | {"id": 27, "task": "Eat", "isCompleted": false} | {"userId": 187234, "endpoint": "/todo", "params": {"task": "Eat", "isCompleted": false}, "SQL": "INSERT INTO ..."} | 2023-12-11 17:09:11+00
28 | todo | CREATE | {} | {"id": 28, "task": "Repeat", "isCompleted": false} | {"userId": 187234, "endpoint": "/todo", "params": {"task": "Repeat", "isCompleted": false}, "SQL": "INSERT INTO ..."} | 2023-12-11 17:09:13+00
26 | todo | UPDATE | {"id": 26, "task": "Sleep", "isCompleted": false} | {"id": 26, "task": "Sleep", "isCompleted": true} | {"userId": 187234, "endpoint": "/todo/complete", "params": {"id": 26}, "SQL": "UPDATE ..."} | 2023-12-11 17:09:15+00
27 | todo | DELETE | {"id": 27, "task": "Eat", "isCompleted": false} | {} | {"userId": 187234, "endpoint": "/todo/27", "params": {"id": 27}, "SQL": "DELETE FROM ..."} | 2023-12-11 17:09:18+00
```

Check out our [Prisma Docs](https://docs.bemi.io/orms/prisma) for more details.
Expand Down
6 changes: 3 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Pool } from "pg";
import { Request, Response, NextFunction } from "express";

import { PrismaPg } from './pg-adapter';
import { isBemiContext, isWriteQuery } from './pg-utils'
import { isContextComment, isWriteQuery, contextToSqlComment } from './pg-utils'
import { log } from './logger'

const WRITE_OPERATIONS = ["create", "update", "upsert", "delete", "createMany", "updateMany", "deleteMany"]
Expand All @@ -29,7 +29,7 @@ export const withPgAdapter = <PrismaClientType>(originalPrisma: PrismaClientType
}

// Injected context query
if (operation === '$executeRawUnsafe' && args[0] && isBemiContext(args[0])) {
if (operation === '$executeRawUnsafe' && args[0] && isContextComment(args[0])) {
return query(args)
}

Expand All @@ -42,7 +42,7 @@ export const withPgAdapter = <PrismaClientType>(originalPrisma: PrismaClientType
// The PG adapter will remove the transaction and add the comment
// to the query directly to be executed as a single SQL statement
const [, result] = await prisma.$transaction([
prisma.$executeRawUnsafe(`/*Bemi ${JSON.stringify(context)} Bemi*/`),
prisma.$executeRawUnsafe(contextToSqlComment(context)),
query(args),
]);
return result
Expand Down
20 changes: 15 additions & 5 deletions src/pg-adapter.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import { Client, Pool } from 'pg'
import { PrismaPg } from './pg-adapter'
import { contextToSqlComment } from './pg-utils'

const CONTEXT = {
endpoint: '/todo/complete',
userID: 1,
queryParams: { id: 37 },
}

const QUERIES = {
BEGIN: 'BEGIN',
COMMIT: 'COMMIT',
SELECT: 'SELECT "Todo"."Todo"."id", "Todo"."Todo"."task", "Todo"."Todo"."isCompleted" FROM "Todo"."Todo" WHERE "Todo"."Todo"."id" = $1 OFFSET $2',
UPDATE: 'UPDATE "Todo"."Todo" SET "isCompleted" = $1 WHERE ("Todo"."Todo"."id" = $2 AND 1=1) RETURNING "Todo"."Todo"."id", "Todo"."Todo"."task", "Todo"."Todo"."isCompleted"',
DELETE: 'DELETE FROM "Todo"."Todo" WHERE ("Todo"."Todo"."id" = $1 AND 1=1)',
CONTEXT: '/*Bemi {"endpoint":"/todo/complete","userID":1,"queryParams":{"id":37}} Bemi*/',
CONTEXT: contextToSqlComment(CONTEXT),
}

const callMockedPgAdapater = async (queries: string[]) => {
Expand All @@ -22,6 +28,10 @@ const callMockedPgAdapater = async (queries: string[]) => {
return query
}

const queryWithContext = (query: string) => {
return `${query} ${contextToSqlComment({ SQL: query, ...CONTEXT })}`
}

describe('PrismaPg', () => {
describe('performIO with a transaction', () => {
test('works with context & write operations', async () => {
Expand All @@ -35,7 +45,7 @@ describe('PrismaPg', () => {
const query = await callMockedPgAdapater(queries)

expect(query.mock.calls.map((c: any) => c[0].text)).toStrictEqual([
`${QUERIES.UPDATE} ${QUERIES.CONTEXT}`,
queryWithContext(QUERIES.UPDATE),
]);
})

Expand All @@ -53,7 +63,7 @@ describe('PrismaPg', () => {
expect(query.mock.calls.map((c: any) => c[0].text)).toStrictEqual([
QUERIES.BEGIN,
QUERIES.SELECT,
`${QUERIES.DELETE} ${QUERIES.CONTEXT}`,
queryWithContext(QUERIES.DELETE),
QUERIES.COMMIT,
]);
})
Expand Down Expand Up @@ -88,7 +98,7 @@ describe('PrismaPg', () => {
expect(query.mock.calls.map((c: any) => c[0].text)).toStrictEqual([
QUERIES.BEGIN,
QUERIES.SELECT,
`${QUERIES.DELETE} ${QUERIES.CONTEXT}`,
queryWithContext(QUERIES.DELETE),
QUERIES.COMMIT,
]);
})
Expand Down
17 changes: 11 additions & 6 deletions src/pg-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import {
StdClient,
TransactionClient,
EMPTY_RESULT,
isBemiContext,
contextToSqlComment,
sqlCommentToContext,
isContextComment,
isWriteQuery,
isBeginQuery,
isCommitQuery,
Expand Down Expand Up @@ -137,10 +139,13 @@ class PgQueryable<ClientT extends StdClient | TransactionClient> implements Quer

// Transaction queries
if (previousQueries) {
const isContext = isBemiContext(sql)
const isContext = isContextComment(sql)
const isWrite = isWriteQuery(sql)
const previousContext = previousQueries.find((q) => isBemiContext(q.sql))?.sql
text = previousContext && isWrite ? `${sql} ${previousContext}` : sql
const previousContextComment = previousQueries.find((q) => isContextComment(q.sql))?.sql

if (previousContextComment && isWrite) {
text = `${sql} ${contextToSqlComment({ SQL: sql, ...sqlCommentToContext(previousContextComment) })}`
}

if (!catchingUp) {
previousQueries.push(query)
Expand All @@ -152,7 +157,7 @@ class PgQueryable<ClientT extends StdClient | TransactionClient> implements Quer
if (isBeginQuery(sql) && previousQueries.length === 1) return EMPTY_RESULT

// Skip accumulated COMMIT
if (isCommitQuery(sql) && previousContext && previousQueries.length === 4) return EMPTY_RESULT
if (isCommitQuery(sql) && previousContextComment && previousQueries.length === 4) return EMPTY_RESULT

// Catch up and continue the entire transaction
if (
Expand All @@ -167,7 +172,7 @@ class PgQueryable<ClientT extends StdClient | TransactionClient> implements Quer
}

// Skip accumulated context
if (isBemiContext(sql)) return EMPTY_RESULT
if (isContextComment(sql)) return EMPTY_RESULT
}

// Log modified queries
Expand Down
10 changes: 9 additions & 1 deletion src/pg-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ export interface TransactionClient extends pg.PoolClient {

export const EMPTY_RESULT = { rowCount: null, fields: [], command: '', oid: 0, rows: [] } as pg.QueryResult

export const isBemiContext = (sql: string): boolean => {
export const contextToSqlComment = (context: any): string => {
return `/*Bemi ${JSON.stringify(context)} Bemi*/`
}

export const sqlCommentToContext = (sql: string): any => {
return JSON.parse(sql.replace('/*Bemi ', '').replace(' Bemi*/', ''))
}

export const isContextComment = (sql: string): boolean => {
return sql.startsWith('/*Bemi') && sql.endsWith('Bemi*/')
}

Expand Down

0 comments on commit 91d8c71

Please # to comment.