Skip to content

Commit

Permalink
dispatcher: migrate to SubConn.Shutdown and deprecate Addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Nov 15, 2023
1 parent 0150a2b commit 2821d85
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
4 changes: 2 additions & 2 deletions broker/protocol/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,10 @@ func (d *dispatcher) sweep() {
d.mu.Unlock()

for _, sc := range toSweep {
// RemoveSubConn begins SubConn shutdown. We expect to see a
// SubConn.Shutdown begins a shutdown. We expect to see a
// HandleSubConnStateChange with connectivity.Shutdown, at which
// point we'll de-index it.
d.cc.RemoveSubConn(sc)
sc.Shutdown()
}
}

Expand Down
14 changes: 7 additions & 7 deletions broker/protocol/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
cc.created = nil

// Case: Default connection transitions to Ready. Expect it's now returned.
mockSubConn("default.addr").UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
mockSubConn("default.addr").UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

result, err := disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
Expand All @@ -71,7 +71,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {

// Case: Specific remote peer is dispatched to.
ctx = WithDispatchRoute(context.Background(),
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})

result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrNoSubConnAvailable)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
mockSubConn("remote.addr").UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})

ctx = WithDispatchRoute(context.Background(),
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})

_, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrTransientFailure)
Expand Down Expand Up @@ -239,10 +239,10 @@ type mockClientConn struct {

type mockSubConn string

func (s mockSubConn) UpdateAddresses([]resolver.Address) {}
func (s mockSubConn) UpdateAddresses([]resolver.Address) { panic("deprecated") }
func (s mockSubConn) Connect() {}
func (s mockSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, func() {}
return nil, func() {}
}
func (s mockSubConn) Shutdown() {}

Expand All @@ -253,10 +253,10 @@ func (c *mockClientConn) NewSubConn(a []resolver.Address, _ balancer.NewSubConnO
}

func (c *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
c.removed = append(c.removed, sc.(mockSubConn))
panic("deprecated")
}

func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) {}
func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) { panic("deprecated") }
func (c *mockClientConn) UpdateState(balancer.State) {}
func (c *mockClientConn) ResolveNow(resolver.ResolveNowOptions) {}
func (c *mockClientConn) Target() string { return "default.addr" }
Expand Down

0 comments on commit 2821d85

Please # to comment.