New Approach to investigate Parallel Execution of SELECT Plan in REFRESH Command. #892

Closed · wants to merge 1 commit
9 changes: 9 additions & 0 deletions src/backend/commands/matview.c
@@ -50,6 +50,7 @@
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "optimizer/planner.h"
#include "parser/analyze.h"
#include "parser/parse_clause.h"
#include "parser/parse_func.h"
@@ -826,6 +827,14 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,

plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);

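/*
 * When check_refresh_plan_parallel is on, emit a NOTICE if the planner
 * chose a parallel plan for the REFRESH's SELECT (used by the
 * regression tests below).
 */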
if (!refreshClause->skipData &&
enable_parallel &&
check_refresh_plan_parallel &&
plan_has_parallel(plan))
{
ereport(NOTICE, errmsg("Plan of REFRESH is parallel"));
@yjhjstz (Member) · Jan 24, 2025:
Can we use client_min_messages to avoid introducing check_refresh_plan_parallel?

Contributor (Author):
That would be better, but what should the log level be?
We don't want to send this to the server logs, and displaying a message like 'Plan of REFRESH is parallel' to users by default is generally unnecessary and could disrupt their experience.
It would also complicate adjusting many test cases.

@yjhjstz (Member) · Jan 24, 2025:
if (client_min_messages == WARNING) then ereport? Then set it once for this case.

Contributor (Author):
Like this?

		if (client_min_messages == WARNING)
			ereport(NOTICE, errmsg("Plan of REFRESH is parallel"));

That seems very weird, and shouldn't client_min_messages be used only on the client side?

@yjhjstz (Member):
Yeah, I found another way:

set debug_print_plan to on;
set enable_parallel = on;

:numSlices 3 
   :slices[i].sliceIndex 0 
   :slices[i].parentIndex -1 
   :slices[i].gangType 4 
   :slices[i].numsegments 3 
   :slices[i].parallel_workers 0 
   :slices[i].segindex 0 
   :slices[i].directDispatch.isDirectDispatch false 
   :slices[i].directDispatch.contentIds <> 
   :slices[i].sliceIndex 1 
   :slices[i].parentIndex 0 
   :slices[i].gangType 2 
   :slices[i].numsegments 1 
   :slices[i].parallel_workers 0 
   :slices[i].segindex 2 
   :slices[i].directDispatch.isDirectDispatch false 
   :slices[i].directDispatch.contentIds <> 
   :slices[i].sliceIndex 2 
   :slices[i].parentIndex 1 
   :slices[i].gangType 3 
   :slices[i].numsegments 3 
   :slices[i].parallel_workers 2 
   :slices[i].segindex 0 
   :slices[i].directDispatch.isDirectDispatch false 
   :slices[i].directDispatch.contentIds <> 
   :rewindPlanIDs (b)

:slices[i].parallel_workers is 2, which differs from the parallel-off case, where :slices[i].parallel_workers is 0.

Use a UDF to parse the output: no code changes needed, and no overhead.

@yjhjstz (Member):
Please refer to: select wait_until_query_output_to_file('/tmp/bfv_cte.out');

Contributor (Author):
I didn't get it; could you provide code to illustrate your idea?
Also: how do I print the plan of a REFRESH command?

Contributor (Author):
> How to print plan in REFRESH command

@yjhjstz Could you show your steps?
Even with your settings, like set debug_print_plan to on;, I couldn't see the plan tree when executing 'refresh materialized view mv;'.

@yjhjstz (Member):
OK.
Step 1:

postgres=# create table t_p(c1 int, c2 int) with(parallel_workers=8) distributed by(c1);
CREATE TABLE
postgres=#   insert into t_p select i, i+1 from generate_series(1, 10000000)i;
INSERT 0 10000000
postgres=#     create materialized view matv using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2);
CREATE MATERIALIZED VIEW

Step 2:

bash -c 'psql -X postgres -c "set optimizer=off; set enable_parallel=on;set client_min_messages to log; set debug_print_plan=on; refresh materialized view matv;" &> /tmp/refresh_plan.out'

Step 3:
cat /tmp/refresh_plan.out and look for :slices[i].parallel_workers 2.

select wait_until_query_then_parse('/tmp/refresh_plan.out'); -- TODO: wait_until_query_then_parse
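For illustration only, here is a minimal sketch of what that TODO'd helper might look like. This is an assumption, not part of this PR: the name comes from the TODO above, the waiting/polling part is omitted, and pg_read_file() requires superuser or the pg_read_server_files role, with the file readable by the server process.

    create or replace function wait_until_query_then_parse(path text)
    returns boolean
    language sql
    as $$
        -- debug_print_plan emits ":slices[i].parallel_workers N" per slice;
        -- any N > 0 means the SELECT plan of the REFRESH is parallel.
        select pg_read_file(path) ~ 'parallel_workers [1-9]';
    $$;

With the file from step 2 in place, select wait_until_query_then_parse('/tmp/refresh_plan.out'); would return t for the parallel case and f otherwise.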

Contributor (Author):

(screenshot omitted)

}

plan->refreshClause = refreshClause;

/*
48 changes: 48 additions & 0 deletions src/backend/optimizer/plan/planner.c
@@ -9007,3 +9007,51 @@ make_new_rollups_for_hash_grouping_set(PlannerInfo *root,

return srd;
}

typedef struct ParallelFinderContext
{
plan_tree_base_prefix base;
bool has_parallel;
} ParallelFinderContext;

/*
 * Walker to find a plan node that is parallel.
 */
static bool
ParallelFinderWalker(Node *node, void *context)
{
Assert(context);
ParallelFinderContext *ctx = (ParallelFinderContext *) context;

if (node == NULL)
return false;

if (!is_plan_node(node))
return false;

Plan *plan = (Plan *) node;

if (plan->parallel > 1 || plan->parallel_aware)
{
ctx->has_parallel = true;
return true; /* found a parallel node; stop walking */
}

/* Continue walking */
return plan_tree_walker((Node*)node, ParallelFinderWalker, ctx, true);
}

/*
 * plan_has_parallel
 * Does the plan have any slice that is parallel?
 */
bool plan_has_parallel(PlannedStmt *plannedstmt)
{
Plan *planTree = plannedstmt->planTree;
ParallelFinderContext ctx;
ctx.base.node = (Node*)plannedstmt;
ctx.has_parallel = false;
ParallelFinderWalker((Node *) planTree, &ctx);
return ctx.has_parallel;
}
14 changes: 14 additions & 0 deletions src/backend/utils/misc/guc_gp.c
@@ -463,6 +463,9 @@ bool gp_allow_date_field_width_5digits = false;
/* Avoid do a real REFRESH materialized view if possibile. */
bool gp_enable_refresh_fast_path = true;

/* Check whether the plan of a REFRESH command is parallel. */
bool check_refresh_plan_parallel = false;

static const struct config_enum_entry gp_log_format_options[] = {
{"text", 0},
{"csv", 1},
@@ -3157,6 +3160,17 @@ struct config_bool ConfigureNamesBool_gp[] =
NULL, NULL, NULL
},

{
{"check_refresh_plan_parallel", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Check the SELECT part plan of a REFRESH command is parallel."),
NULL,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE
},
&check_refresh_plan_parallel,
false,
NULL, NULL, NULL
},

{
{"gp_enable_refresh_fast_path", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Avoid do a real REFRESH materialized view if possibile."),
2 changes: 2 additions & 0 deletions src/include/optimizer/planner.h
@@ -64,4 +64,6 @@ extern bool optimizer_init;

extern void preprocess_qual_conditions(PlannerInfo *root, Node *jtnode);

extern bool plan_has_parallel(PlannedStmt *plannedstmt);

#endif /* PLANNER_H */
2 changes: 2 additions & 0 deletions src/include/utils/guc.h
@@ -630,6 +630,8 @@ extern bool gp_enable_global_deadlock_detector;

extern bool gp_enable_refresh_fast_path;

extern bool check_refresh_plan_parallel;

extern bool gp_enable_predicate_pushdown;
extern int gp_predicate_pushdown_sample_rows;

1 change: 1 addition & 0 deletions src/include/utils/unsync_guc_name.h
@@ -40,6 +40,7 @@
"bonjour",
"bonjour_name",
"check_function_bodies",
"check_refresh_plan_parallel",
"checkpoint_completion_target",
"checkpoint_flush_after",
"checkpoint_timeout",
31 changes: 17 additions & 14 deletions src/test/regress/expected/cbdb_parallel.out
@@ -2612,21 +2612,24 @@ end
$$ language plpgsql;
begin;
set local max_parallel_workers_per_gather = 8;
select * from refresh_compare(true, false);
parallel_is_better
--------------------
t
(1 row)

select * from refresh_compare(false, false);
parallel_is_better
--------------------
t
(1 row)

--select * from refresh_compare(true, false);
--select * from refresh_compare(false, false);
-- check plan of REFRESH command is parallel
create table t_p(c1 int, c2 int) with(parallel_workers=8) distributed by(c1);
insert into t_p select i, i+1 from generate_series(1, 10000)i;
analyze t_p;
create materialized view matv_ao using ao_row as
select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2);
create materialized view matv_aoco using ao_column as
select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2);
set local enable_parallel = on;
set local check_refresh_plan_parallel = on;
refresh materialized view matv_ao;
NOTICE: Plan of REFRESH is parallel
refresh materialized view matv_aoco;
NOTICE: Plan of REFRESH is parallel
abort;
drop function refresh_compare;
reset max_parallel_workers_per_gather;
end;
--
-- Parallel Create AO/AOCO Table AS
--
19 changes: 15 additions & 4 deletions src/test/regress/sql/cbdb_parallel.sql
@@ -867,11 +867,22 @@ end
$$ language plpgsql;
begin;
set local max_parallel_workers_per_gather = 8;
select * from refresh_compare(true, false);
select * from refresh_compare(false, false);
--select * from refresh_compare(true, false);
--select * from refresh_compare(false, false);
-- check plan of REFRESH command is parallel
create table t_p(c1 int, c2 int) with(parallel_workers=8) distributed by(c1);
insert into t_p select i, i+1 from generate_series(1, 10000)i;
analyze t_p;
create materialized view matv_ao using ao_row as
select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2);
create materialized view matv_aoco using ao_column as
select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2);
set local enable_parallel = on;
set local check_refresh_plan_parallel = on;
refresh materialized view matv_ao;
refresh materialized view matv_aoco;
abort;
drop function refresh_compare;
reset max_parallel_workers_per_gather;
end;

--
-- Parallel Create AO/AOCO Table AS