From e915a21703fa4608bb1cbd428a1dd930cab43aac Mon Sep 17 00:00:00 2001
From: Krisztian Litkey
Date: Tue, 12 Mar 2024 15:23:30 +0200
Subject: [PATCH] cpuallocator: add basic CPU clustering support.

Try to satisfy allocation requests by consuming one or more full CPU
clusters before falling back to allocating smaller blocks.

Try to squeeze the full allocation into a single die or a single
package. Don't attempt any cluster-based allocation if neither of
these is possible.

Satisfy low-prio allocations using E-core clusters only. Use P-core
clusters only to satisfy priority preferences above low-prio.

Signed-off-by: Krisztian Litkey
---
 pkg/cpuallocator/allocator.go         | 326 +++++++++++++++++++++++++-
 pkg/cpuallocator/cpuallocator_test.go | 218 +++++++++++++++++
 2 files changed, 542 insertions(+), 2 deletions(-)

diff --git a/pkg/cpuallocator/allocator.go b/pkg/cpuallocator/allocator.go
index 5200ccbe6..ea8ad4282 100644
--- a/pkg/cpuallocator/allocator.go
+++ b/pkg/cpuallocator/allocator.go
@@ -16,6 +16,7 @@ package cpuallocator
 
 import (
 	"fmt"
+	"slices"
 	"sort"
 
 	"github.com/containers/nri-plugins/pkg/utils/cpuset"
@@ -35,10 +36,12 @@ const (
 	AllocIdlePackages AllocFlag = 1 << iota
 	// AllocIdleNodes requests allocation of full idle NUMA nodes.
 	AllocIdleNodes
+	// AllocIdleClusters requests allocation of full idle CPU clusters.
+	AllocIdleClusters
 	// AllocIdleCores requests allocation of full idle cores (all threads in core).
 	AllocIdleCores
 	// AllocDefault is the default allocation preferences.
-	AllocDefault = AllocIdlePackages | AllocIdleCores
+	AllocDefault = AllocIdlePackages | AllocIdleClusters | AllocIdleCores
 
 	logSource = "cpuallocator"
 )
@@ -87,10 +90,19 @@ type topologyCache struct {
 	core map[idset.ID]cpuset.CPUSet
 
 	cpuPriorities cpuPriorities // CPU priority mapping
+	clusters      []*cpuCluster // CPU clusters
+
 }
 
 type cpuPriorities [NumCPUPriorities]cpuset.CPUSet
 
+type cpuCluster struct {
+	pkg     idset.ID
+	die     idset.ID
+	cluster idset.ID
+	cpus    cpuset.CPUSet
+}
+
 // IDFilter helps filtering Ids.
 type IDFilter func(idset.ID) bool
@@ -179,6 +191,181 @@ func (a *allocatorHelper) takeIdlePackages() {
 	}
 }
 
+var (
+	emptyCPUSet = cpuset.New()
+)
+
+// Allocate full idle CPU clusters.
+func (a *allocatorHelper) takeIdleClusters() {
+	var (
+		offline  = a.sys.OfflineCPUs()
+		pickIdle = func(c *cpuCluster) (bool, cpuset.CPUSet) {
+			// we only take fully idle clusters
+			cset := c.cpus.Difference(offline)
+			free := cset.Intersection(a.from)
+			if free.IsEmpty() || !free.Equals(cset) {
+				a.Debug("  - omit %s, %d usable CPUs (%s)", c, free.Size(), free)
+				return false, emptyCPUSet
+			}
+
+			a.Debug("  + pick %s, %d usable CPUs (%s)", c, free.Size(), free)
+			return true, free
+		}
+		preferTightestFit = func(cA, cB *cpuCluster, pkgA, pkgB, dieA, dieB int, csetA, csetB cpuset.CPUSet) (r int) {
+			defer func() {
+				if r < 0 {
+					a.Debug("  + prefer %s", cA)
+					a.Debug("      over %s", cB)
+				}
+				if r > 0 {
+					a.Debug("  + prefer %s", cB)
+					a.Debug("      over %s", cA)
+				}
+				a.Debug("  - misfit %s", cA)
+				a.Debug("       and %s", cB)
+			}()
+
+			// prefer cluster which alone can satisfy the request, preferring tighter
+			cntA, cntB := csetA.Size(), csetB.Size()
+			if cntA >= a.cnt && cntB < a.cnt {
+				return -1
+			}
+			if cntA < a.cnt && cntB >= a.cnt {
+				return 1
+			}
+			if cntA >= a.cnt && cntB >= a.cnt {
+				if diff := cntA - cntB; diff != 0 {
+					return diff
+				}
+				// do stable sort: prefer smaller package, die, and cluster IDs
+				if cA.pkg != cB.pkg {
+					return cA.pkg - cB.pkg
+				}
+				if cA.die != cB.die {
+					return cA.die - cB.die
+				}
+				return cA.cluster - cB.cluster
+			}
+
+			// prefer die which alone can satisfy the request, preferring tighter
+			if dieA >= a.cnt && dieB < a.cnt {
+				return -1
+			}
+			if dieA < a.cnt && dieB >= a.cnt {
+				return 1
+			}
+			if dieA >= a.cnt && dieB >= a.cnt {
+				if diff := dieA - dieB; diff != 0 {
+					return diff
+				}
+				// do stable sort: prefer smaller package, die, and cluster IDs
+				if cA.pkg != cB.pkg {
+					return cA.pkg - cB.pkg
+				}
+				if cA.die != cB.die {
+					return cA.die - cB.die
+				}
+				return cA.cluster - cB.cluster
+			}
+
+			// prefer package which alone can satisfy the request, preferring tighter
+			if pkgA >= a.cnt && pkgB < a.cnt {
+				return -1
+			}
+			if pkgA < a.cnt && pkgB >= a.cnt {
+				return 1
+			}
+			if pkgA >= a.cnt && pkgB >= a.cnt {
+				if diff := pkgA - pkgB; diff != 0 {
+					return diff
+				}
+				// do stable sort: prefer smaller package, die, and cluster IDs
+				if cA.pkg != cB.pkg {
+					return cA.pkg - cB.pkg
+				}
+				if cA.die != cB.die {
+					return cA.die - cB.die
+				}
+				return cA.cluster - cB.cluster
+			}
+
+			// both unusable (don't need stable sort, we won't use them anyway)
+			return 0
+		}
+
+		sorter = &clusterSorter{
+			pick: pickIdle,
+			sort: preferTightestFit,
+		}
+	)
+
+	a.Debug("* takeIdleClusters()...")
+
+	if len(a.topology.clusters) <= 1 {
+		return
+	}
+
+	a.Debug("looking for %d %s CPUs from %s", a.cnt, a.prefer, a.from)
+
+	a.sortCPUClusters(sorter)
+
+	var (
+		clusters  = sorter.clusters
+		pkgCPUCnt = sorter.pkgCPUCnt
+		cpus      = sorter.cpus
+	)
+
+	if len(clusters) < 1 {
+		return
+	}
+
+	// tightest-fit cluster is a perfect fit, use it
+	c := clusters[0]
+	cset := cpus[c]
+	if cset.Size() == a.cnt {
+		log.Debug("=> picking single %s", c)
+		a.result = a.result.Union(cset)
+		a.from = a.from.Difference(cset)
+		a.cnt -= cset.Size()
+		return
+	}
+
+	// tightest-fit cluster is too big, so allocation can't consume any cluster fully
+	if cset.Size() > a.cnt {
+		log.Debug("  => tightest-fit cluster too big, can't consume a full cluster")
+		return
+	}
+
+	// no single package can satisfy the allocation, clusters will spill over packages
+	if cnt := pkgCPUCnt[c.pkg]; cnt < a.cnt {
+		log.Debug("  => no single package can satisfy the allocation")
+	}
+
+	// start consuming clusters, until we're done
+	for i, c := range clusters {
+		cset := cpus[c]
+
+		if a.cnt < cset.Size() {
+			log.Debug("=> %d more CPUs needed after allocation of %d clusters", a.cnt, i)
+			// XXX TODO: should restrict a.from to the same package, if that has enough
+			// CPUs to satisfy the request
+			return
+		}
+
+		log.Debug("=> picking %d. %s", i, c)
+
+		if a.cnt >= cset.Size() {
+			a.result = a.result.Union(cset)
+			a.from = a.from.Difference(cset)
+			a.cnt -= cset.Size()
+		}
+
+		if a.cnt == 0 {
+			return
+		}
+	}
+}
+
 // Allocate full idle CPU cores.
 func (a *allocatorHelper) takeIdleCores() {
 	a.Debug("* takeIdleCores()...")
@@ -329,6 +516,9 @@ func (a *allocatorHelper) allocate() cpuset.CPUSet {
 	if (a.flags & AllocIdlePackages) != 0 {
 		a.takeIdlePackages()
 	}
+	if a.cnt > 0 && (a.flags&AllocIdleClusters) != 0 {
+		a.takeIdleClusters()
+	}
 	if a.cnt > 0 && (a.flags&AllocIdleCores) != 0 {
 		a.takeIdleCores()
 	}
@@ -345,6 +535,85 @@ func (a *allocatorHelper) allocate() cpuset.CPUSet {
 	return cpuset.New()
 }
 
+type clusterSorter struct {
+	// function to pick or ignore a cluster
+	pick func(*cpuCluster) (bool, cpuset.CPUSet)
+	// function to sort slice of picked clusters
+	sort func(a, b *cpuCluster, pkgCntA, pkgCntB, dieCntA, dieCntB int, cpusA, cpusB cpuset.CPUSet) int
+
+	// resulting clusters, available CPU count per package and die, available CPUs per cluster
+	clusters  []*cpuCluster
+	pkgCPUCnt map[idset.ID]int
+	dieCPUCnt map[idset.ID]map[idset.ID]int
+	cpus      map[*cpuCluster]cpuset.CPUSet
+}
+
+func (a *allocatorHelper) sortCPUClusters(s *clusterSorter) {
+	var (
+		clusters  = []*cpuCluster{}
+		pkgCPUCnt = map[idset.ID]int{}
+		dieCPUCnt = map[idset.ID]map[idset.ID]int{}
+		cpus      = map[*cpuCluster]cpuset.CPUSet{}
+	)
+
+	a.Debug("picking suitable clusters")
+
+	for _, c := range a.topology.clusters {
+		var cset cpuset.CPUSet
+
+		// pick or ignore cluster, determine usable cluster CPUs
+		if s.pick == nil {
+			cset = c.cpus
+		} else {
+			pick, usable := s.pick(c)
+			if !pick || usable.Size() == 0 {
+				continue
+			}
+
+			cset = usable
+		}
+
+		// collect cluster and usable CPUs
+		clusters = append(clusters, c)
+		cpus[c] = cset
+
+		// count usable CPUs per package and die
+		if _, ok := dieCPUCnt[c.pkg]; !ok {
+			dieCPUCnt[c.pkg] = map[idset.ID]int{}
+		}
+		dieCPUCnt[c.pkg][c.die] += cset.Size()
+		pkgCPUCnt[c.pkg] += cset.Size()
+	}
+
+	if a.DebugEnabled() {
+		log.Debug("number of collected usable CPUs:")
+		for pkg, cnt := range pkgCPUCnt {
+			log.Debug(" - package #%d: %d", pkg, cnt)
+		}
+		for pkg, dies := range dieCPUCnt {
+			for die, cnt := range dies {
+				log.Debug(" - die #%d/%d %d", pkg, die, cnt)
+			}
+		}
+	}
+
+	// sort collected clusters
+	if s.sort != nil {
+		a.Debug("sorting picked clusters")
+		slices.SortFunc(clusters, func(cA, cB *cpuCluster) int {
+			pkgCPUsA, pkgCPUsB := pkgCPUCnt[cA.pkg], pkgCPUCnt[cB.pkg]
+			dieCPUsA, dieCPUsB := dieCPUCnt[cA.pkg][cA.die], dieCPUCnt[cB.pkg][cB.die]
+			cpusA, cpusB := cpus[cA], cpus[cB]
+			return s.sort(cA, cB, pkgCPUsA, pkgCPUsB, dieCPUsA, dieCPUsB, cpusA, cpusB)
+		})
+	}
+
+	s.clusters = clusters
+	s.pkgCPUCnt = pkgCPUCnt
+	s.dieCPUCnt = dieCPUCnt
+	s.cpus = cpus
+}
+
 func (ca *cpuAllocator) allocateCpus(from *cpuset.CPUSet, cnt int, prefer CPUPriority) (cpuset.CPUSet, error) {
 	var result cpuset.CPUSet
 	var err error
@@ -389,7 +658,8 @@ func newTopologyCache(sys sysfs.System) topologyCache {
 	c := topologyCache{
 		pkg:  make(map[idset.ID]cpuset.CPUSet),
 		node: make(map[idset.ID]cpuset.CPUSet),
-		core: make(map[idset.ID]cpuset.CPUSet)}
+		core: make(map[idset.ID]cpuset.CPUSet),
+	}
 	if sys != nil {
 		for _, id := range sys.PackageIDs() {
 			c.pkg[id] = sys.Package(id).CPUSet()
@@ -402,6 +672,7 @@
 		}
 	}
 
+	c.discoverCPUClusters(sys)
 	c.discoverCPUPriorities(sys)
 
 	return c
 }
@@ -620,6 +891,36 @@ func (c *topologyCache) discoverCpufreqPriority(sys sysfs.System, pkgID idset.ID
 	return prios
 }
 
+func (c *topologyCache) discoverCPUClusters(sys sysfs.System) {
+	if sys == nil {
+		return
+	}
+
+	for _, id := range sys.PackageIDs() {
+		pkg := sys.Package(id)
+		clusters := []*cpuCluster{}
+		for _, die := range pkg.DieIDs() {
+			for _, cl := range pkg.LogicalDieClusterIDs(die) {
+				cpus := pkg.LogicalDieClusterCPUSet(die, cl)
+				clusters = append(clusters, &cpuCluster{
+					pkg:     id,
+					die:     die,
+					cluster: cl,
+					cpus:    cpus,
+				})
+			}
+		}
+		if len(clusters) > 1 {
+			log.Debug("package #%d has %d clusters:", id, len(clusters))
+			for _, cl := range clusters {
+				log.Debug("  die #%d, cluster #%d: cpus %s",
+					cl.die, cl.cluster, cl.cpus)
+			}
+			c.clusters = append(c.clusters, clusters...)
+		}
+	}
+}
+
 func (p CPUPriority) String() string {
 	switch p {
 	case PriorityHigh:
@@ -665,3 +966,24 @@ func (c *cpuPriorities) cmpCPUSet(csetA, csetB cpuset.CPUSet, prefer CPUPriority
 	}
 	return 0
 }
+
+func (c *cpuCluster) HasSmallerIDsThan(o *cpuCluster) bool {
+	if c.pkg < o.pkg {
+		return true
+	}
+	if c.pkg > o.pkg {
+		return false
+	}
+	if c.die < o.die {
+		return true
+	}
+	if c.die > o.die {
+		return false
+	}
+	return c.cluster < o.cluster
+}
+
+func (c *cpuCluster) String() string {
+	return fmt.Sprintf("cluster #%d/%d/%d, %d CPUs (%s)", c.pkg, c.die, c.cluster,
+		c.cpus.Size(), c.cpus)
+}
diff --git a/pkg/cpuallocator/cpuallocator_test.go b/pkg/cpuallocator/cpuallocator_test.go
index c1969e6bc..bfaeed5d6 100644
--- a/pkg/cpuallocator/cpuallocator_test.go
+++ b/pkg/cpuallocator/cpuallocator_test.go
@@ -24,6 +24,8 @@ import (
 
 	"github.com/containers/nri-plugins/pkg/sysfs"
 	"github.com/containers/nri-plugins/pkg/utils"
+
+	logger "github.com/containers/nri-plugins/pkg/log"
 )
 
 func TestAllocatorHelper(t *testing.T) {
@@ -99,3 +101,219 @@ func TestAllocatorHelper(t *testing.T) {
 		})
 	}
 }
+
+func TestClusteredAllocation(t *testing.T) {
+	if v := os.Getenv("ENABLE_DEBUG"); v != "" {
+		logger.EnableDebug(logSource)
+	}
+
+	// Create tmpdir and decompress testdata there
+	tmpdir, err := ioutil.TempDir("", "nri-resource-policy-test-")
+	if err != nil {
+		t.Fatalf("failed to create tmpdir: %v", err)
+	}
+	defer os.RemoveAll(tmpdir)
+
+	if err := utils.UncompressTbz2(path.Join("testdata", "sysfs.tar.bz2"), tmpdir); err != nil {
+		t.Fatalf("failed to decompress testdata: %v", err)
+	}
+
+	// Discover mock system from the testdata
+	sys, err := sysfs.DiscoverSystemAt(
+		path.Join(tmpdir, "sysfs", "2-socket-4-node-40-core", "sys"),
+		sysfs.DiscoverCPUTopology, sysfs.DiscoverMemTopology)
+	if err != nil {
+		t.Fatalf("failed to discover mock system: %v", err)
+	}
+	topoCache := newTopologyCache(sys)
+
+	// Fake cpu priorities: treat all CPUs as highest priority
+	// Package CPUs: #0: [0-19,40-59], #1: [20-39,60-79]
+	topoCache.cpuPriorities = [NumCPUPriorities]cpuset.CPUSet{
+		cpuset.MustParse("0-79"),
+	}
+
+	topoCache.clusters = []*cpuCluster{
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 0,
+			cpus:    cpuset.MustParse("0-3"),
+		},
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 1,
+			cpus:    cpuset.MustParse("4-7"),
+		},
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 2,
+			cpus:    cpuset.MustParse("8-11"),
+		},
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 3,
+			cpus:    cpuset.MustParse("12-15"),
+		},
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 4,
+			cpus:    cpuset.MustParse("16-19"),
+		},
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 5,
+			cpus:    cpuset.MustParse("40-43"),
+		},
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 6,
+			cpus:    cpuset.MustParse("44-47"),
+		},
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 7,
+			cpus:    cpuset.MustParse("48-51"),
+		},
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 8,
+			cpus:    cpuset.MustParse("52-55"),
+		},
+		{
+			pkg:     0,
+			die:     0,
+			cluster: 9,
+			cpus:    cpuset.MustParse("56-59"),
+		},
+
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 0,
+			cpus:    cpuset.MustParse("20,22,24,26"),
+		},
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 1,
+			cpus:    cpuset.MustParse("21,23,25,27"),
+		},
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 2,
+			cpus:    cpuset.MustParse("28-31"),
+		},
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 3,
+			cpus:    cpuset.MustParse("32-35"),
+		},
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 4,
+			cpus:    cpuset.MustParse("36-39"),
+		},
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 5,
+			cpus:    cpuset.MustParse("60-63"),
+		},
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 6,
+			cpus:    cpuset.MustParse("64-67"),
+		},
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 7,
+			cpus:    cpuset.MustParse("68-71"),
+		},
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 8,
+			cpus:    cpuset.MustParse("72-75"),
+		},
+		{
+			pkg:     1,
+			die:     0,
+			cluster: 9,
+			cpus:    cpuset.MustParse("76-79"),
+		},
+	}
+
+	pkg0 := cpuset.MustParse("0-19,40-59")
+	pkg1 := cpuset.MustParse("20-39,60-79")
+
+	tcs := []struct {
+		description string
+		from        cpuset.CPUSet
+		cnt         int
+		expected    cpuset.CPUSet
+	}{
+		{
+			description: "CPU cores worth one cluster",
+			from:        pkg0,
+			cnt:         4,
+			expected:    cpuset.MustParse("0-3"),
+		},
+		{
+			description: "CPU cores worth 2 clusters",
+			from:        pkg0,
+			cnt:         8,
+			expected:    cpuset.MustParse("0-7"),
+		},
+		{
+			description: "CPU cores worth 4 clusters in a package",
+			from:        pkg0,
+			cnt:         16,
+			expected:    cpuset.MustParse("0-15"),
+		},
+		{
+			description: "CPU cores worth all clusters in a package",
+			from:        pkg0,
+			cnt:         40,
+			expected:    cpuset.MustParse("0-19,40-59"),
+		},
+		{
+			description: "CPU cores 1 cluster more than available in the 1st package",
+			from:        pkg0.Union(pkg1),
+			cnt:         44,
+			expected:    cpuset.MustParse("0-19,20,22,24,26,40-59"),
+		},
+		{
+			description: "CPU cores 2 clusters more than available in the 1st package",
+			from:        pkg0.Union(pkg1),
+			cnt:         48,
+			expected:    cpuset.MustParse("0-27,40-59"),
+		},
+	}
+
+	// Run tests
+	for _, tc := range tcs {
+		t.Run(tc.description, func(t *testing.T) {
+			a := newAllocatorHelper(sys, topoCache)
+			a.from = tc.from
+			a.cnt = tc.cnt
+
+			result := a.allocate()
+			if !result.Equals(tc.expected) {
+				t.Errorf("expected %q, result was %q", tc.expected, result)
+			}
+		})
+	}
+}
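
The comparator at the heart of this patch is easiest to see in isolation. The sketch below is not part of the patch: it is a simplified, self-contained Go illustration of the tightest-fit ordering that preferTightestFit() implements, with plain int CPU counts standing in for the patch's cpuset and idset types and a made-up topology in main(). Preferring the tightest die or package that can still hold the whole request keeps larger fully idle blocks free for later allocations, in the same spirit as the existing idle-package and idle-core passes.

package main

import (
	"fmt"
	"slices"
)

// candidate mirrors the data preferTightestFit() compares for each cluster:
// the usable CPU counts of the cluster itself, of its die, and of its package.
type candidate struct {
	pkg, die, cluster int // topology IDs
	clusterCPUs       int // usable CPUs in this cluster
	dieCPUs           int // usable CPUs in the cluster's die
	pkgCPUs           int // usable CPUs in the cluster's package
}

// tighterFit orders candidates for a request of cnt CPUs: prefer clusters that
// alone can hold the request (tighter ones first), then clusters whose die can
// hold it, then clusters whose package can, breaking ties by topology IDs.
func tighterFit(a, b candidate, cnt int) int {
	levels := [][2]int{
		{a.clusterCPUs, b.clusterCPUs}, // can the cluster alone satisfy cnt?
		{a.dieCPUs, b.dieCPUs},         // can the whole die?
		{a.pkgCPUs, b.pkgCPUs},         // can the whole package?
	}
	for _, l := range levels {
		cpusA, cpusB := l[0], l[1]
		switch {
		case cpusA >= cnt && cpusB < cnt:
			return -1
		case cpusA < cnt && cpusB >= cnt:
			return 1
		case cpusA >= cnt && cpusB >= cnt:
			if diff := cpusA - cpusB; diff != 0 {
				return diff // both fit: prefer the tighter (smaller) one
			}
			// equally tight fit: keep the order stable by topology IDs
			if a.pkg != b.pkg {
				return a.pkg - b.pkg
			}
			if a.die != b.die {
				return a.die - b.die
			}
			return a.cluster - b.cluster
		}
		// neither side fits at this level, retry at the next, coarser level
	}
	return 0 // nothing fits anywhere; the order no longer matters
}

func main() {
	// Hypothetical topology: 4-CPU clusters, one die per package; package #0
	// has 8 usable CPUs left, package #1 has 40.
	cands := []candidate{
		{pkg: 1, die: 0, cluster: 0, clusterCPUs: 4, dieCPUs: 40, pkgCPUs: 40},
		{pkg: 0, die: 0, cluster: 1, clusterCPUs: 4, dieCPUs: 8, pkgCPUs: 8},
		{pkg: 0, die: 0, cluster: 0, clusterCPUs: 4, dieCPUs: 8, pkgCPUs: 8},
	}
	cnt := 8 // request two clusters' worth of CPUs

	slices.SortFunc(cands, func(a, b candidate) int { return tighterFit(a, b, cnt) })

	// No single cluster fits the request, but both dies do; package #0's die is
	// the tighter fit (8 vs. 40), so its clusters sort first, in ID order.
	for _, c := range cands {
		fmt.Printf("pkg %d / die %d / cluster %d\n", c.pkg, c.die, c.cluster)
	}
}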