From 92d4272a472bc0a938b3f53ed7bb3d65fb66096e Mon Sep 17 00:00:00 2001 From: Cody Roseborough Date: Fri, 9 Sep 2016 09:40:05 -0700 Subject: [PATCH 1/2] Fixes #1142: Handle hanging plugin Adds handling of the timeout to the native go client which enforces that each request for {collect, process, publish} finishes in this amount of time. Ups the default client timeout to 10 seconds. --- control/available_plugin.go | 2 +- control/plugin/client/native.go | 22 ++++++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/control/available_plugin.go b/control/available_plugin.go index 430f1845d..aac591cba 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -44,7 +44,7 @@ import ( const ( // DefaultClientTimeout - default timeout for a client connection attempt - DefaultClientTimeout = time.Second * 3 + DefaultClientTimeout = time.Second * 10 // DefaultHealthCheckTimeout - default timeout for a health check DefaultHealthCheckTimeout = time.Second * 1 // DefaultHealthCheckFailureLimit - how any consecutive health check timeouts must occur to trigger a failure diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index 7c92237fc..95efae08b 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -52,6 +52,7 @@ type PluginNativeClient struct { pluginType plugin.PluginType encoder encoding.Encoder encrypter *encrypter.Encrypter + timeout time.Duration } func NewCollectorNativeClient(address string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginCollectorClient, error) { @@ -140,6 +141,15 @@ func decodeMetrics(bts []byte) ([]core.Metric, error) { return cmetrics, nil } +func enforceTimeout(p *PluginNativeClient, dl time.Duration, done chan int) { + select { + case <-time.After(dl): + p.Kill("Passed deadline") + case <-done: + return + } +} + func (p *PluginNativeClient) Publish(metrics []core.Metric, config map[string]ctypes.ConfigValue) error { args := plugin.PublishArgs{ @@ -152,9 +162,11 @@ func (p *PluginNativeClient) Publish(metrics []core.Metric, config map[string]ct if err != nil { return err } - var reply []byte + done := make(chan int) + go enforceTimeout(p, p.timeout, done) err = p.connection.Call("Publisher.Publish", out, &reply) + close(done) return err return nil } @@ -173,7 +185,10 @@ func (p *PluginNativeClient) Process(metrics []core.Metric, config map[string]ct } var reply []byte + done := make(chan int) + go enforceTimeout(p, p.timeout, done) err = p.connection.Call("Processor.Process", out, &reply) + close(done) if err != nil { return nil, err } @@ -211,14 +226,16 @@ func (p *PluginNativeClient) CollectMetrics(mts []core.Metric) ([]core.Metric, e } args := plugin.CollectMetricsArgs{MetricTypes: metricsToCollect} - out, err := p.encoder.Encode(args) if err != nil { return nil, err } var reply []byte + done := make(chan int) + go enforceTimeout(p, p.timeout, done) err = p.connection.Call("Collector.CollectMetrics", out, &reply) + close(done) if err != nil { return nil, err } @@ -303,6 +320,7 @@ func newNativeClient(address string, timeout time.Duration, t plugin.PluginType, p := &PluginNativeClient{ connection: r, pluginType: t, + timeout: timeout, } p.encoder = encoding.NewGobEncoder() From 372cdb1ee23cfbe27c40a770ea2be52c1be04375 Mon Sep 17 00:00:00 2001 From: Cody Roseborough Date: Fri, 9 Sep 2016 09:43:39 -0700 Subject: [PATCH 2/2] General code cleanup Removes duplicate return in client/native.go Removes useless TBD comment in control/runner.go Modifies mock-file-grpc/main_test.go to load the correct plugin when testing. --- control/plugin/client/native.go | 1 - control/runner.go | 1 - .../publisher/snap-plugin-publisher-mock-file-grpc/main_test.go | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index 95efae08b..f2eca3080 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -168,7 +168,6 @@ func (p *PluginNativeClient) Publish(metrics []core.Metric, config map[string]ct err = p.connection.Call("Publisher.Publish", out, &reply) close(done) return err - return nil } func (p *PluginNativeClient) Process(metrics []core.Metric, config map[string]ctypes.ConfigValue) ([]core.Metric, error) { diff --git a/control/runner.go b/control/runner.go index 93008498f..c2e663508 100644 --- a/control/runner.go +++ b/control/runner.go @@ -57,7 +57,6 @@ const ( MaxPluginRestartCount = 3 ) -// TBD type executablePlugin interface { Run(time.Duration) (plugin.Response, error) Kill() error diff --git a/plugin/publisher/snap-plugin-publisher-mock-file-grpc/main_test.go b/plugin/publisher/snap-plugin-publisher-mock-file-grpc/main_test.go index 2aacd6e09..f1ac32146 100644 --- a/plugin/publisher/snap-plugin-publisher-mock-file-grpc/main_test.go +++ b/plugin/publisher/snap-plugin-publisher-mock-file-grpc/main_test.go @@ -33,7 +33,7 @@ import ( ) var ( - PluginName = "snap-plugin-publisher-mock-file" + PluginName = "snap-plugin-publisher-mock-file-grpc" PluginType = "publisher" SnapPath = os.ExpandEnv(os.Getenv("SNAP_PATH")) PluginPath = path.Join(SnapPath, "plugin", PluginName)