Skip to content

Commit

Permalink
proxy: stop listening on error or empty proxyConf.
Browse files Browse the repository at this point in the history
Since multiple proxies can be load balanced, the lb needs to know which proxy
is healthy.

Before this patch on error communicating with etcd or with an empty proxyConf
the proxy closes connections to the previous master but continues accepting
connections and closing them instantly.

With this patch, on error communicating with etcd or with empty proxyConf the
proxy stops listening. It will restart listening on successful etcd
communication and a populated proxyConf.

In this way a load balancer with a standard tcp check will detect the proxy as
not healthy and not balance to it.

This is enabled by default and can be disabled with `--stop-listening=false`
  • Loading branch information
sgotti committed Oct 28, 2015
1 parent c251ee3 commit b5df80e
Show file tree
Hide file tree
Showing 3 changed files with 421 additions and 45 deletions.
145 changes: 100 additions & 45 deletions cmd/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type config struct {
clusterName string
listenAddress string
port string
stopListening bool
debug bool
}

Expand All @@ -60,33 +61,82 @@ func init() {
cmdProxy.PersistentFlags().StringVar(&cfg.clusterName, "cluster-name", "", "cluster name")
cmdProxy.PersistentFlags().StringVar(&cfg.listenAddress, "listen-address", "127.0.0.1", "proxy listening address")
cmdProxy.PersistentFlags().StringVar(&cfg.port, "port", "5432", "proxy listening port")
cmdProxy.PersistentFlags().BoolVar(&cfg.stopListening, "stop-listening", true, "stop listening on etcd error or empty proxy conf")
cmdProxy.PersistentFlags().BoolVar(&cfg.debug, "debug", false, "enable debug logging")
}

type ClusterChecker struct {
id string
listenAddress string
port string
C chan pollon.ConfData
e *etcdm.EtcdManager

stopListening bool

listener *net.TCPListener
pp *pollon.Proxy
e *etcdm.EtcdManager
endPollonProxyCh chan error
}

func NewClusterChecker(id string, cfg config, C chan pollon.ConfData) (*ClusterChecker, error) {
func NewClusterChecker(id string, cfg config) (*ClusterChecker, error) {
etcdPath := filepath.Join(common.EtcdBasePath, cfg.clusterName)
e, err := etcdm.NewEtcdManager(cfg.etcdEndpoints, etcdPath, common.DefaultEtcdRequestTimeout)
if err != nil {
return nil, err
}

return &ClusterChecker{
id: id,
listenAddress: cfg.listenAddress,
port: cfg.port,
e: e,
C: C,
id: id,
listenAddress: cfg.listenAddress,
port: cfg.port,
stopListening: cfg.stopListening,
e: e,
endPollonProxyCh: make(chan error),
}, nil
}

func (c *ClusterChecker) startPollonProxy() error {
log.Infof("Starting proxying")
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(cfg.listenAddress, cfg.port))
if err != nil {
return fmt.Errorf("error resolving tcp addr %q: %v", addr.String(), err)
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return fmt.Errorf("error listening on tcp addr %q: %v", addr.String(), err)
}

pp, err := pollon.NewProxy(listener)
if err != nil {
return fmt.Errorf("error creating pollon proxy: %v", err)
}
c.pp = pp
c.listener = listener

go func() {
c.endPollonProxyCh <- c.pp.Start()
}()

return nil
}

func (c *ClusterChecker) stopPollonProxy() {
if c.pp != nil {
log.Infof("Stopping listening")
c.pp.Stop()
c.pp = nil
c.listener.Close()
c.listener = nil
}
}

func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
if c.pp != nil {
c.pp.C <- confData
}
}

func (c *ClusterChecker) SetProxyInfo(e *etcdm.EtcdManager, cvVersion int, ttl time.Duration) error {
proxyInfo := &cluster.ProxyInfo{
ID: c.id,
Expand All @@ -102,56 +152,81 @@ func (c *ClusterChecker) SetProxyInfo(e *etcdm.EtcdManager, cvVersion int, ttl t
return nil
}

func (c *ClusterChecker) Check() {
func (c *ClusterChecker) Check() error {
cv, _, err := c.e.GetClusterView()
if err != nil {
log.Errorf("cannot get clusterview: %v", err)
c.C <- pollon.ConfData{DestAddr: nil}
return
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
if c.stopListening {
c.stopPollonProxy()
}
return nil
}
log.Debugf(spew.Sprintf("clusterview: %#v", cv))
if cv == nil {
log.Infof("no clusterview available, closing connections to previous master")
c.C <- pollon.ConfData{DestAddr: nil}
return
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
if c.stopListening {
c.stopPollonProxy()
}
return nil
}
pc := cv.ProxyConf
if pc == nil {
log.Infof("no proxyconf available, closing connections to previous master")
c.C <- pollon.ConfData{DestAddr: nil}
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
if c.stopListening {
c.stopPollonProxy()
}
if err := c.SetProxyInfo(c.e, cv.Version, 2*cluster.DefaultProxyCheckInterval); err != nil {
log.Errorf("failed to update proxyInfo: %v", err)
}
return
return nil
}
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%s", pc.Host, pc.Port))
if err != nil {
log.Errorf("err: %v", err)
c.C <- pollon.ConfData{DestAddr: nil}
return
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
if c.stopListening {
c.stopPollonProxy()
}
return nil
}
log.Infof("master address: %v", addr)
if err = c.SetProxyInfo(c.e, cv.Version, 2*cluster.DefaultProxyCheckInterval); err != nil {
log.Errorf("failed to update proxyInfo: %v", err)
}
c.C <- pollon.ConfData{DestAddr: addr}

// Start pollon if not active
c.startPollonProxy()
c.sendPollonConfData(pollon.ConfData{DestAddr: addr})
return nil
}

func (c *ClusterChecker) Start() {
endCh := make(chan struct{})
func (c *ClusterChecker) Start() error {
endPollonProxyCh := make(chan error)
checkCh := make(chan error)
timerCh := time.NewTimer(0).C

for true {
select {
case <-timerCh:
go func() {
c.Check()
endCh <- struct{}{}
checkCh <- c.Check()
}()
case <-endCh:
case err := <-checkCh:
log.Debugf("err: %v", err)
if err != nil {
return fmt.Errorf("checker fatal error: %v", err)
}
timerCh = time.NewTimer(cluster.DefaultProxyCheckInterval).C
case err := <-endPollonProxyCh:
if err != nil {
return fmt.Errorf("proxy error: %v", err)
}
}
}
return nil
}

func main() {
Expand All @@ -173,29 +248,9 @@ func proxy(cmd *cobra.Command, args []string) {
id := fmt.Sprintf("%x", u[:4])
log.Infof("id: %s", id)

addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(cfg.listenAddress, cfg.port))
if err != nil {
log.Fatalf("error: %v", err)
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
log.Fatalf("error: %v", err)
}

proxy, err := pollon.NewProxy(listener)
if err != nil {
log.Fatalf("error: %v", err)
}

clusterChecker, err := NewClusterChecker(id, cfg, proxy.C)
clusterChecker, err := NewClusterChecker(id, cfg)
if err != nil {
log.Fatalf("cannot create cluster checker: %v", err)
}
go clusterChecker.Start()

err = proxy.Start()
if err != nil {
log.Fatalf("error: %v", err)
}
clusterChecker.Start()
}
Loading

0 comments on commit b5df80e

Please # to comment.