Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

SQL: show local node processlist #15455

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 188 additions & 0 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package sql
import (
"reflect"
"sort"
"strings"
"time"
"unsafe"

"github.com/pkg/errors"
"golang.org/x/net/context"
Expand All @@ -28,6 +30,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

var crdbInternal = virtualSchema{
Expand All @@ -37,11 +41,44 @@ var crdbInternal = virtualSchema{
crdbInternalTablesTable,
crdbInternalLeasesTable,
crdbInternalSchemaChangesTable,
crdbInternalSessionsTable,
crdbInternalTxnsTable,
crdbInternalStmtStatsTable,
crdbInternalJobsTable,
},
}

type sessionRegistry struct {
syncutil.Mutex
store map[*Session]struct{}
}

func makeSessionRegistry() sessionRegistry {
return sessionRegistry{store: make(map[*Session]struct{})}
}

func (r *sessionRegistry) register(s *Session) {
r.Lock()
r.store[s] = struct{}{}
r.Unlock()
}

func (r *sessionRegistry) deregister(s *Session) {
r.Lock()
delete(r.store, s)
r.Unlock()
}

type queryInfo struct {
client string
user string
database string
defaultIsolationLevel enginepb.IsolationType
location *time.Location
searchPath parser.SearchPath
stmts map[planNode]string
}

var crdbInternalBuildInfoTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.node_build_info (
Expand Down Expand Up @@ -212,6 +249,157 @@ CREATE TABLE crdb_internal.schema_changes (
},
}

var crdbInternalSessionsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.sessions (
NODE_ID INT NOT NULL,
SESSION_ID INT NOT NULL,
CLIENT STRING NOT NULL,
"USER" STRING NOT NULL,
STATEMENT STRING NOT NULL,
TXN_STATE STRING NOT NULL,
TXN_ID INT,
AUTO_RETRY BOOL NOT NULL,
SQL_TIMESTAMP TIMESTAMP NOT NULL,
DEFAULT_DATABASE STRING NOT NULL,
DEFAULT_ISOLATION_LEVEL STRING NOT NULL,
TIME_ZONE STRING NOT NULL,
SEARCH_PATH STRING NOT NULL
);
`,
populate: func(_ context.Context, p *planner, addRow func(...parser.Datum) error) error {
if p.session.sessionRegistry == nil {
return nil
}
registry := p.session.sessionRegistry
registry.Lock()
defer registry.Unlock()
leaseMgr := p.LeaseMgr()
nodeID := parser.NewDInt(parser.DInt(int64(leaseMgr.nodeID.Get())))
for s := range registry.store {
s.Lock()
info := queryInfo{
client: s.Client,
user: s.User,
database: s.Database,
defaultIsolationLevel: s.DefaultIsolationLevel,
location: s.Location,
searchPath: s.SearchPath,
stmts: s.QueryStatements,
}
s.Unlock()
if info.user != p.session.User && p.session.User != security.RootUser {
continue
}

txnDatum := parser.DNull
txn := s.TxnState.txn
if txn != nil {
txnDatum = parser.NewDInt(parser.DInt(int64(uintptr(unsafe.Pointer(txn)))))
}
for _, stmt := range info.stmts {
if err := addRow(
nodeID,
parser.NewDInt(parser.DInt(int64(uintptr(unsafe.Pointer(s))))),
parser.NewDString(info.client),
parser.NewDString(info.user),
parser.NewDString(stmt),
parser.NewDString(s.TxnState.State.String()),
txnDatum,
parser.MakeDBool(parser.DBool(s.TxnState.autoRetry)),
parser.MakeDTimestamp(s.TxnState.sqlTimestamp, time.Microsecond),
parser.NewDString(info.database),
parser.NewDString(info.defaultIsolationLevel.String()),
parser.NewDString(info.location.String()),
parser.NewDString(strings.Join(info.searchPath, ",")),
); err != nil {
return err
}
}

}
return nil
},
}

var crdbInternalTxnsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.txns (
NODE_ID INT NOT NULL,
TXN_ID INT NOT NULL,
KV_TXN_UUID STRING,
KEY BYTES,
EPOCH INT NOT NULL,
TIMESTAMP TIMESTAMP NOT NULL,
TIMESTAMP_L DECIMAL NOT NULL,
PRIORITY INT NOT NULL,
SEQUENCE INT NOT NULL,
ISOLATION STRING NOT NULL,
NAME STRING NOT NULL,
STATUS STRING NOT NULL,
ORIG_TIMESTAMP TIMESTAMP NOT NULL,
ORIG_TIMESTAMP_L DECIMAL NOT NULL,
WRITING BOOL NOT NULL,
NUM_INTENTS INT NOT NULL
);
`,
populate: func(_ context.Context, p *planner, addRow func(...parser.Datum) error) error {
// TODO(knz We probably would like this table to report on all
// Txn objects currently active, but we do not yet have a proper
// registry for that. So for now the virtual table reports only
// the Txns known to Session objects).
if p.session.sessionRegistry == nil {
return nil
}
registry := p.session.sessionRegistry
registry.Lock()
defer registry.Unlock()
leaseMgr := p.LeaseMgr()
nodeID := parser.NewDInt(parser.DInt(int64(leaseMgr.nodeID.Get())))
for s := range registry.store {
if s.User != p.session.User && p.session.User != security.RootUser {
continue
}

txn := s.TxnState.txn
if txn == nil {
continue
}
id := txn.Proto().ID
idDatum := parser.DNull
if id != nil {
idDatum = parser.NewDString(id.String())
}
key := txn.Proto().Key
keyDatum := parser.DNull
if key != nil {
keyDatum = parser.NewDBytes(parser.DBytes(key))
}
if err := addRow(
nodeID,
parser.NewDInt(parser.DInt(int64(uintptr(unsafe.Pointer(txn))))),
idDatum,
keyDatum,
parser.NewDInt(parser.DInt(int64(txn.Proto().Epoch))),
parser.MakeDTimestamp(time.Unix(0, txn.Proto().Timestamp.WallTime), time.Microsecond),
parser.TimestampToDecimal(txn.Proto().Timestamp),
parser.NewDInt(parser.DInt(int64(txn.Proto().Priority))),
parser.NewDInt(parser.DInt(int64(txn.Proto().Sequence))),
parser.NewDString(txn.Proto().Isolation.String()),
parser.NewDString(txn.Proto().Name),
parser.NewDString(txn.Proto().Status.String()),
parser.MakeDTimestamp(time.Unix(0, txn.Proto().OrigTimestamp.WallTime), time.Microsecond),
parser.TimestampToDecimal(txn.Proto().OrigTimestamp),
parser.MakeDBool(parser.DBool(txn.Proto().Writing)),
parser.NewDInt(parser.DInt(int64(len(txn.Proto().Intents)))),
); err != nil {
return err
}
}
return nil
},
}

var crdbInternalLeasesTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.leases (
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ type Executor struct {
reCache *parser.RegexpCache
virtualSchemas virtualSchemaHolder

sessionRegistry sessionRegistry
// Transient stats.
SelectCount *metric.Counter
// The subset of SELECTs that are processed through DistSQL.
Expand Down Expand Up @@ -305,6 +306,8 @@ func NewExecutor(cfg ExecutorConfig, stopper *stop.Stopper) *Executor {
stopper: stopper,
reCache: parser.NewRegexpCache(512),

sessionRegistry: makeSessionRegistry(),

TxnBeginCount: metric.NewCounter(MetaTxnBegin),
TxnCommitCount: metric.NewCounter(MetaTxnCommit),
TxnAbortCount: metric.NewCounter(MetaTxnAbort),
Expand Down Expand Up @@ -1575,6 +1578,17 @@ func (e *Executor) execStmt(
return Result{}, err
}

session.Lock()
session.QueryStatements[plan] = stmt.String()
session.Unlock()

defer func() {
session.Lock()
delete(session.QueryStatements, plan)
session.Unlock()

}()

planner.phaseTimes[plannerStartExecStmt] = timeutil.Now()
if useDistSQL {
err = e.execDistSQL(planner, plan, &result)
Expand Down Expand Up @@ -1628,6 +1642,17 @@ func (e *Executor) execStmtInParallel(stmt parser.Statement, planner *planner) (
}
defer result.Close(ctx)

session.Lock()
session.QueryStatements[plan] = stmt.String()
session.Unlock()

defer func() {
session.Lock()
delete(session.QueryStatements, plan)
session.Unlock()

}()

planner.phaseTimes[plannerStartExecStmt] = timeutil.Now()
err = e.execClassic(planner, plan, &result)
planner.phaseTimes[plannerEndExecStmt] = timeutil.Now()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/parser/keywords.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/parser/reserved_keywords.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/sql/parser/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ func (node *ShowUsers) Format(buf *bytes.Buffer, f FmtFlags) {
buf.WriteString("SHOW USERS")
}

// ShowProcessList represents a SHOW PRPCESSLIST statement.
type ShowProcessList struct {
}

// Format implements the NodeFormatter interface.
func (node *ShowProcessList) Format(buf *bytes.Buffer, f FmtFlags) {
buf.WriteString("SHOW PROCESSLIST")
}

// Help represents a HELP statement.
type Help struct {
Name Name
Expand Down
Loading