Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Check running queries to decide whether we should refresh a view #1837

Merged
merged 1 commit into from
Nov 10, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions backend/src/bin/jobs/refreshMaterializedViewsForCube.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
import { Logger, logExecutionTimeV2 } from '@crowd/logging'
import { QueryTypes } from 'sequelize'
import { CrowdJob } from '../../types/jobTypes'
import { databaseInit } from '../../database/databaseConnection'

let processingRefreshCubeMVs = false
function createRefreshQuery(view: string) {
return `REFRESH MATERIALIZED VIEW CONCURRENTLY "${view}"`
}

const job: CrowdJob = {
name: 'Refresh Materialized View For Cube',
cronTime: '1,31 * * * *',
onTrigger: async (log: Logger) => {
if (!processingRefreshCubeMVs) {
processingRefreshCubeMVs = true
} else {
log.warn(
"Materialized views will not be refreshed because there's already an ongoing refresh!",
)
return
}

try {
// initialize database with 15 minutes query timeout
const forceNewDbInstance = true
Expand All @@ -30,9 +24,36 @@ const job: CrowdJob = {
]

for (const view of materializedViews) {
const refreshQuery = createRefreshQuery(view)
const runningQuery = await database.sequelize.query(
`
SELECT 1
FROM pg_stat_activity
WHERE query = :refreshQuery
AND state != 'idle'
AND pid != pg_backend_pid()
`,
{
replacements: {
refreshQuery,
},
type: QueryTypes.SELECT,
useMaster: true,
},
)

if (runningQuery.length > 0) {
log.warn(
`Materialized views for cube will not be refreshed because there's already an ongoing refresh of ${view}!`,
)
return
}
}
for (const view of materializedViews) {
const refreshQuery = createRefreshQuery(view)
await logExecutionTimeV2(
() =>
database.sequelize.query(`REFRESH MATERIALIZED VIEW CONCURRENTLY "${view}"`, {
database.sequelize.query(refreshQuery, {
useMaster: true,
}),
log,
Expand All @@ -41,8 +62,6 @@ const job: CrowdJob = {
}
} catch (e) {
log.error({ error: e }, `Error while refreshing materialized views!`)
} finally {
processingRefreshCubeMVs = false
}
},
}
Expand Down
Loading