Skip to content

feat: parallelize file processing in walk method #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"os"
"path/filepath"
"sync"

yaml "gopkg.in/yaml.v3"
)
Expand All @@ -33,6 +34,7 @@ type JSONHashStore struct {
path string
hashes map[string]string
strategy string
mu sync.RWMutex
}

func NewJSONHashStore(path, strategy string) (*JSONHashStore, error) {
Expand Down Expand Up @@ -61,11 +63,15 @@ func NewJSONHashStore(path, strategy string) (*JSONHashStore, error) {
}

// Add records hash under name, replacing any value previously stored for
// that name. The write is performed while holding the store's write lock,
// so Add may be called from multiple goroutines. It always returns nil.
func (s *JSONHashStore) Add(name, hash string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Assigning into a nil map panics, so allocate the map on first use
	// in case the store was built without one.
	if s.hashes == nil {
		s.hashes = make(map[string]string)
	}
	s.hashes[name] = hash
	return nil
}

// Get looks up the hash stored for name while holding the store's read
// lock. A name with no entry yields the empty string; the returned error is
// always nil.
func (s *JSONHashStore) Get(name string) (string, error) {
	s.mu.RLock()
	stored := s.hashes[name]
	s.mu.RUnlock()

	return stored, nil
}

Expand All @@ -75,7 +81,9 @@ func (s *JSONHashStore) Save() error {
return nil
}

s.mu.RLock()
b, err := json.MarshalIndent(s.hashes, "", " ")
s.mu.RUnlock()
if err != nil {
return err
}
Expand Down
193 changes: 149 additions & 44 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
"log"
"os"
"os/exec"

"path/filepath"
"strings"
"sync"
"time"

"github.com/chime/mani-diffy/pkg/helm"
Expand Down Expand Up @@ -44,9 +44,76 @@
ignoreSuffix string
}

// VisitedMap is a set of paths that can be marked and queried from several
// goroutines at once. The embedded RWMutex guards the visited map.
type VisitedMap struct {
	sync.RWMutex
	visited map[string]bool
}

// NewVisitedMap returns an empty VisitedMap whose backing map is already
// allocated.
func NewVisitedMap() *VisitedMap {
	vm := new(VisitedMap)
	vm.visited = map[string]bool{}
	return vm
}

// Set marks path as visited, taking the write lock for the duration of the
// update.
func (vm *VisitedMap) Set(path string) {
	vm.RWMutex.Lock()
	defer vm.RWMutex.Unlock()

	vm.visited[path] = true
}

// Get reports whether path has been marked as visited. Only the read lock
// is taken, so concurrent Get calls do not block one another.
func (vm *VisitedMap) Get(path string) bool {
	vm.RWMutex.RLock()
	seen := vm.visited[path]
	vm.RWMutex.RUnlock()

	return seen
}

// WorkerPool runs submitted tasks on a fixed number of goroutines.
//
// Lifecycle: create the pool with NewWorkerPool, hand it work with Submit,
// then call Wait to stop accepting work and block until every queued task
// has run. Submit must not be called after Wait, because sending on the
// closed task channel panics.
type WorkerPool struct {
	workers   int
	tasks     chan func()
	wg        sync.WaitGroup
	closeOnce sync.Once // ensures the task channel is closed only once
}

// NewWorkerPool creates a pool and immediately starts its worker
// goroutines. A workers value below 1 is raised to 1: with zero workers
// nothing would ever drain the queue and Submit would block forever, and a
// negative value would make the channel allocation panic.
func NewWorkerPool(workers int) *WorkerPool {
	if workers < 1 {
		workers = 1
	}
	pool := &WorkerPool{
		workers: workers,
		tasks:   make(chan func(), workers), // buffer size matches worker count
	}
	pool.start()
	return pool
}

// start launches the worker goroutines. Each worker runs tasks until the
// task channel is closed and drained, then signals the WaitGroup.
func (p *WorkerPool) start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
}

// Submit queues task for execution. It blocks while the queue buffer is
// full and every worker is busy.
func (p *WorkerPool) Submit(task func()) {
	p.tasks <- task
}

// Wait closes the task queue and blocks until all workers have finished the
// tasks already submitted. It is safe to call more than once; only the
// first call closes the queue.
func (p *WorkerPool) Wait() {
	p.closeOnce.Do(func() { close(p.tasks) })
	p.wg.Wait()
}

// BatchProcessor handles batched file operations.
//
// NOTE(review): the lint job reports both fields as unused, so nothing
// reads or writes them yet. Wire the type in or remove it before merging.
type BatchProcessor struct {
	files []os.DirEntry // directory entries in the batch (flagged unused by lint)
	path  string        // presumably the directory the entries belong to; TODO confirm (flagged unused by lint)
}

