Skip to content

Commit

Permalink
Moving node updates to a separate function
Browse files Browse the repository at this point in the history
Signed-off-by: Nishant Totla <nishanttotla@gmail.com>
  • Loading branch information
nishanttotla committed Aug 31, 2016
1 parent 548c20d commit 3f8ae63
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 21 deletions.
1 change: 1 addition & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func (a *Agent) run(ctx context.Context) {
if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
nodeDescription = newNodeDescription
// close the session
log.G(ctx).Info("agent: found node update")
if err := session.close(); err != nil {
log.G(ctx).WithError(err).Error("agent: closing session for node update failed")
}
Expand Down
54 changes: 33 additions & 21 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"google.golang.org/grpc/transport"

"github.com/Sirupsen/logrus"
"github.com/docker/go-events"
events "github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/ca"
Expand Down Expand Up @@ -351,26 +351,7 @@ func (d *Dispatcher) isRunning() bool {
return true
}

// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
// prevent register until we're ready to accept it
if err := d.isRunningLocked(); err != nil {
return "", err
}

if err := d.nodes.CheckRateLimit(nodeID); err != nil {
return "", err
}

// TODO(stevvooe): Validate node specification.
var node *api.Node
d.store.View(func(tx store.ReadTx) {
node = store.GetNode(tx, nodeID)
})
if node == nil {
return "", ErrNodeNotFound
}

func (d *Dispatcher) updateNode(nodeID string, description *api.NodeDescription) (string, error) {
d.nodeUpdatesLock.Lock()
d.nodeUpdates[nodeID] = nodeUpdate{status: &api.NodeStatus{State: api.NodeStatus_READY}, description: description}
numUpdates := len(d.nodeUpdates)
Expand All @@ -395,6 +376,33 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
d.processUpdatesCond.Wait()
d.processUpdatesLock.Unlock()

return "", nil
}

// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
// prevent register until we're ready to accept it
if err := d.isRunningLocked(); err != nil {
return "", err
}

if err := d.nodes.CheckRateLimit(nodeID); err != nil {
return "", err
}

// TODO(stevvooe): Validate node specification.
var node *api.Node
d.store.View(func(tx store.ReadTx) {
node = store.GetNode(tx, nodeID)
})
if node == nil {
return "", ErrNodeNotFound
}

if _, err := d.updateNode(nodeID, description); err != nil {
return "", err
}

expireFunc := func() {
nodeStatus := api.NodeStatus{State: api.NodeStatus_DOWN, Message: "heartbeat failure"}
log.G(ctx).Debugf("heartbeat expiration")
Expand Down Expand Up @@ -787,6 +795,10 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
}
} else {
sessionID = r.SessionID
// update the node description
if _, err := d.updateNode(nodeID, r.Description); err != nil {
return err
}
}

fields := logrus.Fields{
Expand Down

0 comments on commit 3f8ae63

Please # to comment.