cpuallocator: add basic CPU clustering support.
Try to satisfy allocation requests by consuming one or more
full CPU clusters before falling back to smaller allocation
blocks. Try to squeeze the full allocation into a single die
or a single package. Don't attempt any cluster-based
allocation if neither of these is possible.

Satisfy low-priority allocations using E-core clusters only.
Use P-core clusters only to satisfy priority preferences
above low-priority.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
klihub committed Mar 18, 2024
1 parent a349c38 commit e915a21
Showing 2 changed files with 542 additions and 2 deletions.
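
The commit message describes a coarse-to-fine fallback order: full idle packages, then full idle CPU clusters, then full idle cores. The sketch below is a minimal, self-contained illustration of how the new AllocIdleClusters flag folds into the default flag set and of the order in which the allocation steps are attempted; the AllocFlag type and constants here are local stand-ins that mirror the names in the diff, not the real cpuallocator package.

package main

import "fmt"

// AllocFlag and the constants below mirror the names added in this commit,
// but this is a standalone illustration, not the real package.
type AllocFlag uint

const (
	AllocIdlePackages AllocFlag = 1 << iota
	AllocIdleNodes
	AllocIdleClusters // new: full idle CPU clusters
	AllocIdleCores

	AllocDefault = AllocIdlePackages | AllocIdleClusters | AllocIdleCores
)

func main() {
	// The allocator helper tries granularities from coarse to fine and stops
	// once the requested CPU count is satisfied; each step runs only if its
	// flag is set in the allocation preferences.
	steps := []struct {
		name string
		flag AllocFlag
	}{
		{"idle packages", AllocIdlePackages},
		{"idle clusters", AllocIdleClusters},
		{"idle cores", AllocIdleCores},
	}
	for _, s := range steps {
		if AllocDefault&s.flag != 0 {
			fmt.Println("try:", s.name)
		}
	}
}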
326 changes: 324 additions & 2 deletions pkg/cpuallocator/allocator.go
@@ -16,6 +16,7 @@ package cpuallocator

import (
"fmt"
"slices"
"sort"

"github.com/containers/nri-plugins/pkg/utils/cpuset"
@@ -35,10 +36,12 @@ const (
AllocIdlePackages AllocFlag = 1 << iota
// AllocIdleNodes requests allocation of full idle NUMA nodes.
AllocIdleNodes
// AllocIdleClusters requests allocation of full idle CPU clusters.
AllocIdleClusters
// AllocIdleCores requests allocation of full idle cores (all threads in core).
AllocIdleCores
// AllocDefault is the default allocation preferences.
AllocDefault = AllocIdlePackages | AllocIdleCores
AllocDefault = AllocIdlePackages | AllocIdleClusters | AllocIdleCores

logSource = "cpuallocator"
)
@@ -87,10 +90,19 @@ type topologyCache struct {
core map[idset.ID]cpuset.CPUSet

cpuPriorities cpuPriorities // CPU priority mapping
clusters []*cpuCluster // CPU clusters
}

type cpuPriorities [NumCPUPriorities]cpuset.CPUSet

type cpuCluster struct {
pkg idset.ID
die idset.ID
cluster idset.ID
cpus cpuset.CPUSet
}

// IDFilter helps filter IDs.
type IDFilter func(idset.ID) bool

@@ -179,6 +191,181 @@ func (a *allocatorHelper) takeIdlePackages() {
}
}

var (
emptyCPUSet = cpuset.New()
)

// Allocate full idle CPU clusters.
func (a *allocatorHelper) takeIdleClusters() {
var (
offline = a.sys.OfflineCPUs()
pickIdle = func(c *cpuCluster) (bool, cpuset.CPUSet) {
// we only take fully idle clusters
cset := c.cpus.Difference(offline)
free := cset.Intersection(a.from)
if free.IsEmpty() || !free.Equals(cset) {
a.Debug(" - omit %s, %d usable CPUs (%s)", c, free.Size(), free)
return false, emptyCPUSet
}

a.Debug(" + pick %s, %d usable CPUs (%s)", c, free.Size(), free)
return true, free
}
preferTightestFit = func(cA, cB *cpuCluster, pkgA, pkgB, dieA, dieB int, csetA, csetB cpuset.CPUSet) (r int) {
defer func() {
if r < 0 {
a.Debug(" + prefer %s", cA)
a.Debug(" over %s", cB)
}
if r > 0 {
a.Debug(" + prefer %s", cB)
a.Debug(" over %s", cA)
}
a.Debug(" - misfit %s", cA)
a.Debug(" and %s", cB)
}()

// prefer cluster which alone can satisfy the request, preferring tighter
cntA, cntB := csetA.Size(), csetB.Size()
if cntA >= a.cnt && cntB < a.cnt {
return -1
}
if cntA < a.cnt && cntB >= a.cnt {
return 1
}
if cntA >= a.cnt && cntB >= a.cnt {
if diff := cntA - cntB; diff != 0 {
return diff
}
// do stable sort: prefer smaller package, die, and cluster IDs
if cA.pkg != cB.pkg {
return cA.pkg - cB.pkg
}
if cA.die != cB.die {
return cA.die - cB.die
}
return cA.cluster - cB.cluster
}

// prefer die which alone can satisfy the request, preferring tighter
if dieA >= a.cnt && dieB < a.cnt {
return -1
}
if dieA < a.cnt && dieB >= a.cnt {
return 1
}
if dieA >= a.cnt && dieB >= a.cnt {
if diff := dieA - dieB; diff != 0 {
return diff
}
// do stable sort: prefer smaller package, die, and cluster IDs
if cA.pkg != cB.pkg {
return cA.pkg - cB.pkg
}
if cA.die != cB.die {
return cA.die - cB.die
}
return cA.cluster - cB.cluster
}

// prefer package which alone can satisfy the request, preferring tighter
if pkgA >= a.cnt && pkgB < a.cnt {
return -1
}
if pkgA < a.cnt && pkgB >= a.cnt {
return 1
}
if pkgA >= a.cnt && pkgB >= a.cnt {
if diff := pkgA - pkgB; diff != 0 {
return diff
}
// do stable sort: prefer smaller package, die, and cluster IDs
if cA.pkg != cB.pkg {
return cA.pkg - cB.pkg
}
if cA.die != cB.die {
return cA.die - cB.die
}
return cA.cluster - cB.cluster
}

// both unusable (don't need stable sort, we won't use them anyway)
return 0
}

sorter = &clusterSorter{
pick: pickIdle,
sort: preferTightestFit,
}
)

a.Debug("* takeIdleClusters()...")

if len(a.topology.clusters) <= 1 {
return
}

a.Debug("looking for %d %s CPUs from %s", a.cnt, a.prefer, a.from)

a.sortCPUClusters(sorter)

var (
clusters = sorter.clusters
pkgCPUCnt = sorter.pkgCPUCnt
cpus = sorter.cpus
)

if len(clusters) < 1 {
return
}

// tightest-fit cluster is a perfect fit, use it
c := clusters[0]
cset := cpus[c]
if cset.Size() == a.cnt {
log.Debug("=> picking single %s", c)
a.result = a.result.Union(cset)
a.from = a.from.Difference(cset)
a.cnt -= cset.Size()
return
}

// tightest-fit cluster is too big, so allocation can't consume any cluster fully
if cset.Size() > a.cnt {
log.Debug(" => tightest-fit cluster too big, can't consume a full cluster")
return
}

// bail out if no package can satisfy the allocation
if cnt := pkgCPUCnt[c.pkg]; cnt < a.cnt {
log.Debug(" => no package can satisfy the allocation, bail out")
return
}

// start consuming clusters, until we're done
for i, c := range clusters {
cset := cpus[c]

if a.cnt < cset.Size() {
log.Debug("=> %d more CPUs needed after allocation of %d clusters", a.cnt, i)
// XXX TODO: should restrict a.from to the same package, if that has enough
// CPUs to satisfy the request
return
}

log.Debug("=> picking %d. %s", i, c)

if a.cnt >= cset.Size() {
a.result = a.result.Union(cset)
a.from = a.from.Difference(cset)
a.cnt -= cset.Size()
}

if a.cnt == 0 {
return
}
}
}

// Allocate full idle CPU cores.
func (a *allocatorHelper) takeIdleCores() {
a.Debug("* takeIdleCores()...")
@@ -329,6 +516,9 @@ func (a *allocatorHelper) allocate() cpuset.CPUSet {
if (a.flags & AllocIdlePackages) != 0 {
a.takeIdlePackages()
}
if a.cnt > 0 && (a.flags&AllocIdleClusters) != 0 {
a.takeIdleClusters()
}
if a.cnt > 0 && (a.flags&AllocIdleCores) != 0 {
a.takeIdleCores()
}
@@ -345,6 +535,85 @@ func (a *allocatorHelper) allocate() cpuset.CPUSet {
return cpuset.New()
}

type clusterSorter struct {
// function to pick or ignore a cluster
pick func(*cpuCluster) (bool, cpuset.CPUSet)
// function to sort slice of picked clusters
sort func(a, b *cpuCluster, pkgCntA, pkgCntB, dieCntA, dieCntB int, cpusA, cpusB cpuset.CPUSet) int

// resulting clusters, available CPU count per package and die, available CPUs per cluster
clusters []*cpuCluster
pkgCPUCnt map[idset.ID]int
dieCPUCnt map[idset.ID]map[idset.ID]int
cpus map[*cpuCluster]cpuset.CPUSet
}

func (a *allocatorHelper) sortCPUClusters(s *clusterSorter) {
var (
clusters = []*cpuCluster{}
pkgCPUCnt = map[idset.ID]int{}
dieCPUCnt = map[idset.ID]map[idset.ID]int{}
cpus = map[*cpuCluster]cpuset.CPUSet{}
)

a.Debug("picking suitable clusters")

for _, c := range a.topology.clusters {
var cset cpuset.CPUSet

// pick or ignore cluster, determine usable cluster CPUs
if s.pick == nil {
cset = c.cpus
} else {
pick, usable := s.pick(c)
if !pick || usable.Size() == 0 {
continue
}

cset = usable
}

// collect cluster and usable CPUs
clusters = append(clusters, c)
cpus[c] = cset

// count usable CPUs per package and die
if _, ok := dieCPUCnt[c.pkg]; !ok {
dieCPUCnt[c.pkg] = map[idset.ID]int{}
}
dieCPUCnt[c.pkg][c.die] += cset.Size()
pkgCPUCnt[c.pkg] += cset.Size()
}

if a.DebugEnabled() {
log.Debug("number of collected usable CPUs:")
for pkg, cnt := range pkgCPUCnt {
log.Debug(" - package #%d: %d", pkg, cnt)
}
for pkg, dies := range dieCPUCnt {
for die, cnt := range dies {
log.Debug(" - die #%d/%d %d", pkg, die, cnt)
}
}
}

// sort collected clusters
if s.sort != nil {
a.Debug("sorting picked clusters")
slices.SortFunc(clusters, func(cA, cB *cpuCluster) int {
pkgCPUsA, pkgCPUsB := pkgCPUCnt[cA.pkg], pkgCPUCnt[cB.pkg]
dieCPUsA, dieCPUsB := dieCPUCnt[cA.pkg][cA.die], dieCPUCnt[cB.pkg][cB.die]
cpusA, cpusB := cpus[cA], cpus[cB]
return s.sort(cA, cB, pkgCPUsA, pkgCPUsB, dieCPUsA, dieCPUsB, cpusA, cpusB)
})
}

s.clusters = clusters
s.pkgCPUCnt = pkgCPUCnt
s.dieCPUCnt = dieCPUCnt
s.cpus = cpus
}

func (ca *cpuAllocator) allocateCpus(from *cpuset.CPUSet, cnt int, prefer CPUPriority) (cpuset.CPUSet, error) {
var result cpuset.CPUSet
var err error
@@ -389,7 +658,8 @@ func newTopologyCache(sys sysfs.System) topologyCache {
c := topologyCache{
pkg: make(map[idset.ID]cpuset.CPUSet),
node: make(map[idset.ID]cpuset.CPUSet),
core: make(map[idset.ID]cpuset.CPUSet)}
core: make(map[idset.ID]cpuset.CPUSet),
}
if sys != nil {
for _, id := range sys.PackageIDs() {
c.pkg[id] = sys.Package(id).CPUSet()
@@ -402,6 +672,7 @@ func newTopologyCache(sys sysfs.System) topologyCache {
}
}

c.discoverCPUClusters(sys)
c.discoverCPUPriorities(sys)

return c
@@ -620,6 +891,36 @@ func (c *topologyCache) discoverCpufreqPriority(sys sysfs.System, pkgID idset.ID
return prios
}

func (c *topologyCache) discoverCPUClusters(sys sysfs.System) {
if sys == nil {
return
}

for _, id := range sys.PackageIDs() {
pkg := sys.Package(id)
clusters := []*cpuCluster{}
for _, die := range pkg.DieIDs() {
for _, cl := range pkg.LogicalDieClusterIDs(die) {
cpus := pkg.LogicalDieClusterCPUSet(die, cl)
clusters = append(clusters, &cpuCluster{
pkg: id,
die: die,
cluster: cl,
cpus: cpus,
})
}
}
if len(clusters) > 1 {
log.Debug("package #%d has %d clusters:", id, len(clusters))
for _, cl := range clusters {
log.Debug(" die #%d, cluster #%d: cpus %s",
cl.die, cl.cluster, cl.cpus)
}
c.clusters = append(c.clusters, clusters...)
}
}
}

func (p CPUPriority) String() string {
switch p {
case PriorityHigh:
Expand Down Expand Up @@ -665,3 +966,24 @@ func (c *cpuPriorities) cmpCPUSet(csetA, csetB cpuset.CPUSet, prefer CPUPriority
}
return 0
}

func (c *cpuCluster) HasSmallerIDsThan(o *cpuCluster) bool {
if c.pkg < o.pkg {
return true
}
if c.pkg > o.pkg {
return false
}
if c.die < o.die {
return true
}
if c.die > o.die {
return false
}
return c.cluster < o.cluster
}

func (c *cpuCluster) String() string {
return fmt.Sprintf("cluster #%d/%d/%d, %d CPUs (%s)", c.pkg, c.die, c.cluster,
c.cpus.Size(), c.cpus)
}
}
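
As a side note on the stable-sort tie-breaks above: the preferTightestFit comparator spells out the package/die/cluster ID comparison inline, while the new HasSmallerIDsThan helper expresses the same ordering. Below is a minimal, self-contained sketch of driving slices.SortFunc with that helper; the cpuCluster type here is a local stand-in with plain int IDs, not the real one.

package main

import (
	"fmt"
	"slices"
)

// cpuCluster is a stand-in for the allocator's cluster record.
type cpuCluster struct {
	pkg, die, cluster int
}

// HasSmallerIDsThan reports whether c precedes o in (package, die, cluster) order.
func (c *cpuCluster) HasSmallerIDsThan(o *cpuCluster) bool {
	if c.pkg != o.pkg {
		return c.pkg < o.pkg
	}
	if c.die != o.die {
		return c.die < o.die
	}
	return c.cluster < o.cluster
}

func main() {
	clusters := []*cpuCluster{
		{pkg: 1, die: 0, cluster: 2},
		{pkg: 0, die: 1, cluster: 0},
		{pkg: 0, die: 0, cluster: 3},
	}
	// Stable ID-based ordering, usable as the final tie-break in a comparator.
	slices.SortFunc(clusters, func(a, b *cpuCluster) int {
		switch {
		case a.HasSmallerIDsThan(b):
			return -1
		case b.HasSmallerIDsThan(a):
			return 1
		default:
			return 0
		}
	})
	for _, c := range clusters {
		fmt.Printf("cluster #%d/%d/%d\n", c.pkg, c.die, c.cluster)
	}
}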