// Walk walks a directory tree looking for Argo applications and renders them
func (w *Walker) Walk(inputPath, outputPath string, maxDepth int, hashes HashStore) error {
visited := make(map[string]bool)
visited := NewVisitedMap()

if err := w.walk(inputPath, outputPath, 0, maxDepth, visited, hashes); err != nil {
return err
Expand All @@ -63,7 +130,7 @@
return nil
}

func pruneUnvisited(visited map[string]bool, outputPath string) error {
func pruneUnvisited(visited *VisitedMap, outputPath string) error {
files, err := os.ReadDir(outputPath)
if err != nil {
return err
Expand All @@ -75,7 +142,7 @@
}

path := filepath.Join(outputPath, f.Name())
if visited[path] {
if visited.Get(path) {
continue
}
if err := os.RemoveAll(path); err != nil {
Expand All @@ -86,9 +153,8 @@
return nil
}

func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited map[string]bool, hashes HashStore) error {
func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited *VisitedMap, hashes HashStore) error {
if maxDepth != InfiniteDepth {
// If we've reached the max depth, stop walking
if depth > maxDepth {
return nil
}
Expand All @@ -100,65 +166,104 @@
if err != nil {
return err
}

// Create a worker pool with optimal size
pool := NewWorkerPool(4)
errChan := make(chan error, len(fi))

for _, file := range fi {
if !strings.Contains(file.Name(), ".yaml") {
continue
}

crds, err := helm.Read(filepath.Join(inputPath, file.Name()))
if err != nil {
return err
}
for _, crd := range crds {
if crd.Kind != "Application" {
continue
file := file // Create a new variable for the closure
pool.Submit(func() {
crds, err := helm.Read(filepath.Join(inputPath, file.Name()))
if err != nil {
errChan <- err
return
}

if strings.HasSuffix(crd.ObjectMeta.Name, w.ignoreSuffix) {
continue
}
for _, crd := range crds {
if crd.Kind != "Application" {
continue
}

path := filepath.Join(outputPath, crd.ObjectMeta.Name)
visited[path] = true
if strings.HasSuffix(crd.ObjectMeta.Name, w.ignoreSuffix) {
continue
}

hash, err := hashes.Get(crd.ObjectMeta.Name)
// COMPARE HASHES HERE. STEP INTO RENDER IF NO MATCH
if err != nil {
return err
}
path := filepath.Join(outputPath, crd.ObjectMeta.Name)

hashGenerated, err := w.GenerateHash(crd)
if err != nil {
if errors.Is(err, kustomize.ErrNotSupported) {
continue
// Create the output directory if it doesn't exist
if err := os.MkdirAll(path, 0755); err != nil {
errChan <- err
return
}
return err
}

emptyManifest, err := helm.EmptyManifest(filepath.Join(path, "manifest.yaml"))
if err != nil {
return err
}
visited.Set(path)

// Check hash first to avoid unnecessary operations
hash, err := hashes.Get(crd.ObjectMeta.Name)
if err != nil {
errChan <- err
return
}

// Only proceed with hash generation if needed
if hash == "" {
hashGenerated, err := w.GenerateHash(crd)
if err != nil {
if errors.Is(err, kustomize.ErrNotSupported) {
continue
}
errChan <- err
return
}

emptyManifest, err := helm.EmptyManifest(filepath.Join(path, "manifest.yaml"))
if err != nil {
errChan <- err
return
}

if hashGenerated != hash || emptyManifest {
log.Printf("No match detected. Render: %s\n", crd.ObjectMeta.Name)
if err := w.Render(crd, path); err != nil {
if errors.Is(err, kustomize.ErrNotSupported) {
continue
if emptyManifest {
log.Printf("No match detected. Render: %s\n", crd.ObjectMeta.Name)
if err := w.Render(crd, path); err != nil {
if errors.Is(err, kustomize.ErrNotSupported) {
continue
}
errChan <- err
return
}

if err := hashes.Add(crd.ObjectMeta.Name, hashGenerated); err != nil {
errChan <- err
return
}
}
return err
}

if err := hashes.Add(crd.ObjectMeta.Name, hashGenerated); err != nil {
return err
// Process subdirectories sequentially
if err := w.walk(path, outputPath, depth+1, maxDepth, visited, hashes); err != nil {
errChan <- err
return
}
}
})
}

if err := w.walk(path, outputPath, depth+1, maxDepth, visited, hashes); err != nil {
return err
}
// Wait for all workers to complete
pool.Wait()
close(errChan)

// Check for any errors
for err := range errChan {
if err != nil {
return err
}
}

return nil
}

Expand Down
Loading