Merge pull request #206 from klihub/fixes/topology-discovery-improvements

sysfs, cpuallocator: topology discovery fixes and improvements.
fmuyassarov authored Mar 21, 2024
2 parents c1b1f9f + 774c205 commit 7b2fece
Showing 10 changed files with 1,132 additions and 88 deletions.
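Only the allocator.go part of the change is shown below. Its gist: the CPU allocator now knows about CPU clusters, so between taking whole idle packages and whole idle cores it can take whole idle clusters. A minimal caller-side sketch of what that enables follows; the `NewCPUAllocator` constructor, `sysfs.DiscoverSystem` entry point, `PriorityNone` constant, and import paths are assumptions based on this file, not confirmed by the diff:

package main

import (
	"fmt"

	"github.com/containers/nri-plugins/pkg/cpuallocator" // assumed import path
	"github.com/containers/nri-plugins/pkg/sysfs"        // assumed import path
	"github.com/containers/nri-plugins/pkg/utils/cpuset"
)

func main() {
	// Discover the CPU topology (assumed discovery entry point).
	sys, err := sysfs.DiscoverSystem()
	if err != nil {
		panic(err)
	}

	// With this change AllocDefault includes AllocIdleClusters, so a request
	// that can be satisfied by fully idle clusters consumes them whole.
	alloc := cpuallocator.NewCPUAllocator(sys) // assumed constructor
	from := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7)
	cset, err := alloc.AllocateCpus(&from, 4, cpuallocator.PriorityNone)
	if err != nil {
		panic(err)
	}
	fmt.Println("allocated:", cset)
}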
pkg/cpuallocator/allocator.go: 326 changes (324 additions, 2 deletions)
@@ -16,6 +16,7 @@ package cpuallocator

import (
	"fmt"
	"slices"
	"sort"

	"github.com/containers/nri-plugins/pkg/utils/cpuset"
@@ -35,10 +36,12 @@ const (
	AllocIdlePackages AllocFlag = 1 << iota
	// AllocIdleNodes requests allocation of full idle NUMA nodes.
	AllocIdleNodes
	// AllocIdleClusters requests allocation of full idle CPU clusters.
	AllocIdleClusters
	// AllocIdleCores requests allocation of full idle cores (all threads in core).
	AllocIdleCores
	// AllocDefault is the default allocation preferences.
	AllocDefault = AllocIdlePackages | AllocIdleClusters | AllocIdleCores

	logSource = "cpuallocator"
)
@@ -87,10 +90,19 @@ type topologyCache struct {
	core map[idset.ID]cpuset.CPUSet

	cpuPriorities cpuPriorities // CPU priority mapping
	clusters      []*cpuCluster // CPU clusters
}

type cpuPriorities [NumCPUPriorities]cpuset.CPUSet

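// cpuCluster represents a single CPU cluster: the package and die it lives
// in, and the CPUs it contains.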
type cpuCluster struct {
	pkg     idset.ID
	die     idset.ID
	cluster idset.ID
	cpus    cpuset.CPUSet
}

// IDFilter helps filtering IDs.
type IDFilter func(idset.ID) bool

@@ -179,6 +191,181 @@ func (a *allocatorHelper) takeIdlePackages() {
	}
}

var (
	emptyCPUSet = cpuset.New()
)

// Allocate full idle CPU clusters.
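// It first collects clusters whose every online CPU is still free, then sorts
// them tightest-fit first (preferring the smallest cluster, die, or package
// that can still satisfy the request alone), and finally consumes whole
// clusters until the request is satisfied or no full cluster fits any more.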
func (a *allocatorHelper) takeIdleClusters() {
	var (
		offline  = a.sys.OfflineCPUs()
		pickIdle = func(c *cpuCluster) (bool, cpuset.CPUSet) {
			// we only take fully idle clusters
			cset := c.cpus.Difference(offline)
			free := cset.Intersection(a.from)
			if free.IsEmpty() || !free.Equals(cset) {
				a.Debug(" - omit %s, %d usable CPUs (%s)", c, free.Size(), free)
				return false, emptyCPUSet
			}

			a.Debug(" + pick %s, %d usable CPUs (%s)", c, free.Size(), free)
			return true, free
		}
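		// preferTightestFit ranks two pickable clusters: first by whether one
		// cluster alone can satisfy the request, then by whether one die can,
		// then by whether one package can, preferring the tighter fit at each
		// tier and falling back to package/die/cluster IDs for a stable order.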
		preferTightestFit = func(cA, cB *cpuCluster, pkgA, pkgB, dieA, dieB int, csetA, csetB cpuset.CPUSet) (r int) {
			defer func() {
				if r < 0 {
					a.Debug(" + prefer %s", cA)
					a.Debug("     over %s", cB)
				}
				if r > 0 {
					a.Debug(" + prefer %s", cB)
					a.Debug("     over %s", cA)
				}
				if r == 0 {
					a.Debug(" - misfit %s", cA)
					a.Debug("      and %s", cB)
				}
			}()

// prefer cluster which alone can satisfy the request, preferring tighter
cntA, cntB := csetA.Size(), csetB.Size()
if cntA >= a.cnt && cntB < a.cnt {
return -1
}
if cntA < a.cnt && cntB >= a.cnt {
return 1
}
if cntA >= a.cnt && cntB >= a.cnt {
if diff := cntA - cntB; diff != 0 {
return diff
}
// do stable sort: prefer smaller package, die, and cluster IDs
if cA.pkg != cB.pkg {
return cA.pkg - cB.pkg
}
if cA.die != cB.die {
return cA.die - cB.die
}
return cA.cluster - cB.cluster
}

// prefer die which alone can satisfy the request, preferring tighter
if dieA >= a.cnt && dieB < a.cnt {
return -1
}
if dieA < a.cnt && dieB >= a.cnt {
return 1
}
if dieA >= a.cnt && dieB >= a.cnt {
if diff := dieA - dieB; diff != 0 {
return diff
}
// do stable sort: prefer smaller package, die, and cluster IDs
if cA.pkg != cB.pkg {
return cA.pkg - cB.pkg
}
if cA.die != cB.die {
return cA.die - cB.die
}
return cA.cluster - cB.cluster
}

// prefer package which alone can satisfy the request, preferring tighter
if pkgA >= a.cnt && pkgB < a.cnt {
return -1
}
if pkgA < a.cnt && pkgB >= a.cnt {
return 1
}
if pkgA >= a.cnt && pkgB >= a.cnt {
if diff := pkgA - pkgB; diff != 0 {
return diff
}
// do stable sort: prefer smaller package, die, and cluster IDs
if cA.pkg != cB.pkg {
return cA.pkg - cB.pkg
}
if cA.die != cB.die {
return cA.die - cB.die
}
return cA.cluster - cB.cluster
}

// both unusable (don't need stable sort, we won't use them anyway)
return 0
}

sorter = &clusterSorter{
pick: pickIdle,
sort: preferTightestFit,
}
)

a.Debug("* takeIdleClusters()...")

if len(a.topology.clusters) <= 1 {
return
}

a.Debug("looking for %d %s CPUs from %s", a.cnt, a.prefer, a.from)

a.sortCPUClusters(sorter)

var (
clusters = sorter.clusters
pkgCPUCnt = sorter.pkgCPUCnt
cpus = sorter.cpus
)

if len(clusters) < 1 {
return
}

// tightest-fit cluster is a perfect fit, use it
c := clusters[0]
cset := cpus[c]
if cset.Size() == a.cnt {
log.Debug("=> picking single %s", c)
a.result = a.result.Union(cset)
a.from = a.from.Difference(cset)
a.cnt -= cset.Size()
return
}

// tightest-fit cluster is too big, so allocation can't consume any cluster fully
if cset.Size() > a.cnt {
log.Debug(" => tightest-fit cluster too big, can't consume a full cluster")
return
}

	// bail out if no package can satisfy the allocation
	if cnt := pkgCPUCnt[c.pkg]; cnt < a.cnt {
		log.Debug(" => no package can satisfy the allocation, bail out")
		return
	}

	// start consuming clusters, until we're done
	for i, c := range clusters {
		cset := cpus[c]

		if a.cnt < cset.Size() {
			log.Debug("=> %d more CPUs needed after allocation of %d clusters", a.cnt, i)
			// XXX TODO: should restrict a.from to the same package, if that has enough
			//   CPUs to satisfy the request
			return
		}

		log.Debug("=> picking %d. %s", i, c)

		if a.cnt >= cset.Size() {
			a.result = a.result.Union(cset)
			a.from = a.from.Difference(cset)
			a.cnt -= cset.Size()
		}

		if a.cnt == 0 {
			return
		}
	}
}

// Allocate full idle CPU cores.
func (a *allocatorHelper) takeIdleCores() {
a.Debug("* takeIdleCores()...")
@@ -329,6 +516,9 @@ func (a *allocatorHelper) allocate() cpuset.CPUSet {
	if (a.flags & AllocIdlePackages) != 0 {
		a.takeIdlePackages()
	}
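	// clusters are finer-grained than packages but coarser than cores, so
	// idle-cluster allocation slots in between the two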
if a.cnt > 0 && (a.flags&AllocIdleClusters) != 0 {
a.takeIdleClusters()
}
if a.cnt > 0 && (a.flags&AllocIdleCores) != 0 {
a.takeIdleCores()
}
@@ -345,6 +535,85 @@ func (a *allocatorHelper) allocate() cpuset.CPUSet {
	return cpuset.New()
}

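// clusterSorter bundles a callback for picking or ignoring clusters, a
// callback for sorting the picked ones, and the results of a sorting pass.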
type clusterSorter struct {
	// function to pick or ignore a cluster
	pick func(*cpuCluster) (bool, cpuset.CPUSet)
	// function to sort slice of picked clusters
	sort func(a, b *cpuCluster, pkgCntA, pkgCntB, dieCntA, dieCntB int, cpusA, cpusB cpuset.CPUSet) int

	// resulting clusters, available CPU counts per package and die, available CPUs per cluster
	clusters  []*cpuCluster
	pkgCPUCnt map[idset.ID]int
	dieCPUCnt map[idset.ID]map[idset.ID]int
	cpus      map[*cpuCluster]cpuset.CPUSet
}

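// sortCPUClusters picks clusters using the sorter's pick function, tallies
// usable CPUs per package and per die, sorts the picked clusters using the
// sorter's sort function, and stores the results back into the sorter.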
func (a *allocatorHelper) sortCPUClusters(s *clusterSorter) {
	var (
		clusters  = []*cpuCluster{}
		pkgCPUCnt = map[idset.ID]int{}
		dieCPUCnt = map[idset.ID]map[idset.ID]int{}
		cpus      = map[*cpuCluster]cpuset.CPUSet{}
	)

	a.Debug("picking suitable clusters")

	for _, c := range a.topology.clusters {
		var cset cpuset.CPUSet

		// pick or ignore cluster, determine usable cluster CPUs
		if s.pick == nil {
			cset = c.cpus
		} else {
			pick, usable := s.pick(c)
			if !pick || usable.Size() == 0 {
				continue
			}

			cset = usable
		}

		// collect cluster and usable CPUs
		clusters = append(clusters, c)
		cpus[c] = cset

		// count usable CPUs per package and die
		if _, ok := dieCPUCnt[c.pkg]; !ok {
			dieCPUCnt[c.pkg] = map[idset.ID]int{}
		}
		dieCPUCnt[c.pkg][c.die] += cset.Size()
		pkgCPUCnt[c.pkg] += cset.Size()
	}

	if a.DebugEnabled() {
		log.Debug("number of collected usable CPUs:")
		for pkg, cnt := range pkgCPUCnt {
			log.Debug(" - package #%d: %d", pkg, cnt)
		}
		for pkg, dies := range dieCPUCnt {
			for die, cnt := range dies {
				log.Debug(" - die #%d/%d: %d", pkg, die, cnt)
			}
		}
	}

	// sort collected clusters
	if s.sort != nil {
		a.Debug("sorting picked clusters")
		slices.SortFunc(clusters, func(cA, cB *cpuCluster) int {
			pkgCPUsA, pkgCPUsB := pkgCPUCnt[cA.pkg], pkgCPUCnt[cB.pkg]
			dieCPUsA, dieCPUsB := dieCPUCnt[cA.pkg][cA.die], dieCPUCnt[cB.pkg][cB.die]
			cpusA, cpusB := cpus[cA], cpus[cB]
			return s.sort(cA, cB, pkgCPUsA, pkgCPUsB, dieCPUsA, dieCPUsB, cpusA, cpusB)
		})
	}

	s.clusters = clusters
	s.pkgCPUCnt = pkgCPUCnt
	s.dieCPUCnt = dieCPUCnt
	s.cpus = cpus
}

func (ca *cpuAllocator) allocateCpus(from *cpuset.CPUSet, cnt int, prefer CPUPriority) (cpuset.CPUSet, error) {
	var result cpuset.CPUSet
	var err error
@@ -389,7 +658,8 @@ func newTopologyCache(sys sysfs.System) topologyCache {
	c := topologyCache{
		pkg:  make(map[idset.ID]cpuset.CPUSet),
		node: make(map[idset.ID]cpuset.CPUSet),
		core: make(map[idset.ID]cpuset.CPUSet),
	}
	if sys != nil {
		for _, id := range sys.PackageIDs() {
			c.pkg[id] = sys.Package(id).CPUSet()
@@ -402,6 +672,7 @@ func newTopologyCache(sys sysfs.System) topologyCache {
		}
	}

	c.discoverCPUClusters(sys)
	c.discoverCPUPriorities(sys)

	return c
@@ -620,6 +891,36 @@ func (c *topologyCache) discoverCpufreqPriority(sys sysfs.System, pkgID idset.ID
	return prios
}

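// discoverCPUClusters enumerates the per-die CPU clusters of every package.
// Packages with just one cluster are skipped, since a lone cluster offers no
// extra allocation granularity beyond the package itself.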
func (c *topologyCache) discoverCPUClusters(sys sysfs.System) {
	if sys == nil {
		return
	}

	for _, id := range sys.PackageIDs() {
		pkg := sys.Package(id)
		clusters := []*cpuCluster{}
		for _, die := range pkg.DieIDs() {
			for _, cl := range pkg.LogicalDieClusterIDs(die) {
				cpus := pkg.LogicalDieClusterCPUSet(die, cl)
				clusters = append(clusters, &cpuCluster{
					pkg:     id,
					die:     die,
					cluster: cl,
					cpus:    cpus,
				})
			}
		}
		if len(clusters) > 1 {
			log.Debug("package #%d has %d clusters:", id, len(clusters))
			for _, cl := range clusters {
				log.Debug("  die #%d, cluster #%d: cpus %s",
					cl.die, cl.cluster, cl.cpus)
			}
			c.clusters = append(c.clusters, clusters...)
		}
	}
}

func (p CPUPriority) String() string {
	switch p {
	case PriorityHigh:
@@ -665,3 +966,24 @@ func (c *cpuPriorities) cmpCPUSet(csetA, csetB cpuset.CPUSet, prefer CPUPriority
	}
	return 0
}

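// HasSmallerIDsThan returns true if c sorts before o by package ID, then die
// ID, then cluster ID.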
func (c *cpuCluster) HasSmallerIDsThan(o *cpuCluster) bool {
	if c.pkg < o.pkg {
		return true
	}
	if c.pkg > o.pkg {
		return false
	}
	if c.die < o.die {
		return true
	}
	if c.die > o.die {
		return false
	}
	return c.cluster < o.cluster
}

func (c *cpuCluster) String() string {
	return fmt.Sprintf("cluster #%d/%d/%d, %d CPUs (%s)", c.pkg, c.die, c.cluster,
		c.cpus.Size(), c.cpus)
}