Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
cleanup metrics and error routines with a done channel (#280)
Browse files Browse the repository at this point in the history
* cleanup metrics and error routines with a done channel

* use two var form of chan receive, no need to if err now
  • Loading branch information
jipperinbham authored Feb 24, 2017
1 parent 9ce29a3 commit e30886a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/pipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (m *Pipe) Stop() {
c := make(chan bool)
m.chStop <- c
<-c
close(m.Err)
}
}
}
Expand Down
40 changes: 27 additions & 13 deletions pkg/transporter/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Pipeline struct {
// the transporter is running
Err error
sessionTicker *time.Ticker
done chan struct{}
}

// NewDefaultPipeline returns a new Transporter Pipeline with the given node tree, and
Expand Down Expand Up @@ -57,6 +58,7 @@ func NewPipeline(version string, source *Node, emit events.EmitFunc, interval ti
pipeline := &Pipeline{
source: source,
metricsTicker: time.NewTicker(interval),
done: make(chan struct{}),
}

if sessionStore != nil {
Expand Down Expand Up @@ -101,11 +103,10 @@ func (pipeline *Pipeline) Stop() {
}

// pipeline has stopped, emit one last round of metrics and send the exit event
close(pipeline.done)
pipeline.emitMetrics()
pipeline.source.pipe.Event <- events.NewExitEvent(time.Now().UnixNano(), pipeline.version, endpoints)
pipeline.emitter.Stop()

pipeline.metricsTicker.Stop()
}

// Run the pipeline
Expand All @@ -130,24 +131,37 @@ func (pipeline *Pipeline) Run() error {
// start error listener consumes all the events on the pipe's Err channel, and stops the pipeline
// when it receives one
func (pipeline *Pipeline) startErrorListener(cherr chan error) {
for err := range cherr {
if aerr, ok := err.(adaptor.Error); ok {
pipeline.source.pipe.Event <- events.NewErrorEvent(time.Now().UnixNano(), aerr.Path, aerr.Record, aerr.Error())
if aerr.Lvl == adaptor.ERROR || aerr.Lvl == adaptor.CRITICAL {
log.With("path", aerr.Path).Errorln(aerr)
for {
select {
case err, ok := <-cherr:
if !ok {
return
}
} else {
if pipeline.Err == nil {
pipeline.Err = err
if aerr, ok := err.(adaptor.Error); ok {
pipeline.source.pipe.Event <- events.NewErrorEvent(time.Now().UnixNano(), aerr.Path, aerr.Record, aerr.Error())
if aerr.Lvl == adaptor.ERROR || aerr.Lvl == adaptor.CRITICAL {
log.With("path", aerr.Path).Errorln(aerr)
}
} else {
if pipeline.Err == nil {
pipeline.Err = err
}
pipeline.Stop()
}
pipeline.Stop()
case <-pipeline.done:
return
}
}
}

func (pipeline *Pipeline) startMetricsGatherer() {
for range pipeline.metricsTicker.C {
pipeline.emitMetrics()
for {
select {
case <-pipeline.metricsTicker.C:
pipeline.emitMetrics()
case <-pipeline.done:
return
}
}
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/transporter/pipeline_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package transporter

import (
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
Expand All @@ -26,7 +28,14 @@ func setupFiles(in, out string) {
func TestFileToFile(t *testing.T) {
if testing.Short() {
t.Skip("skipping FileToFile in short mode")

}

ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}))
defer ts.Close()
ts.Start()

var (
tempDir = os.TempDir()
inFile = filepath.Join(tempDir, "in")
Expand All @@ -40,7 +49,7 @@ func TestFileToFile(t *testing.T) {
Add(NewNode("localfilein", "file", adaptor.Config{"uri": "file://" + inFile}))

// create the pipeline
p, err := NewDefaultPipeline(outNode, "", "", "", "test", 100*time.Millisecond)
p, err := NewDefaultPipeline(outNode, ts.URL, "", "", "test", 100*time.Millisecond)
if err != nil {
t.Errorf("can't create pipeline, got %s", err.Error())
t.FailNow()
Expand Down

0 comments on commit e30886a

Please # to comment.