From 08ceac635832a128978a82c8d80cfd102896dd7b Mon Sep 17 00:00:00 2001 From: Misha Savelyev Date: Fri, 10 Nov 2023 12:23:55 +0000 Subject: [PATCH] Check running queries to decide whether we should refresh a view --- .../jobs/refreshMaterializedViewsForCube.ts | 45 +++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/backend/src/bin/jobs/refreshMaterializedViewsForCube.ts b/backend/src/bin/jobs/refreshMaterializedViewsForCube.ts index 55410ad2e6..b09f119522 100644 --- a/backend/src/bin/jobs/refreshMaterializedViewsForCube.ts +++ b/backend/src/bin/jobs/refreshMaterializedViewsForCube.ts @@ -1,22 +1,16 @@ import { Logger, logExecutionTimeV2 } from '@crowd/logging' +import { QueryTypes } from 'sequelize' import { CrowdJob } from '../../types/jobTypes' import { databaseInit } from '../../database/databaseConnection' -let processingRefreshCubeMVs = false +function createRefreshQuery(view: string) { + return `REFRESH MATERIALIZED VIEW CONCURRENTLY "${view}"` +} const job: CrowdJob = { name: 'Refresh Materialized View For Cube', cronTime: '1,31 * * * *', onTrigger: async (log: Logger) => { - if (!processingRefreshCubeMVs) { - processingRefreshCubeMVs = true - } else { - log.warn( - "Materialized views will not be refreshed because there's already an ongoing refresh!", - ) - return - } - try { // initialize database with 15 minutes query timeout const forceNewDbInstance = true @@ -30,9 +24,36 @@ const job: CrowdJob = { ] for (const view of materializedViews) { + const refreshQuery = createRefreshQuery(view) + const runningQuery = await database.sequelize.query( + ` + SELECT 1 + FROM pg_stat_activity + WHERE query = :refreshQuery + AND state != 'idle' + AND pid != pg_backend_pid() + `, + { + replacements: { + refreshQuery, + }, + type: QueryTypes.SELECT, + useMaster: true, + }, + ) + + if (runningQuery.length > 0) { + log.warn( + `Materialized views for cube will not be refreshed because there's already an ongoing refresh of ${view}!`, + ) + return + } + } + for (const view of materializedViews) { + const refreshQuery = createRefreshQuery(view) await logExecutionTimeV2( () => - database.sequelize.query(`REFRESH MATERIALIZED VIEW CONCURRENTLY "${view}"`, { + database.sequelize.query(refreshQuery, { useMaster: true, }), log, @@ -41,8 +62,6 @@ const job: CrowdJob = { } } catch (e) { log.error({ error: e }, `Error while refreshing materialized views!`) - } finally { - processingRefreshCubeMVs = false } }, }