@@ -1285,24 +1285,46 @@ func (pi *PSQLInteractor) KillClient(clientID uint) error {
1285
1285
//
1286
1286
// Returns:
1287
1287
// - error: An error if any occurred during the operation.
1288
- func (pi * PSQLInteractor ) BackendConnections (ctx context.Context , shs []shard.Shardinfo ) error {
1289
- if err := pi .WriteHeader ("backend connection id" , "router" , "shard key name" , "hostname" , "pid" , "user" , "dbname" , "sync" , "tx_served" , "tx status" ); err != nil {
1288
+ func (pi * PSQLInteractor ) BackendConnections (ctx context.Context , shs []shard.Shardinfo , groupByClause * spqrparser.Group ) error {
1289
+ headers := []string {"backend connection id" , "router" , "shard key name" , "hostname" , "pid" , "user" , "dbname" , "sync" , "tx_served" , "tx status" }
1290
+ getters := []func (sh shard.Shardinfo ) string {
1291
+ func (sh shard.Shardinfo ) string { return fmt .Sprintf ("%d" , sh .ID ()) },
1292
+ func (sh shard.Shardinfo ) string {
1293
+ router := "no data"
1294
+ s , ok := sh .(shard.CoordShardinfo )
1295
+ if ok {
1296
+ router = s .Router ()
1297
+ }
1298
+ return router
1299
+ },
1300
+ func (sh shard.Shardinfo ) string { return sh .ShardKeyName () },
1301
+ func (sh shard.Shardinfo ) string { return sh .InstanceHostname () },
1302
+ func (sh shard.Shardinfo ) string { return fmt .Sprintf ("%d" , sh .Pid ()) },
1303
+ func (sh shard.Shardinfo ) string { return sh .Usr () },
1304
+ func (sh shard.Shardinfo ) string { return sh .DB () },
1305
+ func (sh shard.Shardinfo ) string { return strconv .FormatInt (sh .Sync (), 10 ) },
1306
+ func (sh shard.Shardinfo ) string { return strconv .FormatInt (sh .TxServed (), 10 ) },
1307
+ func (sh shard.Shardinfo ) string { return sh .TxStatus ().String () },
1308
+ }
1309
+
1310
+ if groupByClause != nil {
1311
+ return groupBy (headers , shs , getters , groupByClause .Col .ColName , pi )
1312
+ }
1313
+
1314
+ if err := pi .WriteHeader (headers ... ); err != nil {
1290
1315
spqrlog .Zero .Error ().Err (err ).Msg ("" )
1291
1316
return err
1292
1317
}
1293
1318
1294
1319
for _ , sh := range shs {
1295
- router := "no data"
1296
- s , ok := sh .(shard.CoordShardinfo )
1297
- if ok {
1298
- router = s .Router ()
1320
+ vals := []string {}
1321
+ for _ , getter := range getters {
1322
+ vals = append (vals , getter (sh ))
1299
1323
}
1300
-
1301
- if err := pi .WriteDataRow (fmt .Sprintf ("%d" , sh .ID ()), router , sh .ShardKeyName (), sh .InstanceHostname (), fmt .Sprintf ("%d" , sh .Pid ()), sh .Usr (), sh .DB (), strconv .FormatInt (sh .Sync (), 10 ), strconv .FormatInt (sh .TxServed (), 10 ), sh .TxStatus ().String ()); err != nil {
1324
+ if err := pi .WriteDataRow (vals ... ); err != nil {
1302
1325
spqrlog .Zero .Error ().Err (err ).Msg ("" )
1303
1326
return err
1304
1327
}
1305
-
1306
1328
}
1307
1329
1308
1330
return pi .CompleteMsg (len (shs ))
@@ -1390,3 +1412,29 @@ func (pi *PSQLInteractor) PreparedStatements(ctx context.Context, shs []shard.Pr
1390
1412
1391
1413
return pi .CompleteMsg (len (shs ))
1392
1414
}
1415
+
1416
+ func groupBy [T any ](headers []string , values []T , getters []func (s T ) string , groupBy string , pi * PSQLInteractor ) error {
1417
+ ind := - 1
1418
+ for i , header := range headers {
1419
+ if header == groupBy {
1420
+ if err := pi .WriteHeader (groupBy , "count" ); err != nil {
1421
+ return err
1422
+ }
1423
+ ind = i
1424
+ break
1425
+ }
1426
+ }
1427
+
1428
+ cnt := make (map [string ]int )
1429
+ for _ , value := range values {
1430
+ cnt [getters [ind ](value )]++
1431
+ }
1432
+
1433
+ for k , v := range cnt {
1434
+ if err := pi .WriteDataRow (k , fmt .Sprintf ("%d" , v )); err != nil {
1435
+ return err
1436
+ }
1437
+ }
1438
+
1439
+ return pi .CompleteMsg (len (cnt ))
1440
+ }
0 commit comments