diff --git a/agent/agent.go b/agent/agent.go index 284d078ebf..a5750e22ca 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -15,6 +15,7 @@ import ( const ( initialSessionFailureBackoff = 100 * time.Millisecond maxSessionFailureBackoff = 8 * time.Second + nodeUpdatePeriod = 20 * time.Second ) // Agent implements the primary node functionality for a member of a swarm @@ -134,9 +135,18 @@ func (a *Agent) run(ctx context.Context) { log.G(ctx).Debugf("(*Agent).run") defer log.G(ctx).Debugf("(*Agent).run exited") + // get the node description + nodeDescription, err := a.nodeDescriptionWithHostname(ctx) + if err != nil { + log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Errorf("agent: node description unavailable") + } + // nodeUpdateTicker is used to periodically check for updates to node description + nodeUpdateTicker := time.NewTicker(nodeUpdatePeriod) + defer nodeUpdateTicker.Stop() + var ( backoff time.Duration - session = newSession(ctx, a, backoff, "") // start the initial session + session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session registered = session.registered ready = a.ready // first session ready sessionq chan sessionOperation @@ -197,10 +207,42 @@ func (a *Agent) run(ctx context.Context) { log.G(ctx).Debugf("agent: rebuild session") // select a session registration delay from backoff range. - delay := time.Duration(rand.Int63n(int64(backoff))) - session = newSession(ctx, a, delay, session.sessionID) + delay := time.Duration(0) + if backoff > 0 { + delay = time.Duration(rand.Int63n(int64(backoff))) + } + session = newSession(ctx, a, delay, session.sessionID, nodeDescription) registered = session.registered sessionq = a.sessionq + case <-nodeUpdateTicker.C: + // skip this case if the registration isn't finished + if registered != nil { + continue + } + // get the current node description + newNodeDescription, err := a.nodeDescriptionWithHostname(ctx) + if err != nil { + log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Errorf("agent: updated node description unavailable") + } + + // if newNodeDescription is nil, it will cause a panic when + // trying to create a session. Typically this can happen + // if the engine goes down + if newNodeDescription == nil { + continue + } + + // if the node description has changed, update it to the new one + // and close the session. The old session will be stopped and a + // new one will be created with the updated description + if !reflect.DeepEqual(nodeDescription, newNodeDescription) { + nodeDescription = newNodeDescription + // close the session + log.G(ctx).Info("agent: found node update") + if err := session.close(); err != nil { + log.G(ctx).WithError(err).Error("agent: closing session for node update failed") + } + } case <-a.stopped: // TODO(stevvooe): Wait on shutdown and cleanup. May need to pump // this loop a few times. @@ -337,6 +379,17 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api } } +// nodeDescriptionWithHostname retrieves node description, and overrides hostname if available +func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) { + desc, err := a.config.Executor.Describe(ctx) + + // Override hostname + if a.config.Hostname != "" && desc != nil { + desc.Hostname = a.config.Hostname + } + return desc, err +} + // nodesEqual returns true if the node states are functionaly equal, ignoring status, // version and other superfluous fields. // diff --git a/agent/session.go b/agent/session.go index 43c0dc881c..c720a54f0c 100644 --- a/agent/session.go +++ b/agent/session.go @@ -42,7 +42,7 @@ type session struct { closed chan struct{} } -func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string) *session { +func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session { s := &session{ agent: agent, sessionID: sessionID, @@ -68,14 +68,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI s.addr = peer.Addr s.conn = cc - go s.run(ctx, delay) + go s.run(ctx, delay, description) return s } -func (s *session) run(ctx context.Context, delay time.Duration) { +func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) { time.Sleep(delay) // delay before registering. - if err := s.start(ctx); err != nil { + if err := s.start(ctx, description); err != nil { select { case s.errs <- err: case <-s.closed: @@ -94,24 +94,14 @@ func (s *session) run(ctx context.Context, delay time.Duration) { } // start begins the session and returns the first SessionMessage. -func (s *session) start(ctx context.Context) error { +func (s *session) start(ctx context.Context, description *api.NodeDescription) error { log.G(ctx).Debugf("(*session).start") - description, err := s.agent.config.Executor.Describe(ctx) - if err != nil { - log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor). - Errorf("node description unavailable") - return err - } - // Override hostname - if s.agent.config.Hostname != "" { - description.Hostname = s.agent.config.Hostname - } - errChan := make(chan error, 1) var ( msg *api.SessionMessage stream api.Dispatcher_SessionClient + err error ) // Note: we don't defer cancellation of this context, because the // streaming RPC is used after this function returned. We only cancel diff --git a/manager/dispatcher/dispatcher.go b/manager/dispatcher/dispatcher.go index 765651983f..791f7a7882 100644 --- a/manager/dispatcher/dispatcher.go +++ b/manager/dispatcher/dispatcher.go @@ -351,26 +351,10 @@ func (d *Dispatcher) isRunning() bool { return true } -// register is used for registration of node with particular dispatcher. -func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) { - // prevent register until we're ready to accept it - if err := d.isRunningLocked(); err != nil { - return "", err - } - - if err := d.nodes.CheckRateLimit(nodeID); err != nil { - return "", err - } - - // TODO(stevvooe): Validate node specification. - var node *api.Node - d.store.View(func(tx store.ReadTx) { - node = store.GetNode(tx, nodeID) - }) - if node == nil { - return "", ErrNodeNotFound - } - +// updateNode updates the description of a node and sets status to READY +// this is used during registration when a new node description is provided +// and during node updates when the node description changes +func (d *Dispatcher) updateNode(nodeID string, description *api.NodeDescription) error { d.nodeUpdatesLock.Lock() d.nodeUpdates[nodeID] = nodeUpdate{status: &api.NodeStatus{State: api.NodeStatus_READY}, description: description} numUpdates := len(d.nodeUpdates) @@ -380,7 +364,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a select { case d.processUpdatesTrigger <- struct{}{}: case <-d.ctx.Done(): - return "", d.ctx.Err() + return d.ctx.Err() } } @@ -389,12 +373,39 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a d.processUpdatesLock.Lock() select { case <-d.ctx.Done(): - return "", d.ctx.Err() + return d.ctx.Err() default: } d.processUpdatesCond.Wait() d.processUpdatesLock.Unlock() + return nil +} + +// register is used for registration of node with particular dispatcher. +func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) { + // prevent register until we're ready to accept it + if err := d.isRunningLocked(); err != nil { + return "", err + } + + if err := d.nodes.CheckRateLimit(nodeID); err != nil { + return "", err + } + + // TODO(stevvooe): Validate node specification. + var node *api.Node + d.store.View(func(tx store.ReadTx) { + node = store.GetNode(tx, nodeID) + }) + if node == nil { + return "", ErrNodeNotFound + } + + if err := d.updateNode(nodeID, description); err != nil { + return "", err + } + expireFunc := func() { nodeStatus := api.NodeStatus{State: api.NodeStatus_DOWN, Message: "heartbeat failure"} log.G(ctx).Debugf("heartbeat expiration") @@ -787,6 +798,10 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio } } else { sessionID = r.SessionID + // update the node description + if err := d.updateNode(nodeID, r.Description); err != nil { + return err + } } fields := logrus.Fields{