diff --git a/agent/agent.go b/agent/agent.go index 490289be28..a5750e22ca 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -238,6 +238,7 @@ func (a *Agent) run(ctx context.Context) { if !reflect.DeepEqual(nodeDescription, newNodeDescription) { nodeDescription = newNodeDescription // close the session + log.G(ctx).Info("agent: found node update") if err := session.close(); err != nil { log.G(ctx).WithError(err).Error("agent: closing session for node update failed") } diff --git a/manager/dispatcher/dispatcher.go b/manager/dispatcher/dispatcher.go index 765651983f..206c0866ae 100644 --- a/manager/dispatcher/dispatcher.go +++ b/manager/dispatcher/dispatcher.go @@ -11,7 +11,7 @@ import ( "google.golang.org/grpc/transport" "github.com/Sirupsen/logrus" - "github.com/docker/go-events" + events "github.com/docker/go-events" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/api/equality" "github.com/docker/swarmkit/ca" @@ -351,26 +351,7 @@ func (d *Dispatcher) isRunning() bool { return true } -// register is used for registration of node with particular dispatcher. -func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) { - // prevent register until we're ready to accept it - if err := d.isRunningLocked(); err != nil { - return "", err - } - - if err := d.nodes.CheckRateLimit(nodeID); err != nil { - return "", err - } - - // TODO(stevvooe): Validate node specification. - var node *api.Node - d.store.View(func(tx store.ReadTx) { - node = store.GetNode(tx, nodeID) - }) - if node == nil { - return "", ErrNodeNotFound - } - +func (d *Dispatcher) updateNode(nodeID string, description *api.NodeDescription) (string, error) { d.nodeUpdatesLock.Lock() d.nodeUpdates[nodeID] = nodeUpdate{status: &api.NodeStatus{State: api.NodeStatus_READY}, description: description} numUpdates := len(d.nodeUpdates) @@ -395,6 +376,33 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a d.processUpdatesCond.Wait() d.processUpdatesLock.Unlock() + return "", nil +} + +// register is used for registration of node with particular dispatcher. +func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) { + // prevent register until we're ready to accept it + if err := d.isRunningLocked(); err != nil { + return "", err + } + + if err := d.nodes.CheckRateLimit(nodeID); err != nil { + return "", err + } + + // TODO(stevvooe): Validate node specification. + var node *api.Node + d.store.View(func(tx store.ReadTx) { + node = store.GetNode(tx, nodeID) + }) + if node == nil { + return "", ErrNodeNotFound + } + + if _, err := d.updateNode(nodeID, description); err != nil { + return "", err + } + expireFunc := func() { nodeStatus := api.NodeStatus{State: api.NodeStatus_DOWN, Message: "heartbeat failure"} log.G(ctx).Debugf("heartbeat expiration") @@ -787,6 +795,10 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio } } else { sessionID = r.SessionID + // update the node description + if _, err := d.updateNode(nodeID, r.Description); err != nil { + return err + } } fields := logrus.Fields{