diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index fd09b81bbb6..9ec676a4f82 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -22,6 +22,7 @@ import ( "net" "net/http" _ "net/http/pprof" + "net/url" "os" "path" "reflect" @@ -192,38 +193,88 @@ func startEtcdOrProxyV2() { osutil.Exit(0) } -// startEtcd launches the etcd server and HTTP handlers for client/server communication. -func startEtcd(cfg *config) (<-chan struct{}, error) { - urlsmap, token, err := getPeerURLsMapAndToken(cfg, "etcd") - if err != nil { - return nil, fmt.Errorf("error setting up initial cluster: %v", err) +type ErrEtcdConnFatal struct { + Err error +} + +func (e *ErrEtcdConnFatal) Error() string { + return fmt.Sprintf("ErrEtcdConnFatal: %v", e.Err) +} + +// EtcdConn is a helper struct that contains all the connection starting code +type EtcdConn struct { + PeerAutoTLS bool + ClientAutoTLS bool + PeerTLSInfo *transport.TLSInfo + ClientTLSInfo *transport.TLSInfo + CorsInfo *cors.CORSInfo + Lpurls []url.URL + Lcurls []url.URL + Dir string + Plog *capnslog.PackageLogger + + listen func(*etcdserver.EtcdServer) + defers []func() + err error +} + +// NewEtcdConn wraps the common net and listener code for embedded etcd reuse! +func NewEtcdConn(peerAutoTLS, clientAutoTLS bool, peerTLSInfo, clientTLSInfo *transport.TLSInfo, corsInfo *cors.CORSInfo, lpurls, lcurls []url.URL, dir string, plog *capnslog.PackageLogger) *EtcdConn { + return &EtcdConn{ + PeerAutoTLS: peerAutoTLS, + ClientAutoTLS: clientAutoTLS, + PeerTLSInfo: peerTLSInfo, + ClientTLSInfo: clientTLSInfo, + CorsInfo: corsInfo, + Lpurls: lpurls, + Lcurls: lcurls, + Dir: dir, + Plog: plog, + + listen: nil, + defers: []func(){}, + err: fmt.Errorf("not initialized"), } +} - if cfg.PeerAutoTLS && cfg.peerTLSInfo.Empty() { +func (ec *EtcdConn) Init() error { + var err error + if ec.PeerAutoTLS && ec.PeerTLSInfo != nil && ec.PeerTLSInfo.Empty() { var phosts []string - for _, u := range cfg.lpurls { + for _, u := range ec.Lpurls { phosts = append(phosts, u.Host) } - cfg.peerTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/peer"), phosts) + peerTLSInfoPointer, err := transport.SelfCert(path.Join(ec.Dir, "fixtures/peer"), phosts) + ec.PeerTLSInfo = &peerTLSInfoPointer if err != nil { - plog.Fatalf("could not get certs (%v)", err) + if ec.Plog != nil { + ec.Plog.Fatalf("could not get certs (%v)", err) + } else { + ec.err = fmt.Errorf("could not get certs (%v)", err) + return &ErrEtcdConnFatal{err} + } + } + } else if ec.PeerAutoTLS { + if ec.Plog != nil { + ec.Plog.Warningf("ignoring peer auto TLS since certs given") } - } else if cfg.PeerAutoTLS { - plog.Warningf("ignoring peer auto TLS since certs given") } - - if !cfg.peerTLSInfo.Empty() { - plog.Infof("peerTLS: %s", cfg.peerTLSInfo) + if ec.Plog != nil { + if !ec.PeerTLSInfo.Empty() { + ec.Plog.Infof("peerTLS: %s", ec.PeerTLSInfo) + } } var plns []net.Listener - for _, u := range cfg.lpurls { + for _, u := range ec.Lpurls { if u.Scheme == "http" { - if !cfg.peerTLSInfo.Empty() { - plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) - } - if cfg.peerTLSInfo.ClientCertAuth { - plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + if ec.Plog != nil { + if !ec.PeerTLSInfo.Empty() { + ec.Plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) + } + if ec.PeerTLSInfo.ClientCertAuth { + ec.Plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + } } } var ( @@ -231,63 +282,92 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { tlscfg *tls.Config ) - if !cfg.peerTLSInfo.Empty() { - tlscfg, err = cfg.peerTLSInfo.ServerConfig() + if ec.PeerTLSInfo != nil && !ec.PeerTLSInfo.Empty() { + tlscfg, err = ec.PeerTLSInfo.ServerConfig() if err != nil { - return nil, err + //return nil, err + ec.err = err + return err } } l, err = rafthttp.NewListener(u, tlscfg) if err != nil { - return nil, err + //return nil, err + ec.err = err + return err } urlStr := u.String() - plog.Info("listening for peers on ", urlStr) - defer func() { + if ec.Plog != nil { + ec.Plog.Info("listening for peers on ", urlStr) + } + ll := l // make a unique copy for the closure + d1 := func() { if err != nil { - l.Close() - plog.Info("stopping listening for peers on ", urlStr) + ll.Close() + if ec.Plog != nil { + ec.Plog.Info("stopping listening for peers on ", urlStr) + } } - }() + } + ec.defers = append(ec.defers, d1) plns = append(plns, l) } - if cfg.ClientAutoTLS && cfg.clientTLSInfo.Empty() { + if ec.ClientAutoTLS && ec.ClientTLSInfo != nil && ec.ClientTLSInfo.Empty() { var chosts []string - for _, u := range cfg.lcurls { + for _, u := range ec.Lcurls { chosts = append(chosts, u.Host) } - cfg.clientTLSInfo, err = transport.SelfCert(path.Join(cfg.Dir, "fixtures/client"), chosts) + clientTLSInfoPointer, err := transport.SelfCert(path.Join(ec.Dir, "fixtures/client"), chosts) + ec.ClientTLSInfo = &clientTLSInfoPointer if err != nil { - plog.Fatalf("could not get certs (%v)", err) + if ec.Plog != nil { + ec.Plog.Fatalf("could not get certs (%v)", err) + } else { + ec.err = fmt.Errorf("could not get certs (%v)", err) + return &ErrEtcdConnFatal{err} + } + } + } else if ec.ClientAutoTLS { + if ec.Plog != nil { + ec.Plog.Warningf("ignoring client auto TLS since certs given") } - } else if cfg.ClientAutoTLS { - plog.Warningf("ignoring client auto TLS since certs given") } var ctlscfg *tls.Config - if !cfg.clientTLSInfo.Empty() { - plog.Infof("clientTLS: %s", cfg.clientTLSInfo) - ctlscfg, err = cfg.clientTLSInfo.ServerConfig() + if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() { + if ec.Plog != nil { + ec.Plog.Infof("clientTLS: %s", ec.ClientTLSInfo) + } + ctlscfg, err = ec.ClientTLSInfo.ServerConfig() if err != nil { - return nil, err + //return nil, err + ec.err = err + return err } } sctxs := make(map[string]*serveCtx) - for _, u := range cfg.lcurls { + for _, u := range ec.Lcurls { if u.Scheme == "http" { - if !cfg.clientTLSInfo.Empty() { - plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String()) + if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() { + if ec.Plog != nil { + ec.Plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String()) + } } - if cfg.clientTLSInfo.ClientCertAuth { - plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + if ec.ClientTLSInfo != nil && ec.ClientTLSInfo.ClientCertAuth { + if ec.Plog != nil { + ec.Plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) + } } } if u.Scheme == "https" && ctlscfg == nil { - return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String()) + //return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String()) + err = fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String()) + ec.err = err + return err } ctx := &serveCtx{host: u.Host} @@ -312,13 +392,21 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { l, err = net.Listen("tcp", u.Host) if err != nil { - return nil, err + //return nil, err + ec.err = err + return err } var fdLimit uint64 if fdLimit, err = runtimeutil.FDLimit(); err == nil { if fdLimit <= reservedInternalFDNum { - plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) + if ec.Plog != nil { + ec.Plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) + } else { + err = fmt.Errorf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) + ec.err = err + return &ErrEtcdConnFatal{err} + } } l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum)) } @@ -326,19 +414,108 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { l, err = transport.NewKeepAliveListener(l, "tcp", nil) ctx.l = l if err != nil { - return nil, err + //return nil, err + ec.err = err + return err } - plog.Info("listening for client requests on ", u.Host) - defer func() { + if ec.Plog != nil { + ec.Plog.Info("listening for client requests on ", u.Host) + } + ll := l + msg := u.Host + d2 := func() { if err != nil { - l.Close() - plog.Info("stopping listening for client requests on ", u.Host) + ll.Close() + if ec.Plog != nil { + ec.Plog.Info("stopping listening for client requests on ", msg) + } } - }() + } + ec.defers = append(ec.defers, d2) sctxs[u.Host] = ctx } + // plns []net.Listener, ctlscfg *tls.Config, sctxs map[string]*serveCtx + listen := func(s *etcdserver.EtcdServer) { + ch := http.Handler(&cors.CORSHandler{ + Handler: v2http.NewClientHandler(s, s.Cfg.ReqTimeout()), + Info: ec.CorsInfo, + }) + ph := v2http.NewPeerHandler(s) + + // Start the peer server in a goroutine + for _, l := range plns { + go func(l net.Listener) { + e := servePeerHTTP(l, ph) + if ec.Plog != nil { + ec.Plog.Fatal(e) + } else { + os.Exit(1) // TODO: write on a ch to notify + } + }(l) + } + // Start a client server goroutine for each listen address + for _, sctx := range sctxs { + go func(sctx *serveCtx) { + // read timeout does not work with http close notify + // TODO: https://github.com/golang/go/issues/9524 + e := serve(sctx, s, ctlscfg, ch) + if ec.Plog != nil { + ec.Plog.Fatal(e) + } else { + os.Exit(1) // TODO: write on a ch to notify + } + }(sctx) + } + } + ec.listen = listen + ec.err = err // likely nil + return err +} + +func (ec *EtcdConn) Listen(s *etcdserver.EtcdServer) error { + if ec.err != nil { // error on init + return ec.err + } + ec.listen(s) + return nil +} + +func (ec *EtcdConn) Close() { + for _, f := range ec.defers { + f() + } +} + +// startEtcd launches the etcd server and HTTP handlers for client/server communication. +func startEtcd(cfg *config) (<-chan struct{}, error) { + + urlsmap, token, err := getPeerURLsMapAndToken(cfg, "etcd") + if err != nil { + return nil, fmt.Errorf("error setting up initial cluster: %v", err) + } + + ec := NewEtcdConn( + cfg.PeerAutoTLS, cfg.ClientAutoTLS, + &cfg.peerTLSInfo, &cfg.clientTLSInfo, + cfg.corsInfo, cfg.lpurls, cfg.lcurls, + cfg.Dir, plog, + ) + defer ec.Close() + // listen, defers, err := NewEtcdConn( + // cfg.PeerAutoTLS, cfg.ClientAutoTLS, + // &cfg.peerTLSInfo, &cfg.clientTLSInfo, + // cfg.corsInfo, cfg.lpurls, cfg.lcurls, + // cfg.Dir, plog, + // ) + // for _, d := range defers { + // defer d() + // } + if err = ec.Init(); err != nil { + return nil, err + } + srvcfg := &etcdserver.ServerConfig{ Name: cfg.Name, ClientURLs: cfg.acurls, @@ -373,26 +550,9 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { if cfg.corsInfo.String() != "" { plog.Infof("cors = %s", cfg.corsInfo) } - ch := http.Handler(&cors.CORSHandler{ - Handler: v2http.NewClientHandler(s, srvcfg.ReqTimeout()), - Info: cfg.corsInfo, - }) - ph := v2http.NewPeerHandler(s) - - // Start the peer server in a goroutine - for _, l := range plns { - go func(l net.Listener) { - plog.Fatal(servePeerHTTP(l, ph)) - }(l) - } - // Start a client server goroutine for each listen address - for _, sctx := range sctxs { - go func(sctx *serveCtx) { - // read timeout does not work with http close notify - // TODO: https://github.com/golang/go/issues/9524 - plog.Fatal(serve(sctx, s, ctlscfg, ch)) - }(sctx) - } + + //listen(s) + ec.Listen(s) <-s.ReadyNotify() return s.StopNotify(), nil