Skip to content

Commit

Permalink
Merge pull request #1374 from nishanttotla/dynamically-update-node-in…
Browse files Browse the repository at this point in the history
…ventory-same-session

Dynamically update node inventory
  • Loading branch information
stevvooe authored Sep 6, 2016
2 parents 80b5796 + 8fa5bcd commit 47e75e7
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 41 deletions.
59 changes: 56 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
const (
initialSessionFailureBackoff = 100 * time.Millisecond
maxSessionFailureBackoff = 8 * time.Second
nodeUpdatePeriod = 20 * time.Second
)

// Agent implements the primary node functionality for a member of a swarm
Expand Down Expand Up @@ -134,9 +135,18 @@ func (a *Agent) run(ctx context.Context) {
log.G(ctx).Debugf("(*Agent).run")
defer log.G(ctx).Debugf("(*Agent).run exited")

// get the node description
nodeDescription, err := a.nodeDescriptionWithHostname(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Errorf("agent: node description unavailable")
}
// nodeUpdateTicker is used to periodically check for updates to node description
nodeUpdateTicker := time.NewTicker(nodeUpdatePeriod)
defer nodeUpdateTicker.Stop()

var (
backoff time.Duration
session = newSession(ctx, a, backoff, "") // start the initial session
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
Expand Down Expand Up @@ -197,10 +207,42 @@ func (a *Agent) run(ctx context.Context) {
log.G(ctx).Debugf("agent: rebuild session")

// select a session registration delay from backoff range.
delay := time.Duration(rand.Int63n(int64(backoff)))
session = newSession(ctx, a, delay, session.sessionID)
delay := time.Duration(0)
if backoff > 0 {
delay = time.Duration(rand.Int63n(int64(backoff)))
}
session = newSession(ctx, a, delay, session.sessionID, nodeDescription)
registered = session.registered
sessionq = a.sessionq
case <-nodeUpdateTicker.C:
// skip this case if the registration isn't finished
if registered != nil {
continue
}
// get the current node description
newNodeDescription, err := a.nodeDescriptionWithHostname(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Errorf("agent: updated node description unavailable")
}

// if newNodeDescription is nil, it will cause a panic when
// trying to create a session. Typically this can happen
// if the engine goes down
if newNodeDescription == nil {
continue
}

// if the node description has changed, update it to the new one
// and close the session. The old session will be stopped and a
// new one will be created with the updated description
if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
nodeDescription = newNodeDescription
// close the session
log.G(ctx).Info("agent: found node update")
if err := session.close(); err != nil {
log.G(ctx).WithError(err).Error("agent: closing session for node update failed")
}
}
case <-a.stopped:
// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
// this loop a few times.
Expand Down Expand Up @@ -337,6 +379,17 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
}
}

// nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) {
desc, err := a.config.Executor.Describe(ctx)

// Override hostname
if a.config.Hostname != "" && desc != nil {
desc.Hostname = a.config.Hostname
}
return desc, err
}

// nodesEqual returns true if the node states are functionaly equal, ignoring status,
// version and other superfluous fields.
//
Expand Down
22 changes: 6 additions & 16 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type session struct {
closed chan struct{}
}

func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string) *session {
func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session {
s := &session{
agent: agent,
sessionID: sessionID,
Expand All @@ -68,14 +68,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
s.addr = peer.Addr
s.conn = cc

go s.run(ctx, delay)
go s.run(ctx, delay, description)
return s
}

func (s *session) run(ctx context.Context, delay time.Duration) {
func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
time.Sleep(delay) // delay before registering.

if err := s.start(ctx); err != nil {
if err := s.start(ctx, description); err != nil {
select {
case s.errs <- err:
case <-s.closed:
Expand All @@ -94,24 +94,14 @@ func (s *session) run(ctx context.Context, delay time.Duration) {
}

// start begins the session and returns the first SessionMessage.
func (s *session) start(ctx context.Context) error {
func (s *session) start(ctx context.Context, description *api.NodeDescription) error {
log.G(ctx).Debugf("(*session).start")

description, err := s.agent.config.Executor.Describe(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
Errorf("node description unavailable")
return err
}
// Override hostname
if s.agent.config.Hostname != "" {
description.Hostname = s.agent.config.Hostname
}

errChan := make(chan error, 1)
var (
msg *api.SessionMessage
stream api.Dispatcher_SessionClient
err error
)
// Note: we don't defer cancellation of this context, because the
// streaming RPC is used after this function returned. We only cancel
Expand Down
59 changes: 37 additions & 22 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,26 +351,10 @@ func (d *Dispatcher) isRunning() bool {
return true
}

// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
// prevent register until we're ready to accept it
if err := d.isRunningLocked(); err != nil {
return "", err
}

if err := d.nodes.CheckRateLimit(nodeID); err != nil {
return "", err
}

// TODO(stevvooe): Validate node specification.
var node *api.Node
d.store.View(func(tx store.ReadTx) {
node = store.GetNode(tx, nodeID)
})
if node == nil {
return "", ErrNodeNotFound
}

// updateNode updates the description of a node and sets status to READY
// this is used during registration when a new node description is provided
// and during node updates when the node description changes
func (d *Dispatcher) updateNode(nodeID string, description *api.NodeDescription) error {
d.nodeUpdatesLock.Lock()
d.nodeUpdates[nodeID] = nodeUpdate{status: &api.NodeStatus{State: api.NodeStatus_READY}, description: description}
numUpdates := len(d.nodeUpdates)
Expand All @@ -380,7 +364,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
select {
case d.processUpdatesTrigger <- struct{}{}:
case <-d.ctx.Done():
return "", d.ctx.Err()
return d.ctx.Err()
}

}
Expand All @@ -389,12 +373,39 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
d.processUpdatesLock.Lock()
select {
case <-d.ctx.Done():
return "", d.ctx.Err()
return d.ctx.Err()
default:
}
d.processUpdatesCond.Wait()
d.processUpdatesLock.Unlock()

return nil
}

// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
// prevent register until we're ready to accept it
if err := d.isRunningLocked(); err != nil {
return "", err
}

if err := d.nodes.CheckRateLimit(nodeID); err != nil {
return "", err
}

// TODO(stevvooe): Validate node specification.
var node *api.Node
d.store.View(func(tx store.ReadTx) {
node = store.GetNode(tx, nodeID)
})
if node == nil {
return "", ErrNodeNotFound
}

if err := d.updateNode(nodeID, description); err != nil {
return "", err
}

expireFunc := func() {
nodeStatus := api.NodeStatus{State: api.NodeStatus_DOWN, Message: "heartbeat failure"}
log.G(ctx).Debugf("heartbeat expiration")
Expand Down Expand Up @@ -787,6 +798,10 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
}
} else {
sessionID = r.SessionID
// update the node description
if err := d.updateNode(nodeID, r.Description); err != nil {
return err
}
}

fields := logrus.Fields{
Expand Down

0 comments on commit 47e75e7

Please # to comment.