
cpuallocator: add basic CPU clustering support.
Try to satisfy allocation requests by consuming one or
more full CPU clusters before falling back to smaller
allocation blocks. Try to squeeze the full allocation
into a single die or a single package. Don't attempt any
cluster-based allocation if neither of these is possible.

Satisfy low-prio allocations using E-core clusters only.
Use P-core clusters only to satisfy priority preferences
above low-prio.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
klihub committed Mar 14, 2024
1 parent 1c3d3da commit e7085e6
Showing 2 changed files with 507 additions and 2 deletions.
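A minimal, self-contained sketch of the policy the commit message describes — whole clusters first, with the cluster kind gated by the request priority. The types and helper below (cluster, takeFullClusters) are illustrative stand-ins, not this repository's API:

package main

import "fmt"

// Illustrative stand-ins for the real sysfs/cpuset types.
type kind string

const (
    pCore kind = "P-core"
    eCore kind = "E-core"
)

type cluster struct {
    id   int
    kind kind
    free int // free CPUs in this cluster
}

// takeFullClusters consumes whole clusters of the wanted kind until cnt CPUs
// are covered or no fitting cluster remains; anything left over would be
// satisfied by smaller allocation blocks (cores, threads) in the real code.
func takeFullClusters(clusters []cluster, want kind, cnt int) (picked []int, left int) {
    left = cnt
    for _, c := range clusters {
        if c.kind != want || c.free > left {
            continue // wrong kind, or the cluster no longer fits as a whole
        }
        picked = append(picked, c.id)
        left -= c.free
        if left == 0 {
            break
        }
    }
    return picked, left
}

func main() {
    clusters := []cluster{
        {0, pCore, 8}, {1, pCore, 8}, // two performance clusters
        {2, eCore, 4}, {3, eCore, 4}, // two efficiency clusters
    }
    // A normal-priority request for 16 CPUs is served from P-core clusters only.
    fmt.Println(takeFullClusters(clusters, pCore, 16)) // [0 1] 0
    // A low-priority request for 8 CPUs is served from E-core clusters only.
    fmt.Println(takeFullClusters(clusters, eCore, 8)) // [2 3] 0
}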
270 changes: 268 additions & 2 deletions pkg/cpuallocator/allocator.go
@@ -16,6 +16,7 @@ package cpuallocator

import (
"fmt"
"math"
"sort"

"github.com/containers/nri-plugins/pkg/utils/cpuset"
@@ -35,10 +36,12 @@ const (
AllocIdlePackages AllocFlag = 1 << iota
// AllocIdleNodes requests allocation of full idle NUMA nodes.
AllocIdleNodes
// AllocIdleClusters requests allocation of full idle CPU clusters.
AllocIdleClusters
// AllocIdleCores requests allocation of full idle cores (all threads in core).
AllocIdleCores
// AllocDefault is the default allocation preferences.
AllocDefault = AllocIdlePackages | AllocIdleCores
AllocDefault = AllocIdlePackages | AllocIdleClusters | AllocIdleCores

logSource = "cpuallocator"
)
@@ -87,16 +90,29 @@ type topologyCache struct {
core map[idset.ID]cpuset.CPUSet

cpuPriorities cpuPriorities // CPU priority mapping
clusters []*cpuCluster // CPU clusters

}

type cpuPriorities [NumCPUPriorities]cpuset.CPUSet

type cpuCluster struct {
pkg idset.ID
die idset.ID
cluster idset.ID
cpus cpuset.CPUSet
kind sysfs.CoreKind
}

// IDFilter helps filtering Ids.
type IDFilter func(idset.ID) bool

// IDSorter helps sorting Ids.
type IDSorter func(int, int) bool

// clusterFilter helps picking/filtering CPU clusters.
type clusterFilter func(*cpuCluster) bool

// our logger instance
var log = logger.NewLogger(logSource)

@@ -126,6 +142,19 @@ func pickIds(idSlice []idset.ID, f IDFilter) []idset.ID {
return ids[0:idx]
}

// Pick CPU clusters by filtering according to a function.
func pickClusters(clusters []*cpuCluster, f clusterFilter) []*cpuCluster {
var picked []*cpuCluster

for _, c := range clusters {
if f == nil || f(c) {
picked = append(picked, c)
}
}

return picked
}

// newAllocatorHelper creates a new CPU allocatorHelper.
func newAllocatorHelper(sys sysfs.System, topo topologyCache) *allocatorHelper {
a := &allocatorHelper{
@@ -179,6 +208,186 @@ func (a *allocatorHelper) takeIdlePackages() {
}
}

// Try to fulfill the allocation using idle CPU clusters.
func (a *allocatorHelper) takeIdleClusters() {
a.Debug("* takeIdleClusters()...")

if len(a.topology.clusters) <= 1 {
return
}

var (
offline = a.sys.OfflineCPUs()
clusters = []*cpuCluster{}
dieCPUCount = map[idset.ID]map[idset.ID]int{}
pkgCPUCount = map[idset.ID]int{}
)

for _, c := range a.topology.clusters {
cset := c.cpus.Difference(offline)
free := cset.Intersection(a.from)
if free.IsEmpty() || !free.Equals(cset) {
continue
}

// we only take E-clusters for low-prio requests and P-clusters for other requests
if a.prefer != PriorityLow {
if c.kind == sysfs.EfficientCore {
continue
}
} else {
if c.kind == sysfs.PerformanceCore {
continue
}
}

// collect free clusters, calculate total free cpus per package and per die
clusters = append(clusters, c)
if _, ok := dieCPUCount[c.pkg]; !ok {
dieCPUCount[c.pkg] = map[idset.ID]int{}
}
dieCPUCount[c.pkg][c.die] += free.Size()
pkgCPUCount[c.pkg] += free.Size()
}

// filter out packages with too few free cpus
for pkg, cnt := range pkgCPUCount {
if cnt < a.cnt {
log.Debug(" => package #%d has too few CPUs in clusters (%d < %d)", pkg, cnt, a.cnt)
delete(pkgCPUCount, pkg)
delete(dieCPUCount, pkg)
}
}

if len(pkgCPUCount) == 0 {
log.Debug(" => none of the packages have enough free clusters")
return
}

// see if we can squeeze the allocation into a single die or a single package
dieCPUs := math.MaxInt
theDie := []int{}
pkgCPUs := math.MaxInt
thePkg := -1
for pkg, pkgCnt := range pkgCPUCount {
if pkgCnt < pkgCPUs {
pkgCPUs = pkgCnt
thePkg = pkg
}
if pkgCnt == pkgCPUs && pkg < thePkg {
thePkg = pkg
}
for die, cnt := range dieCPUCount[pkg] {
if cnt < a.cnt {
continue
}

if cnt > dieCPUs {
continue
}
if cnt < dieCPUs {
dieCPUs = cnt
theDie = []int{pkg, die}
continue
}

if pkg > theDie[0] {
continue
}
if pkg < theDie[0] {
theDie[0] = pkg
theDie[1] = die
continue
}
if die < theDie[1] {
theDie[1] = die
}
}
}

if len(theDie) == 2 {
pkg, die := theDie[0], theDie[1]
log.Debug(" => picking clusters from die #%d/%d (%d cpus total)",
pkg, die, dieCPUs)

rest := cpuset.New()
for _, c := range clusters {
log.Debug(" - considering cluster #%d/%d/%d (%s %s)", c.pkg, c.die, c.cluster,
c.kind, c.cpus)
if c.pkg < pkg || (c.pkg == pkg && c.die < die) {
continue
}
if c.pkg > pkg || (c.pkg == pkg && c.die > die) {
break
}

cset := c.cpus.Difference(offline)
if cset.Size() <= a.cnt {
log.Debug(" => picking full cluster #%d/%d/%d (%s cpus %s)",
c.pkg, c.die, c.cluster, c.kind, c.cpus)
a.result = a.result.Union(cset)
a.from = a.from.Difference(cset)
a.cnt -= cset.Size()
}
if a.cnt == 0 {
break
}
rest = rest.Union(cset)
}

if a.cnt == 0 {
return
}

if a.cnt > rest.Size() {
log.Fatalf("internal error: chosen die #%d/%d unexpectedly exhausted", pkg, die)
}

a.from = rest
return
}

if thePkg == -1 {
return
}

pkg := thePkg
log.Debug(" => picking clusters from package #%d (%d cpus total)", pkg, pkgCPUs)

rest := cpuset.New()
for _, c := range clusters {
if c.pkg < pkg {
continue
}
if c.pkg > pkg {
break
}

cset := c.cpus.Difference(offline)
if cset.Size() <= a.cnt {
log.Debug(" => picking full cluster #%d/%d/%d (%s cpus %s)",
c.pkg, c.die, c.cluster, c.kind, c.cpus)
a.result = a.result.Union(cset)
a.from = a.from.Difference(cset)
a.cnt -= cset.Size()
}
if a.cnt == 0 {
break
}
rest = rest.Union(cset)
}

if a.cnt == 0 {
return
}

if a.cnt > rest.Size() {
log.Fatalf("internal error: chosen package #%d exhausted", pkg)
}

a.from = rest
}

// Allocate full idle CPU cores.
func (a *allocatorHelper) takeIdleCores() {
a.Debug("* takeIdleCores()...")
@@ -329,6 +538,14 @@ func (a *allocatorHelper) allocate() cpuset.CPUSet {
if (a.flags & AllocIdlePackages) != 0 {
a.takeIdlePackages()
}
if a.cnt > 0 && (a.flags&AllocIdleClusters) != 0 {
a.takeIdleClusters()
}
/*
if a.cnt > 0 && (a.flags & AllocIdleClusters) != 0 {
a.takeSubCluster()
}
*/
if a.cnt > 0 && (a.flags&AllocIdleCores) != 0 {
a.takeIdleCores()
}
@@ -389,7 +606,8 @@ func newTopologyCache(sys sysfs.System) topologyCache {
c := topologyCache{
pkg: make(map[idset.ID]cpuset.CPUSet),
node: make(map[idset.ID]cpuset.CPUSet),
core: make(map[idset.ID]cpuset.CPUSet)}
core: make(map[idset.ID]cpuset.CPUSet),
}
if sys != nil {
for _, id := range sys.PackageIDs() {
c.pkg[id] = sys.Package(id).CPUSet()
Expand All @@ -403,6 +621,7 @@ func newTopologyCache(sys sysfs.System) topologyCache {
}

c.discoverCPUPriorities(sys)
c.discoverCPUClusters(sys)

return c
}
@@ -620,6 +839,37 @@ func (c *topologyCache) discoverCpufreqPriority(sys sysfs.System, pkgID idset.ID
return prios
}

func (c *topologyCache) discoverCPUClusters(sys sysfs.System) {
if sys == nil {
return
}

for _, id := range sys.PackageIDs() {
pkg := sys.Package(id)
clusters := []*cpuCluster{}
for _, die := range pkg.DieIDs() {
for _, cl := range pkg.LogicalDieClusterIDs(die) {
cpus := pkg.LogicalDieClusterCPUSet(die, cl)
clusters = append(clusters, &cpuCluster{
pkg: id,
die: die,
cluster: cl,
cpus: cpus,
kind: sys.CPU(cpus.List()[0]).CoreKind(),
})
}
}
if len(clusters) > 1 {
log.Debug("package #%d has %d clusters:", id, len(clusters))
for _, cl := range clusters {
log.Debug(" die #%d, cluster #%d: %s cpus %s",
cl.die, cl.cluster, cl.kind, cl.cpus)
}
c.clusters = append(c.clusters, clusters...)
}
}
}

func (p CPUPriority) String() string {
switch p {
case PriorityHigh:
@@ -665,3 +915,19 @@ func (c *cpuPriorities) cmpCPUSet(csetA, csetB cpuset.CPUSet, prefer CPUPriority
}
return 0
}

func (c *cpuCluster) HasSmallerIDsThan(o *cpuCluster) bool {
if c.pkg < o.pkg {
return true
}
if c.pkg > o.pkg {
return false
}
if c.die < o.die {
return true
}
if c.die > o.die {
return false
}
return c.cluster < o.cluster
}
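HasSmallerIDsThan orders clusters by (package, die, cluster) ID, which is the order the picking loops in takeIdleClusters implicitly rely on when they continue or break past clusters outside the chosen die or package. A sort call site is not visible in this diff, so the following snippet is only an assumed usage of the comparator within this package:

// Assumed usage: keep discovered clusters in deterministic (package, die, cluster) order.
sort.Slice(c.clusters, func(i, j int) bool {
    return c.clusters[i].HasSmallerIDsThan(c.clusters[j])
})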
