From 39bf685b122782ea549b02c9521a404b928ea18f Mon Sep 17 00:00:00 2001 From: Zhang Mingli Date: Fri, 24 Jan 2025 15:36:17 +0800 Subject: [PATCH] Investigate Parallel Execution of SELECT Plan in REFRESH Command. This commit introduces a new approach to determine if the SELECT part of the REFRESH command is executed in parallel within Cloudberry. Since REFRESH is a utility command, it cannot be analyzed using EXPLAIN. Previously, I developed the `refresh_compare` function in `cbdb_parallel.sql`, which relied on actual execution times, with the expectation that parallel execution would consistently outperform non-parallel execution. However, since our CI transitioned to Apache, resource limitations have led to frequent failures in parallel refresh tests, making it challenging to obtain reliable results. The new method focuses on assessing whether the REFRESH plan is parallel, specifically for AO/AOCS materialized views. I have retained the original test cases to allow other databases built on Cloudberry to evaluate actual execution times, as the new approach only identifies parallel execution without measuring performance. 
Authored-by: Zhang Mingli avamingli@gmail.com --- src/backend/commands/matview.c | 9 ++++ src/backend/optimizer/plan/planner.c | 48 +++++++++++++++++++++ src/backend/utils/misc/guc_gp.c | 14 ++++++ src/include/optimizer/planner.h | 2 + src/include/utils/guc.h | 2 + src/include/utils/unsync_guc_name.h | 1 + src/test/regress/expected/cbdb_parallel.out | 31 +++++++------ src/test/regress/sql/cbdb_parallel.sql | 19 ++++++-- 8 files changed, 108 insertions(+), 18 deletions(-) diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 0d3e9f407dc..df0246b3349 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -50,6 +50,7 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "optimizer/optimizer.h" +#include "optimizer/planner.h" #include "parser/analyze.h" #include "parser/parse_clause.h" #include "parser/parse_func.h" @@ -826,6 +827,14 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL); + if (!refreshClause->skipData && + enable_parallel && + check_refresh_plan_parallel && + plan_has_parallel(plan)) + { + ereport(NOTICE, errmsg("Plan of REFRESH is parallel")); + } + plan->refreshClause = refreshClause; /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 2a175b7c3b9..82d6611e761 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -9007,3 +9007,51 @@ make_new_rollups_for_hash_grouping_set(PlannerInfo *root, return srd; } + +typedef struct ParallelFinderContext +{ + plan_tree_base_prefix base; + bool has_parallel; +} ParallelFinderContext; + +/* + * Walker to find a plan node that is parallel + */ +static bool +ParallelFinderWalker(Node *node, + void *context) +{ + Assert(context); + ParallelFinderContext *ctx = (ParallelFinderContext *) context; + + if (node == NULL) + return false; + + if (!is_plan_node(node)) + 
return false; + + Plan *plan = (Plan *) node; + + if (plan->parallel > 1 || plan->parallel_aware) + { + ctx->has_parallel = true; + return true; /* found our node; no more visit */ + } + + /* Continue walking */ + return plan_tree_walker((Node*)node, ParallelFinderWalker, ctx, true); +} + +/* + * plan_has_parallel + * Does the plan have a slice that is parallel? + */ +bool plan_has_parallel(PlannedStmt *plannedstmt) +{ + Plan *planTree = plannedstmt->planTree; + ParallelFinderContext ctx; + ctx.base.node = (Node*)plannedstmt; + ctx.has_parallel = false; + ParallelFinderWalker((Node *) planTree, &ctx); + return ctx.has_parallel; +} diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index 3501dbb4daf..150e52fe8f7 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -463,6 +463,9 @@ bool gp_allow_date_field_width_5digits = false; /* Avoid do a real REFRESH materialized view if possibile. */ bool gp_enable_refresh_fast_path = true; +/* Check if the plan of a REFRESH command is parallel. 
*/ +bool check_refresh_plan_parallel = false; + static const struct config_enum_entry gp_log_format_options[] = { {"text", 0}, {"csv", 1}, @@ -3157,6 +3160,17 @@ struct config_bool ConfigureNamesBool_gp[] = NULL, NULL, NULL }, + { + {"check_refresh_plan_parallel", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Check the SELECT part plan of a REFRESH command is parallel."), + NULL, + GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE + }, + &check_refresh_plan_parallel, + false, + NULL, NULL, NULL + }, + { {"gp_enable_refresh_fast_path", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Avoid do a real REFRESH materialized view if possibile."), diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index 29aac021f38..813307dc6a5 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -64,4 +64,6 @@ extern bool optimizer_init; extern void preprocess_qual_conditions(PlannerInfo *root, Node *jtnode); +extern bool plan_has_parallel(PlannedStmt *plannedstmt); + #endif /* PLANNER_H */ diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 79fd59b866d..01a4a0dfbd3 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -630,6 +630,8 @@ extern bool gp_enable_global_deadlock_detector; extern bool gp_enable_refresh_fast_path; +extern bool check_refresh_plan_parallel; + extern bool gp_enable_predicate_pushdown; extern int gp_predicate_pushdown_sample_rows; diff --git a/src/include/utils/unsync_guc_name.h b/src/include/utils/unsync_guc_name.h index 4c7226e33cb..9f66c877a06 100644 --- a/src/include/utils/unsync_guc_name.h +++ b/src/include/utils/unsync_guc_name.h @@ -40,6 +40,7 @@ "bonjour", "bonjour_name", "check_function_bodies", + "check_refresh_plan_parallel", "checkpoint_completion_target", "checkpoint_flush_after", "checkpoint_timeout", diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out index 648259d3798..40ff1c5ddd3 100644 --- 
a/src/test/regress/expected/cbdb_parallel.out +++ b/src/test/regress/expected/cbdb_parallel.out @@ -2612,21 +2612,24 @@ end $$ language plpgsql; begin; set local max_parallel_workers_per_gather = 8; -select * from refresh_compare(true, false); - parallel_is_better --------------------- - t -(1 row) - -select * from refresh_compare(false, false); - parallel_is_better --------------------- - t -(1 row) - +--select * from refresh_compare(true, false); +--select * from refresh_compare(false, false); +-- check plan of REFRESH command is parallel +create table t_p(c1 int, c2 int) with(parallel_workers=8) distributed by(c1); +insert into t_p select i, i+1 from generate_series(1, 10000)i; +analyze t_p; +create materialized view matv_ao using ao_row as + select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); +create materialized view matv_aoco using ao_column as + select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); +set local enable_parallel = on; +set local check_refresh_plan_parallel = on; +refresh materialized view matv_ao; +NOTICE: Plan of REFRESH is parallel +refresh materialized view matv_aoco; +NOTICE: Plan of REFRESH is parallel +abort; drop function refresh_compare; -reset max_parallel_workers_per_gather; -end; -- -- Parallel Create AO/AOCO Table AS -- diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql index 351422d82c3..7587a00dd79 100644 --- a/src/test/regress/sql/cbdb_parallel.sql +++ b/src/test/regress/sql/cbdb_parallel.sql @@ -867,11 +867,22 @@ end $$ language plpgsql; begin; set local max_parallel_workers_per_gather = 8; -select * from refresh_compare(true, false); -select * from refresh_compare(false, false); +--select * from refresh_compare(true, false); +--select * from refresh_compare(false, false); +-- check plan of REFRESH command is parallel +create table t_p(c1 int, c2 int) with(parallel_workers=8) 
distributed by(c1); +insert into t_p select i, i+1 from generate_series(1, 10000)i; +analyze t_p; +create materialized view matv_ao using ao_row as + select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); +create materialized view matv_aoco using ao_column as + select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); +set local enable_parallel = on; +set local check_refresh_plan_parallel = on; +refresh materialized view matv_ao; +refresh materialized view matv_aoco; +abort; drop function refresh_compare; -reset max_parallel_workers_per_gather; -end; -- -- Parallel Create AO/AOCO Table AS