Skip to content

Commit

Permalink
pkg/bpf - remove overengineered errWorker infrastructure. unmarshal e…
Browse files Browse the repository at this point in the history
…rrs = panic()
  • Loading branch information
ti-mo committed Mar 31, 2020
1 parent 8128484 commit cc1d4c8
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 60 deletions.
20 changes: 0 additions & 20 deletions internal/pipeline/acct.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ func (p *Pipeline) startAcct() error {
return errors.Wrap(err, "starting probe")
}

// Watch the accounting probe for errors.
go p.acctErrorWorker()

log.Info("Started accounting probe and workers")

return nil
Expand Down Expand Up @@ -163,20 +160,3 @@ func (p *Pipeline) acctDestroyWorker() {
p.acctSinkMu.RUnlock()
}
}

// acctErrorWorker reads from acctProbe's error channel and terminates the
// program when an error occurs.
func (p *Pipeline) acctErrorWorker() {

errs := p.acctProbe.ErrChan()

for {
err, ok := <-errs
if !ok {
log.Debug("Pipeline's error event channel closed, stopping worker.")
break
}

log.Fatalln("Fatal error in acctProbe:", err)
}
}
1 change: 0 additions & 1 deletion pkg/bpf/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bpf
import "errors"

const (
errFmtSplitKprobe = "expected string of format 'k(ret)probe/<kernel-symbol>': %s"
errFmtSymNotFound = "kernel symbol '%s' not found, conntrack kernel module not loaded"
errKernelRelease = "invalid kernel release version '%s'"
)
Expand Down
9 changes: 0 additions & 9 deletions pkg/bpf/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func TestMain(m *testing.M) {
if err := acctProbe.Start(); err != nil {
log.Fatal(err)
}
go errWorker(acctProbe.ErrChan())

// Run tests, save the return code.
rc := m.Run()
Expand Down Expand Up @@ -325,14 +324,6 @@ func filterWorker(in <-chan Event, out chan<- Event, f func(Event) bool) {
}
}

// errWorker listens for errors on the Probe's error channel.
// Terminates the test suite when an error occurs.
func errWorker(ec <-chan error) {
for err := range ec {
log.Fatal("unexpected error from Probe:", err)
}
}

// readTimeout attempts a read from an Event channel, timing out
// when a message wasn't read after ms milliseconds.
func readTimeout(c <-chan Event, ms uint) (Event, error) {
Expand Down
32 changes: 2 additions & 30 deletions pkg/bpf/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ type Probe struct {
// Channel for receiving IDs of lost perf events.
lost chan uint64

// perfWorker error channel.
errs chan error

// Started status of the probe.
startMu sync.Mutex
started bool
Expand Down Expand Up @@ -274,7 +271,6 @@ func (ap *Probe) Start() error {
}

ap.lost = make(chan uint64)
ap.errs = make(chan error)

// Set up Readers for reading events from the perf ring buffers.
r, err := perf.NewReader(ap.collection.Maps[perfUpdateMap], 4096)
Expand Down Expand Up @@ -318,7 +314,6 @@ func (ap *Probe) Stop() error {
}

close(ap.lost)
close(ap.errs)

if err := ap.disablePerfEvents(); err != nil {
return err
Expand All @@ -338,32 +333,11 @@ func (ap *Probe) Kernel() kernel.Kernel {
return ap.kernel
}

// ErrChan returns an initialized Probe's unbuffered error channel.
// The error channel is unbuffered because it doesn't make sense to have
// stale error data. If there is no ready consumer on the channel, errors
// are dropped.
// Returns nil if the Probe has not been Start()ed yet.
func (ap *Probe) ErrChan() chan error {
return ap.errs
}

// Stats returns a snapshot copy of the Probe's statistics.
func (ap *Probe) Stats() ProbeStats {
return ap.stats.Get()
}

// sendError safely sends a message on the Probe's unbuffered errChan.
// If there is no ready channel receiver, sendError is a no-op. A return value
// of true means the error was successfully sent on the channel.
func (ap *Probe) sendError(err error) bool {
select {
case ap.errs <- err:
return true
default:
return false
}
}

// updateWorker reads binady flow update events from the Probe's ring buffer,
// unmarshals the events into Event structures and sends them on all registered
// consumers' event channels.
Expand All @@ -389,8 +363,7 @@ func (ap *Probe) updateWorker() {

var ae Event
if err := ae.unmarshalBinary(rec.RawSample); err != nil {
ap.sendError(errors.Wrap(err, "error unmarshaling Event byte array"))
continue
panic(err)
}

// Fan out update event to all registered consumers.
Expand Down Expand Up @@ -423,8 +396,7 @@ func (ap *Probe) destroyWorker() {

var ae Event
if err := ae.unmarshalBinary(rec.RawSample); err != nil {
ap.sendError(errors.Wrap(err, "error unmarshaling Event byte array"))
continue
panic(err)
}

// Fan out destroy event to all registered consumers.
Expand Down

0 comments on commit cc1d4c8

Please # to comment.