Skip to content

Commit

Permalink
pg,abci: unblock notice subscribers on DB shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
jchappelow committed Sep 27, 2024
1 parent efe878f commit cd9a85f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 17 deletions.
13 changes: 8 additions & 5 deletions common/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,13 @@ type AccessModer interface {
// When subscribed, the passed channel will receive notifications.
// Only one subscription is allowed per transaction.
type Subscriber interface {
// Subscribe subscribes to notifications passed using the special
// `notice()` function. Only `notice()` calls made after the subscription,
// on this tx, will be received. It returns a done function that should be
// called when the subscription is no longer needed. It is the callers
// responsibility to call done and close the channel.
// Subscribe subscribes to notifications passed using the special `notice()`
// function. Only `notice()` calls made after the subscription, on this tx,
// will be received. A done function is returned that should be called to
// signal that no more statements that emit notices will be executed. The
// channel will only be closed after all notices have been sent AND the done
// is called, or if the database shuts down prematurely. In case of an
// unexpected DB shutdown, an empty string is sent on the channel before it
// is closed. It is the caller's responsibility to call done.
Subscribe(ctx context.Context) (ch <-chan string, done func(context.Context) error, err error)
}
37 changes: 25 additions & 12 deletions internal/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,21 +606,28 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinal
defer done(ctx)

// wait group to wait at the end of the function for all logs to be received
var wg sync.WaitGroup
wg.Add(1)
logsDone := make(chan error, 1)
go func() {
defer close(logsDone)

// we enforce that the cumulative size of logs is less than 1KB
// per tx. This is a work-around until we have gas costs to protect
// against log spam.
for {
log, ok := <-logs
if !ok {
break
if !ok { // empty and closed (normal completion)
return // logsDone receiver gets nil error
}
if log == "" {
// The DB has shut down with active subscribers. Fail.
logsDone <- errors.New("premature notice stream termination")
return
}
txid, notice, err := parse.ParseNotice(log)
if err != nil {
// should be deterministic so nbd to not halt here
a.log.Errorf("failed to parse notice: %v", err)
// will still be deterministic so nbd to not halt here
a.log.Errorf("failed to parse notice (%.20s...): %v", log, err)
continue // since txid is invalid and won't match any result.TxHash
}

currentLog, ok := logMap[txid]
Expand All @@ -641,10 +648,6 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinal
currentLog.logs += "\n" + notice
}
}

// logs channel will be closed when the tx is precommitted,
// so finish the wait group
wg.Done()
}()

for i, tx := range req.Txs {
Expand Down Expand Up @@ -859,8 +862,18 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinal
}
}

// wait for all logs to be received
wg.Wait()
// wait for all logs to be received, or a premature shutdown
select {
case <-ctx.Done():
// NOTE: this will not happen until cometbft v1.0 since in v0.38
// FinalizeBlock is still called with context.TODO().
return nil, ctx.Err()
case err := <-logsDone:
if err != nil {
return nil, fmt.Errorf("DB failure: %w", err)
} // else we got all the notice
}


for _, result := range resultArr {
logs, ok := logMap[hex.EncodeToString(result.TxHash)]
Expand Down
14 changes: 14 additions & 0 deletions internal/sql/pg/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,20 @@ func NewDB(ctx context.Context, cfg *DBConfig) (*DB, error) {

db.pool.Close()

// If the DB has shut down, there will be no more notices, which may be
// needed to unblock a receiver e.g. FinalizeBlock, so we close the
// channels to unblock them and allow the application to return.
db.pool.subscribers.Exclusive(func(m map[int64]chan<- string) {
for txid, sub := range m {
// We won't be sending any more message with the 'pgtx:' prefix,
// so we send an empty string PRIOR to closing the channel to
// signal premature completion.
sub <- ""
close(sub)
delete(m, txid)
}
})

// Potentially we can have a newReplMon restart loop here instead of
// shutdown. However, this proved complex and unlikely to succeed.
}()
Expand Down

0 comments on commit cd9a85f

Please # to comment.