pg_resgroup_move_query() improvements. (#15411)
The pg_resgroup_move_query() function implemented in 51ee26b has
several disadvantages:
1) Slot leaking. The slot acquired by the initiator process is never
freed if the target process is not alive, did not receive the signal,
or received it while in the idle state.
2) A race condition between UnassignResGroup() called at the end of a
transaction (or, honestly, any other code) and the handler called
immediately after the USR1 signal.
3) Broken entrydb process moving. Previously, the USR1 signal was
sent to only one process, target or entrydb. Entrydb moving was never
tested properly.

Improvements added to solve the first problem:
1) Feedback from the target process to the initiator. The target
process can set the initiator's latch to quickly interrupt
pg_resgroup_move_query() from waiting.
2) The existing movetoResSlot parameter acts as a marker. The
initiator sets it and waits on the latch. If movetoResSlot becomes
NULL, slot control is on the target process side.
3) Initiator PID. Used by the target process to get the initiator's
latch.
4) A mutex to guard all critical moveto* parameters against
concurrent changes.
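
The hand-off protocol above can be sketched in miniature. This is a simplified, self-contained model, not the actual GPDB code: MoveState, move_begin, and move_handle are invented names standing in for the PGPROC moveto* fields and the initiator/target code that uses them (spinlocking omitted):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified view of the per-backend moveto* fields. */
typedef struct MoveState
{
	void   *movetoResSlot;		/* slot handed over to the target */
	int		movetoCallerPid;	/* guard: PID of the active initiator */
	int		movetoGroupId;		/* destination group; 0 means none */
} MoveState;

/* Initiator side: publish the slot and our PID; fail if a move is running. */
static bool
move_begin(MoveState *st, void *slot, int myPid, int groupId)
{
	if (st->movetoCallerPid != 0)	/* another initiator is active */
		return false;
	st->movetoResSlot = slot;
	st->movetoCallerPid = myPid;
	st->movetoGroupId = groupId;
	return true;
}

/* Target side: take ownership of the slot and clear the command. */
static void
move_handle(MoveState *st)
{
	st->movetoResSlot = NULL;	/* slot control is now on the target side */
	st->movetoGroupId = 0;
}
```

The movetoCallerPid guard is what prevents two initiators from racing on the same target, and the NULL movetoResSlot is what the initiator checks to learn that the target took the slot.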

To solve the second problem, there was an attempt to use Postgres
interruptions. I was unhappy to learn that GPDB uses the raw
InterruptPending value to do some pre-cancellation of dispatched
queries. GPDB assumes InterruptPending can be triggered only by
"negative" events, which lead to cancellation. A temporary solution
with an InterruptPending-like "positive" flag showed that we may wait
for the next CHECK_FOR_INTERRUPTS() call for a long time. For example,
a fat insert may perform the group move only at the end of
dispatching, which makes no sense.
Thus, I decided to decrease the probability of races by using an
additional IsTransactionState() check.
IMPORTANT NOTICE. The existing solution is still affected by race
conditions. The current handler implementation is an example of bad
design and should be reworked.
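
As a rough illustration of the mitigation, consider this toy model; in_transaction, movetoGroupId, and handle_move_signal are hypothetical stand-ins for IsTransactionState(), the PGPROC field, and the real handler:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the real GPDB state. */
static bool in_transaction;		/* models IsTransactionState() */
static int	movetoGroupId;		/* pending move command; 0 means none */
static int	currentGroupId;

/*
 * Toy model of the handler path: act on the pending move only when the
 * backend is inside a transaction.  This narrows, but does not close,
 * the race window with UnassignResGroup().
 */
static void
handle_move_signal(void)
{
	if (movetoGroupId != 0 && in_transaction)
	{
		currentGroupId = movetoGroupId;
		movetoGroupId = 0;
	}
}
```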

To solve the third problem, we now send signals to the target and
entrydb processes separately. We do UnassignResGroup() for one process
only, without cleaning all counters for a still-working entrydb
process inside the handler. Moreover, the entrydb moving logic is now
separated from the segment moving logic, which makes it more readable.

A new GUC (gp_resource_group_move_timeout) was added to limit the time
we wait for feedback from the target.
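
The timeout behavior can be sketched as a polling loop. This is a self-contained approximation, not the real latch-based code: check_ready stands in for ResGroupMoveCheckTargetReady(), and each wait_quantum_ms stands in for one WaitLatch() round:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the gp_resource_group_move_timeout GUC, in ms. */
static int gp_resource_group_move_timeout = 30000;

/*
 * Simplified initiator wait loop: poll for feedback from the target
 * until the timeout expires, then give up.
 */
static bool
wait_for_target(bool (*check_ready)(void), int wait_quantum_ms)
{
	int elapsed = 0;

	while (elapsed < gp_resource_group_move_timeout)
	{
		if (check_ready())
			return true;			/* target answered in time */
		elapsed += wait_quantum_ms;	/* one latch-wait round elapsed */
	}
	return false;					/* timeout: caller must clean up */
}
```

In the real code the target can also set the initiator's latch to end the wait early, so the loop rarely runs to the full timeout.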

New regression tests show there is no slot leaking in some rare and
now correctly resolved cases.

* Move faulty assertions so they don't fire when the process is already moving.

* Fix wrong slot resetting for the entrydb process on moving (apache#386)

The entrydb and main processes may share the same resgroup slot or, in other
words, they have the same session slot. When moving a process to a new group,
we free the old slot only when the last (in our case, entrydb) process has
moved to the new group. At the same time we zero the current session slot. The
`sessionResetSlot()` called from `UnassignResGroup()` didn't check which slot
it was zeroing. This caused a bug where we were unable to move the same
session twice (failing with an error like `ERROR: process XXX is in IDLE
state`). The reason: the new session slot, set in the main process, was
zeroed by the entrydb process. From now on, we examine the current session
slot before zeroing it.
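
The fix amounts to a guarded reset. A minimal sketch under assumed names (Slot, sessionSlot, and session_reset_slot are illustrative, not the actual GPDB identifiers):

```c
#include <assert.h>
#include <stddef.h>

typedef struct Slot { int groupId; } Slot;

/* Hypothetical session slot shared by the main and entrydb processes. */
static Slot *sessionSlot;

/*
 * Sketch of the fixed behavior: clear the session slot only if it is
 * still the slot being released.  If the main process has already
 * installed a new slot, leave it alone.
 */
static void
session_reset_slot(Slot *releasedSlot)
{
	if (sessionSlot == releasedSlot)
		sessionSlot = NULL;
}
```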

---------

Co-authored-by: Alexey Gordeev <goa@arenadata.io>
2 people authored and jiaqizho committed Jul 22, 2024
1 parent b5b1db1 commit 6953d9a
Showing 21 changed files with 1,316 additions and 118 deletions.
11 changes: 11 additions & 0 deletions concourse/scripts/ic_gpdb_resgroup.bash
@@ -52,6 +52,14 @@ make_cgroups_dir() {
EOF
}

install_python_dependency() {
local host_alias=$1

ssh $host_alias bash -ex <<EOF
pip3.9 install paramiko==3.1.0
EOF
}

run_resgroup_test() {
local gpdb_master_alias=$1

@@ -142,6 +150,9 @@ mount_cgroups ccp-${CLUSTER_NAME}-0
mount_cgroups ccp-${CLUSTER_NAME}-1
make_cgroups_dir ccp-${CLUSTER_NAME}-0
make_cgroups_dir ccp-${CLUSTER_NAME}-1
install_python_dependency ccp-${CLUSTER_NAME}-0
install_python_dependency ccp-${CLUSTER_NAME}-1
install_python_dependency mdw
run_resgroup_test mdw

#
11 changes: 11 additions & 0 deletions concourse/scripts/ic_gpdb_resgroup_v2.bash
@@ -26,6 +26,14 @@ enable_cgroup_subtree_control() {
EOF
}

install_python_dependency() {
local host_alias=$1

ssh $host_alias bash -ex <<EOF
pip3.9 install paramiko==3.1.0
EOF
}

run_resgroup_test() {
local gpdb_master_alias=$1

@@ -102,6 +110,9 @@ EOF

enable_cgroup_subtree_control ccp-${CLUSTER_NAME}-0
enable_cgroup_subtree_control ccp-${CLUSTER_NAME}-1
install_python_dependency ccp-${CLUSTER_NAME}-0
install_python_dependency ccp-${CLUSTER_NAME}-1
install_python_dependency cdw
run_resgroup_test cdw

#
1 change: 1 addition & 0 deletions gpMgmt/requirements-dev.txt
@@ -10,6 +10,7 @@ google_apitools<=0.5.30
httplib2<=0.16.0
google_reauth<=0.1.0
mock<=2.0.0
paramiko==3.1.0
monotonic<=1.5
oauth2client<=4.1.3
pyOpenSSL<=19.1.0
1 change: 1 addition & 0 deletions python-dependencies.txt
@@ -1,3 +1,4 @@
psutil==5.7.0
pygresql==5.2
pyyaml==5.3.1
paramiko==3.1.0
8 changes: 4 additions & 4 deletions src/backend/access/transam/xact.c
@@ -3097,7 +3097,7 @@ CommitTransaction(void)

/* Release resource group slot at the end of a transaction */
if (ShouldUnassignResGroup())
UnassignResGroup(false);
UnassignResGroup();
}

/*
@@ -3432,7 +3432,7 @@ PrepareTransaction(void)

/* Release resource group slot at the end of prepare transaction on segment */
if (ShouldUnassignResGroup())
UnassignResGroup(false);
UnassignResGroup();
}


@@ -3700,7 +3700,7 @@ AbortTransaction(void)

/* Release resource group slot at the end of a transaction */
if (ShouldUnassignResGroup())
UnassignResGroup(false);
UnassignResGroup();
}

/*
@@ -3756,7 +3756,7 @@ CleanupTransaction(void)

/* Release resource group slot at the end of a transaction */
if (ShouldUnassignResGroup())
UnassignResGroup(false);
UnassignResGroup();
}

/*
203 changes: 189 additions & 14 deletions src/backend/storage/ipc/procarray.c
@@ -6357,39 +6357,214 @@ GetSessionIdByPid(int pid)
/*
* Set the destination group slot or group id in PGPROC, and send a signal
* to the proc. slot is NULL on the QE.
* The process we want to notify on the coordinator can act as an executor
* (GP_ROLE_EXECUTE) in the case of entrydb; 'isExecutor' helps us determine
* the process to which we need to send the signal.
*/
void
ResGroupSignalMoveQuery(int sessionId, void *slot, Oid groupId)
bool
ResGroupMoveSignalTarget(int sessionId, void *slot, Oid groupId,
bool isExecutor)
{
pid_t pid;
BackendId backendId;
pid_t pid;
BackendId backendId;
ProcArrayStruct *arrayP = procArray;
bool sent = false;
bool found = false;

Assert(groupId != InvalidOid);
Assert(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE);
AssertImply(Gp_role == GP_ROLE_EXECUTE, isExecutor);

LWLockAcquire(ProcArrayLock, LW_SHARED);
for (int i = 0; i < arrayP->numProcs; i++)
{
volatile PGPROC *proc = &allProcs[arrayP->pgprocnos[i]];
PGPROC *proc = &allProcs[arrayP->pgprocnos[i]];

if (proc->mppSessionId != sessionId)
continue;

/*
* Previously, we didn't distinguish entrydb processes from the main
* target process on the coordinator. There was a case with entrydb
* executors when we could send a signal to the target process only, but
* not to the entrydb executor process, or vice versa. As a mediocre
* solution, we assume mppIsWriter is always false for entrydb processes.
*
* We can send a signal to target or entrydb processes only from the QD.
* The second (XOR) part of the condition checks whether we found the
* entrydb (isExecutor && !mppIsWriter) or the target (!isExecutor &&
* mppIsWriter) process. If neither, we continue the search.
*/
if (Gp_role == GP_ROLE_DISPATCH && !(isExecutor ^ proc->mppIsWriter))
continue;

found = true;
pid = proc->pid;
backendId = proc->backendId;
if (Gp_role == GP_ROLE_DISPATCH)

SpinLockAcquire(&proc->movetoMutex);
/* only the target process needs the slot and callerPid to operate */
if (Gp_role == GP_ROLE_DISPATCH && proc->mppIsWriter)
{
/*
* movetoCallerPid is a guard marking that there is currently an
* active initiator process.
*/
if (proc->movetoCallerPid != InvalidPid)
{
SpinLockRelease(&proc->movetoMutex);
elog(NOTICE, "cannot move process, which is already moving");
break;
}
Assert(proc->movetoCallerPid == InvalidPid);
Assert(proc->movetoResSlot == NULL);
Assert(slot != NULL);

proc->movetoResSlot = slot;
SendProcSignal(pid, PROCSIG_RESOURCE_GROUP_MOVE_QUERY, backendId);
break;
proc->movetoCallerPid = MyProc->pid;
}
else if (Gp_role == GP_ROLE_EXECUTE)
proc->movetoGroupId = groupId;
SpinLockRelease(&proc->movetoMutex);

if (SendProcSignal(pid, PROCSIG_RESOURCE_GROUP_MOVE_QUERY, backendId))
{
Assert(groupId != InvalidOid);
Assert(proc->movetoGroupId == InvalidOid);
proc->movetoGroupId = groupId;
SendProcSignal(pid, PROCSIG_RESOURCE_GROUP_MOVE_QUERY, backendId);
/* don't break, need to signal all the procs of this session */
SpinLockAcquire(&proc->movetoMutex);
if (Gp_role == GP_ROLE_DISPATCH && proc->mppIsWriter)
{
proc->movetoResSlot = NULL;
proc->movetoCallerPid = InvalidPid;
}
proc->movetoGroupId = InvalidOid;
SpinLockRelease(&proc->movetoMutex);

/*
* It's not an error if we can't notify, for example, an already
* finished QE process (because of the async nature of resgroup
* moving). If we can't notify the QD, the caller should raise an
* error by itself, based on the returned value.
*/
elog(NOTICE, "cannot send signal to backend %d with PID %d",
backendId, pid);
}
else
sent = true;

/*
* Don't break for executors; we need to signal all the procs of this
* session. It's safe to break if we are the QD, because we want to
* notify only one process at a time: the main target or entrydb.
*/
if (Gp_role == GP_ROLE_DISPATCH)
break;
}
LWLockRelease(ProcArrayLock);

if (!found && !isExecutor)
elog(NOTICE, "cannot find target process");

return sent;
}

/*
* Check if slot control is on the target side and clean all of the
* target's moveto* params.
*
* Cleaning and checking should be performed as one atomic operation under
* one mutex.
* The 'clean' flag is bidirectional. If 'clean' is set to true, all
* moveto* params will be cleaned, no matter whether the target handled
* them or not. Moreover, it will be forcefully set to true if the target
* process handled our command. Thus, if the function returned true in
* 'clean', it should be treated as a terminal state, and any new calls to
* ResGroupMoveCheckTargetReady() before calling ResGroupMoveSignalTarget()
* make no sense.
*/
void
ResGroupMoveCheckTargetReady(int sessionId, bool *clean, bool *result)
{
pid_t pid;
BackendId backendId;
ProcArrayStruct *arrayP = procArray;

Assert(Gp_role == GP_ROLE_DISPATCH);

*result = false;

LWLockAcquire(ProcArrayLock, LW_SHARED);
for (int i = 0; i < arrayP->numProcs; i++)
{
PGPROC *proc = &allProcs[arrayP->pgprocnos[i]];

/*
* Also ignore entrydb processes. We use mppIsWriter, which is described
* in ResGroupMoveSignalTarget().
*/
if (proc->mppSessionId != sessionId || !proc->mppIsWriter)
continue;

pid = proc->pid;
backendId = proc->backendId;

SpinLockAcquire(&proc->movetoMutex);
/*
* If proc->movetoCallerPid is not equal to MyProc->pid, the target
* process may be handling a signal from another caller. Check it
* again after we acquire the movetoMutex.
*/
if (proc->movetoCallerPid == MyProc->pid)
{
/*
* An InvalidOid movetoGroupId means the target process tried to
* handle our command.
*/
if (proc->movetoGroupId == InvalidOid)
{
/*
* An empty movetoResSlot means the target process got full
* control over the slot.
*/
*result = (proc->movetoResSlot == NULL);
*clean = true;
}

/*
* Clean all params, especially movetoCallerPid, which guards the
* target process from other initiators. After the spinlock is
* released, any other process is allowed to start a new move command.
*/
if (*clean)
{
proc->movetoResSlot = NULL;
proc->movetoGroupId = InvalidOid;
proc->movetoCallerPid = InvalidPid;
}
}
SpinLockRelease(&proc->movetoMutex);
break;
}
LWLockRelease(ProcArrayLock);
}

/*
* Notify the initiator process that the target process is ready to move
* to a new group. This is an optional feature to speed up the initiator's
* awakening. The initiator will get the actual command result from the
* changed movetoResSlot and movetoGroupId values.
*/
void
ResGroupMoveNotifyInitiator(pid_t callerPid)
{
ProcArrayStruct *arrayP = procArray;

Assert(Gp_role == GP_ROLE_DISPATCH);

LWLockAcquire(ProcArrayLock, LW_SHARED);
for (int i = 0; i < arrayP->numProcs; i++)
{
PGPROC *proc = &allProcs[arrayP->pgprocnos[i]];

if (proc->pid != callerPid)
continue;

SetLatch(&proc->procLatch);
break;
}
LWLockRelease(ProcArrayLock);
}
2 changes: 2 additions & 0 deletions src/backend/storage/lmgr/proc.c
@@ -495,8 +495,10 @@ InitProcess(void)
MyProc->waitProcLock = NULL;
pg_atomic_write_u64(&MyProc->waitStart, 0);
MyProc->resSlot = NULL;
SpinLockInit(&MyProc->movetoMutex);
MyProc->movetoResSlot = NULL;
MyProc->movetoGroupId = InvalidOid;
MyProc->movetoCallerPid = InvalidPid;

/*
* mppLocalProcessSerial uniquely identifies this backend process among
10 changes: 10 additions & 0 deletions src/backend/utils/misc/guc_gp.c
@@ -4013,6 +4013,16 @@ struct config_int ConfigureNamesInt_gp[] =
0, 0, INT_MAX,
NULL, NULL, NULL
},
{
{"gp_resource_group_move_timeout", PGC_USERSET, RESOURCES_MGM,
gettext_noop("Wait up to the specified time (in ms) while moving a process to another resource group (after queuing on it) before giving up."),
NULL,
GUC_UNIT_MS
},
&gp_resource_group_move_timeout,
30000, 10, INT_MAX,
NULL, NULL, NULL
},
{
{"gp_blockdirectory_entry_min_range", PGC_USERSET, GP_ARRAY_TUNING,
gettext_noop("Minimal range in bytes one block directory entry covers."),
