From b5df80ea38c85f4d7fd3fe0696e7f798ce7f67a7 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Tue, 27 Oct 2015 12:05:59 +0100 Subject: [PATCH] proxy: stop listening on error or empty proxyConf. Since multiple proxies can be load balanced, the load balancer needs to know which proxies are healthy. Before this patch, on an error communicating with etcd or with an empty proxyConf, the proxy closed connections to the previous master but kept accepting new connections and closing them immediately. With this patch, on an error communicating with etcd or with an empty proxyConf, the proxy stops listening. It resumes listening once etcd communication succeeds and the proxyConf is populated. This way a load balancer with a standard TCP check will detect the proxy as unhealthy and stop balancing to it. This behavior is enabled by default and can be disabled with `--stop-listening=false`. --- cmd/proxy/proxy.go | 145 +++++++++++++++++--------- tests/integration/proxy_test.go | 174 ++++++++++++++++++++++++++++++++ tests/integration/utils.go | 147 +++++++++++++++++++++++++++ 3 files changed, 421 insertions(+), 45 deletions(-) create mode 100644 tests/integration/proxy_test.go diff --git a/cmd/proxy/proxy.go b/cmd/proxy/proxy.go index f41ee2371..a3f4b7e92 100644 --- a/cmd/proxy/proxy.go +++ b/cmd/proxy/proxy.go @@ -50,6 +50,7 @@ type config struct { clusterName string listenAddress string port string + stopListening bool debug bool } @@ -60,6 +61,7 @@ func init() { cmdProxy.PersistentFlags().StringVar(&cfg.clusterName, "cluster-name", "", "cluster name") cmdProxy.PersistentFlags().StringVar(&cfg.listenAddress, "listen-address", "127.0.0.1", "proxy listening address") cmdProxy.PersistentFlags().StringVar(&cfg.port, "port", "5432", "proxy listening port") + cmdProxy.PersistentFlags().BoolVar(&cfg.stopListening, "stop-listening", true, "stop listening on etcd error or empty proxy conf") cmdProxy.PersistentFlags().BoolVar(&cfg.debug, "debug", false, "enable debug logging") } @@ -67,11 +69,16 @@ type ClusterChecker 
struct { id string listenAddress string port string - C chan pollon.ConfData - e *etcdm.EtcdManager + + stopListening bool + + listener *net.TCPListener + pp *pollon.Proxy + e *etcdm.EtcdManager + endPollonProxyCh chan error } -func NewClusterChecker(id string, cfg config, C chan pollon.ConfData) (*ClusterChecker, error) { +func NewClusterChecker(id string, cfg config) (*ClusterChecker, error) { etcdPath := filepath.Join(common.EtcdBasePath, cfg.clusterName) e, err := etcdm.NewEtcdManager(cfg.etcdEndpoints, etcdPath, common.DefaultEtcdRequestTimeout) if err != nil { @@ -79,14 +86,57 @@ func NewClusterChecker(id string, cfg config, C chan pollon.ConfData) (*ClusterC } return &ClusterChecker{ - id: id, - listenAddress: cfg.listenAddress, - port: cfg.port, - e: e, - C: C, + id: id, + listenAddress: cfg.listenAddress, + port: cfg.port, + stopListening: cfg.stopListening, + e: e, + endPollonProxyCh: make(chan error), }, nil } +func (c *ClusterChecker) startPollonProxy() error { + log.Infof("Starting proxying") + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(cfg.listenAddress, cfg.port)) + if err != nil { + return fmt.Errorf("error resolving tcp addr %q: %v", addr.String(), err) + } + + listener, err := net.ListenTCP("tcp", addr) + if err != nil { + return fmt.Errorf("error listening on tcp addr %q: %v", addr.String(), err) + } + + pp, err := pollon.NewProxy(listener) + if err != nil { + return fmt.Errorf("error creating pollon proxy: %v", err) + } + c.pp = pp + c.listener = listener + + go func() { + c.endPollonProxyCh <- c.pp.Start() + }() + + return nil +} + +func (c *ClusterChecker) stopPollonProxy() { + if c.pp != nil { + log.Infof("Stopping listening") + c.pp.Stop() + c.pp = nil + c.listener.Close() + c.listener = nil + } +} + +func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) { + if c.pp != nil { + c.pp.C <- confData + } +} + func (c *ClusterChecker) SetProxyInfo(e *etcdm.EtcdManager, cvVersion int, ttl time.Duration) error { 
proxyInfo := &cluster.ProxyInfo{ ID: c.id, @@ -102,56 +152,81 @@ func (c *ClusterChecker) SetProxyInfo(e *etcdm.EtcdManager, cvVersion int, ttl t return nil } -func (c *ClusterChecker) Check() { +func (c *ClusterChecker) Check() error { cv, _, err := c.e.GetClusterView() if err != nil { log.Errorf("cannot get clusterview: %v", err) - c.C <- pollon.ConfData{DestAddr: nil} - return + c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + if c.stopListening { + c.stopPollonProxy() + } + return nil } log.Debugf(spew.Sprintf("clusterview: %#v", cv)) if cv == nil { log.Infof("no clusterview available, closing connections to previous master") - c.C <- pollon.ConfData{DestAddr: nil} - return + c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + if c.stopListening { + c.stopPollonProxy() + } + return nil } pc := cv.ProxyConf if pc == nil { log.Infof("no proxyconf available, closing connections to previous master") - c.C <- pollon.ConfData{DestAddr: nil} + c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + if c.stopListening { + c.stopPollonProxy() + } if err := c.SetProxyInfo(c.e, cv.Version, 2*cluster.DefaultProxyCheckInterval); err != nil { log.Errorf("failed to update proxyInfo: %v", err) } - return + return nil } addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%s", pc.Host, pc.Port)) if err != nil { log.Errorf("err: %v", err) - c.C <- pollon.ConfData{DestAddr: nil} - return + c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) + if c.stopListening { + c.stopPollonProxy() + } + return nil } log.Infof("master address: %v", addr) if err = c.SetProxyInfo(c.e, cv.Version, 2*cluster.DefaultProxyCheckInterval); err != nil { log.Errorf("failed to update proxyInfo: %v", err) } - c.C <- pollon.ConfData{DestAddr: addr} + + // Start pollon if not active + c.startPollonProxy() + c.sendPollonConfData(pollon.ConfData{DestAddr: addr}) + return nil } -func (c *ClusterChecker) Start() { - endCh := make(chan struct{}) +func (c *ClusterChecker) Start() error { + 
endPollonProxyCh := make(chan error) + checkCh := make(chan error) timerCh := time.NewTimer(0).C for true { select { case <-timerCh: go func() { - c.Check() - endCh <- struct{}{} + checkCh <- c.Check() }() - case <-endCh: + case err := <-checkCh: + log.Debugf("err: %v", err) + if err != nil { + return fmt.Errorf("checker fatal error: %v", err) + } timerCh = time.NewTimer(cluster.DefaultProxyCheckInterval).C + case err := <-endPollonProxyCh: + if err != nil { + return fmt.Errorf("proxy error: %v", err) + } } } + return nil } func main() { @@ -173,29 +248,9 @@ func proxy(cmd *cobra.Command, args []string) { id := fmt.Sprintf("%x", u[:4]) log.Infof("id: %s", id) - addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(cfg.listenAddress, cfg.port)) - if err != nil { - log.Fatalf("error: %v", err) - } - - listener, err := net.ListenTCP("tcp", addr) - if err != nil { - log.Fatalf("error: %v", err) - } - - proxy, err := pollon.NewProxy(listener) - if err != nil { - log.Fatalf("error: %v", err) - } - - clusterChecker, err := NewClusterChecker(id, cfg, proxy.C) + clusterChecker, err := NewClusterChecker(id, cfg) if err != nil { log.Fatalf("cannot create cluster checker: %v", err) } - go clusterChecker.Start() - - err = proxy.Start() - if err != nil { - log.Fatalf("error: %v", err) - } + clusterChecker.Start() } diff --git a/tests/integration/proxy_test.go b/tests/integration/proxy_test.go new file mode 100644 index 000000000..1b56281d4 --- /dev/null +++ b/tests/integration/proxy_test.go @@ -0,0 +1,174 @@ +// Copyright 2015 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/sorintlab/stolon/common" + "github.com/sorintlab/stolon/pkg/cluster" + etcdm "github.com/sorintlab/stolon/pkg/etcd" + + "github.com/sorintlab/stolon/Godeps/_workspace/src/github.com/satori/go.uuid" +) + +func TestProxyListening(t *testing.T) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + te, err := NewTestEtcd(dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + etcdEndpoints := fmt.Sprintf("http://%s:%s", te.listenAddress, te.port) + if err := te.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := te.WaitUp(10 * time.Second); err != nil { + t.Fatalf("error waiting on etcd up: %v", err) + } + defer func() { + if te.cmd != nil { + te.Stop() + } + }() + + clusterName := uuid.NewV4().String() + + etcdPath := filepath.Join(common.EtcdBasePath, clusterName) + e, err := etcdm.NewEtcdManager(etcdEndpoints, etcdPath, common.DefaultEtcdRequestTimeout) + if err != nil { + t.Fatalf("cannot create etcd manager: %v", err) + } + + res, err := e.SetClusterData(cluster.KeepersState{}, + &cluster.ClusterView{ + Version: 1, + Config: &cluster.NilConfig{ + SleepInterval: cluster.DurationP(5 * time.Second), + KeeperFailInterval: cluster.DurationP(10 * time.Second), + }, + ProxyConf: &cluster.ProxyConf{ + // fake pg address, not relevant + Host: "localhost", + Port: "5432", + }, + }, 0) + if err != nil { + t.Fatalf("unexpected err: 
%v", err) + } + prevCDIndex := res.Node.ModifiedIndex + + tp, err := NewTestProxy(dir, clusterName, etcdEndpoints) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tp.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer tp.Stop() + + // tp should listen + if err := tp.WaitListening(10 * time.Second); err != nil { + t.Fatalf("expecting tp listening, but it's not listening.") + } + + // Stop etcd + te.Stop() + if err := te.WaitDown(10 * time.Second); err != nil { + t.Fatalf("error waiting on etcd down: %v", err) + } + + // tp should not listen because it cannot talk with etcd + if err := tp.WaitNotListening(10 * time.Second); err != nil { + t.Fatalf("expecting tp not listening due to failed etcd communication, but it's listening.") + } + + // Start etcd + if err := te.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := te.WaitUp(10 * time.Second); err != nil { + t.Fatalf("error waiting on etcd up: %v", err) + } + // tp should listen + if err := tp.WaitListening(10 * time.Second); err != nil { + t.Fatalf("expecting tp listening, but it's not listening.") + } + + // remove proxyConf + res, err = e.SetClusterData(cluster.KeepersState{}, + &cluster.ClusterView{ + Version: 1, + Config: &cluster.NilConfig{ + SleepInterval: cluster.DurationP(5 * time.Second), + KeeperFailInterval: cluster.DurationP(10 * time.Second), + }, + ProxyConf: nil, + }, prevCDIndex) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + prevCDIndex = res.Node.ModifiedIndex + + // tp should not listen because proxyConf is empty + if err := tp.WaitNotListening(10 * time.Second); err != nil { + t.Fatalf("expecting tp not listening due to empty proxyConf, but it's listening.") + } + + // Set proxyConf again + res, err = e.SetClusterData(cluster.KeepersState{}, + &cluster.ClusterView{ + Version: 1, + Config: &cluster.NilConfig{ + SleepInterval: cluster.DurationP(5 * time.Second), + KeeperFailInterval: cluster.DurationP(10 * 
time.Second), + }, + ProxyConf: &cluster.ProxyConf{ + // fake pg address, not relevant + Host: "localhost", + Port: "5432", + }, + }, prevCDIndex) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + prevCDIndex = res.Node.ModifiedIndex + + // tp should listen + if err := tp.WaitListening(10 * time.Second); err != nil { + t.Fatalf("expecting tp listening, but it's not listening.") + } + + // remove whole clusterview + _, err = e.SetClusterData(cluster.KeepersState{}, nil, prevCDIndex) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // tp should not listen because clusterView is empty + if err := tp.WaitNotListening(10 * time.Second); err != nil { + t.Fatalf("expecting tp not listening due to empty clusterView, but it's listening.") + } + +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index a777f3bba..8a8d74c81 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -497,6 +497,153 @@ func (ts *TestSentinel) Stop() { ts.cmd = nil } +type TestProxy struct { + id string + cmd *exec.Cmd + proxyBin string + args []string + + listenAddress string + port string +} + +func NewTestProxy(dir string, clusterName string, etcdEndpoints string, a ...string) (*TestProxy, error) { + u := uuid.NewV4() + id := fmt.Sprintf("%x", u[:4]) + + // Hack to find a free tcp port + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, err + } + defer ln.Close() + + listenAddress := ln.Addr().(*net.TCPAddr).IP.String() + port := strconv.Itoa(ln.Addr().(*net.TCPAddr).Port) + + args := []string{} + args = append(args, fmt.Sprintf("--cluster-name=%s", clusterName)) + args = append(args, fmt.Sprintf("--port=%s", port)) + args = append(args, fmt.Sprintf("--etcd-endpoints=%s", etcdEndpoints)) + args = append(args, "--debug") + args = append(args, a...) 
+ + proxyBin := os.Getenv("STPROXY_BIN") + if proxyBin == "" { + return nil, fmt.Errorf("missing STPROXY_BIN env") + } + tp := &TestProxy{ + id: id, + proxyBin: proxyBin, + args: args, + listenAddress: listenAddress, + port: port, + } + return tp, nil +} + +func (tp *TestProxy) Start() error { + if tp.cmd != nil { + panic(fmt.Errorf("tp: %s, cmd not cleanly stopped", tp.id)) + } + tp.cmd = exec.Command(tp.proxyBin, tp.args...) + stdoutPipe, err := tp.cmd.StdoutPipe() + if err != nil { + return err + } + go func() { + scanner := bufio.NewScanner(stdoutPipe) + for scanner.Scan() { + fmt.Printf("[%s]: %s\n", tp.id, scanner.Text()) + } + }() + + stderrPipe, err := tp.cmd.StderrPipe() + if err != nil { + return err + } + go func() { + scanner := bufio.NewScanner(stderrPipe) + for scanner.Scan() { + fmt.Printf("[%s]: %s\n", tp.id, scanner.Text()) + } + }() + err = tp.cmd.Start() + if err != nil { + return err + } + return nil +} + +func (tp *TestProxy) Signal(sig os.Signal) error { + fmt.Printf("signalling proxy: %s with %s\n", tp.id, sig) + if tp.cmd == nil { + panic(fmt.Errorf("tp: %s, cmd is empty", tp.id)) + } + return tp.cmd.Process.Signal(sig) +} + +func (tp *TestProxy) Kill() { + fmt.Printf("killing proxy: %s\n", tp.id) + if tp.cmd == nil { + panic(fmt.Errorf("tp: %s, cmd is empty", tp.id)) + } + tp.cmd.Process.Signal(os.Kill) + tp.cmd.Wait() + tp.cmd = nil +} + +func (tp *TestProxy) Stop() { + fmt.Printf("stopping proxy: %s\n", tp.id) + if tp.cmd == nil { + panic(fmt.Errorf("tp: %s, cmd is empty", tp.id)) + } + tp.cmd.Process.Signal(os.Interrupt) + tp.cmd.Wait() + tp.cmd = nil +} + +func (tp *TestProxy) WaitProcess(timeout time.Duration) error { + timeoutCh := time.NewTimer(timeout).C + endCh := make(chan error) + go func() { + err := tp.cmd.Wait() + endCh <- err + }() + select { + case <-timeoutCh: + return fmt.Errorf("timeout waiting on process") + case <-endCh: + return nil + } +} + +func (tp *TestProxy) WaitListening(timeout time.Duration) error { + start := 
time.Now() + for time.Now().Add(-timeout).Before(start) { + _, err := net.DialTimeout("tcp", net.JoinHostPort(tp.listenAddress, tp.port), timeout-time.Now().Sub(start)) + if err == nil { + return nil + } + fmt.Printf("tp: %v, error: %v\n", tp.id, err) + time.Sleep(2 * time.Second) + } + return fmt.Errorf("timeout") +} + +func (tp *TestProxy) WaitNotListening(timeout time.Duration) error { + start := time.Now() + for time.Now().Add(-timeout).Before(start) { + _, err := net.DialTimeout("tcp", net.JoinHostPort(tp.listenAddress, tp.port), timeout-time.Now().Sub(start)) + if err != nil { + return nil + } + fmt.Printf("tp: %v, error: %v\n", tp.id, err) + time.Sleep(2 * time.Second) + } + return fmt.Errorf("timeout") +} + type TestEtcd struct { cmd *exec.Cmd etcdBin string