Skip to content

Commit

Permalink
feat: add package reachable (#822)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Nov 24, 2021
1 parent efc7fd3 commit 71ab827
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 16 deletions.
2 changes: 1 addition & 1 deletion client/config/dynconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
cachePath = filepath.Join(dfpath.DefaultCacheDir, "daemon_dynconfig")

// Watch dynconfig interval
watchInterval = 5 * time.Second
watchInterval = 10 * time.Second
)

type DynconfigData struct {
Expand Down
50 changes: 36 additions & 14 deletions client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"d7y.io/dragonfly/v2/internal/dfpath"
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/reachable"
"d7y.io/dragonfly/v2/pkg/rpc"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
Expand Down Expand Up @@ -84,7 +85,7 @@ type clientDaemon struct {
PieceManager peer.PieceManager

dynconfig config.Dynconfig
schedulerAddrs []dfnet.NetAddr
schedulers []*manager.Scheduler
schedulerClient schedulerclient.SchedulerClient
}

Expand All @@ -102,6 +103,7 @@ func New(opt *config.DaemonOption) (Daemon, error) {
}

var addrs []dfnet.NetAddr
var schedulers []*manager.Scheduler
var dynconfig config.Dynconfig
if opt.Scheduler.Manager.Enable == true {
// New manager client
Expand All @@ -119,12 +121,11 @@ func New(opt *config.DaemonOption) (Daemon, error) {
}

// Get schedulers from manager
schedulers, err := dynconfig.GetSchedulers()
if err != nil {
if schedulers, err = dynconfig.GetSchedulers(); err != nil {
return nil, err
}

addrs = schedulersToNetAddrs(schedulers)
addrs = schedulersToAvailableNetAddrs(schedulers)
} else {
addrs = opt.Scheduler.NetAddrs
}
Expand Down Expand Up @@ -219,7 +220,7 @@ func New(opt *config.DaemonOption) (Daemon, error) {
StorageManager: storageManager,
GCManager: gc.NewManager(opt.GCInterval.Duration),
dynconfig: dynconfig,
schedulerAddrs: addrs,
schedulers: schedulers,
schedulerClient: sched,
}, nil
}
Expand Down Expand Up @@ -517,24 +518,45 @@ func (cd *clientDaemon) Stop() {
}

func (cd *clientDaemon) OnNotify(data *config.DynconfigData) {
addrs := schedulersToNetAddrs(data.Schedulers)
if reflect.DeepEqual(cd.schedulerAddrs, addrs) {
if reflect.DeepEqual(cd.schedulers, data.Schedulers) {
return
}

// Get the available scheduler addresses and use ip first
addrs := schedulersToAvailableNetAddrs(data.Schedulers)

// Update scheduler client addresses
cd.schedulerClient.UpdateState(addrs)
cd.schedulerAddrs = addrs
cd.schedulers = data.Schedulers
}

// schedulersToNetAddrs coverts []*manager.Scheduler to []dfnet.NetAddr.
func schedulersToNetAddrs(schedulers []*manager.Scheduler) []dfnet.NetAddr {
// schedulersToAvailableNetAddrs coverts []*manager.Scheduler to available []dfnet.NetAddr.
func schedulersToAvailableNetAddrs(schedulers []*manager.Scheduler) []dfnet.NetAddr {
netAddrs := make([]dfnet.NetAddr, 0, len(schedulers))
for _, scheduler := range schedulers {
netAddrs = append(netAddrs, dfnet.NetAddr{
Type: dfnet.TCP,
Addr: fmt.Sprintf("%s:%d", scheduler.HostName, scheduler.Port),
})
// Check whether the ip can be reached
ipReachable := reachable.New(&reachable.Config{Address: fmt.Sprintf("%s:%d", scheduler.Ip, scheduler.Port)})
if err := ipReachable.Check(); err != nil {
logger.Warnf("scheduler address %s:%d is unreachable", scheduler.Ip, scheduler.Port)
} else {
netAddrs = append(netAddrs, dfnet.NetAddr{
Type: dfnet.TCP,
Addr: fmt.Sprintf("%s:%d", scheduler.Ip, scheduler.Port),
})

continue
}

// Check whether the host can be reached
hostReachable := reachable.New(&reachable.Config{Address: fmt.Sprintf("%s:%d", scheduler.HostName, scheduler.Port)})
if err := hostReachable.Check(); err != nil {
logger.Warnf("scheduler address %s:%d is unreachable", scheduler.HostName, scheduler.Port)
} else {
netAddrs = append(netAddrs, dfnet.NetAddr{
Type: dfnet.TCP,
Addr: fmt.Sprintf("%s:%d", scheduler.HostName, scheduler.Port),
})
}
}

return netAddrs
Expand Down
126 changes: 125 additions & 1 deletion client/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,132 @@
package daemon

import (
"net"
"testing"

"github.com/stretchr/testify/assert"

"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
)

func TestPeerHost_Serve(t *testing.T) {
func TestDaemonSchedulersToAvailableNetAddrs(t *testing.T) {
l, err := net.Listen("tcp", ":3000")
if err != nil {
t.Fatal(err)
}
defer l.Close()

tests := []struct {
name string
schedulers []*manager.Scheduler
expect func(t *testing.T, addrs []dfnet.NetAddr)
}{
{
name: "available ip",
schedulers: []*manager.Scheduler{
{
Ip: "127.0.0.1",
Port: int32(3000),
},
{
Ip: "127.0.0.1",
Port: int32(3001),
},
},
expect: func(t *testing.T, addrs []dfnet.NetAddr) {
assert := assert.New(t)
assert.EqualValues(addrs, []dfnet.NetAddr{{Type: dfnet.TCP, Addr: "127.0.0.1:3000"}})
},
},
{
name: "available host",
schedulers: []*manager.Scheduler{
{
Ip: "foo",
HostName: "localhost",
Port: int32(3000),
},
{
Ip: "foo",
HostName: "localhost",
Port: int32(3001),
},
},
expect: func(t *testing.T, addrs []dfnet.NetAddr) {
assert := assert.New(t)
assert.EqualValues(addrs, []dfnet.NetAddr{{Type: dfnet.TCP, Addr: "localhost:3000"}})
},
},
{
name: "available ip and host",
schedulers: []*manager.Scheduler{
{
Ip: "foo",
HostName: "localhost",
Port: int32(3000),
},
{
Ip: "foo",
HostName: "localhost",
Port: int32(3001),
},
{
Ip: "127.0.0.1",
HostName: "foo",
Port: int32(3001),
},
{
Ip: "127.0.0.1",
HostName: "foo",
Port: int32(3000),
},
{
Ip: "127.0.0.1",
HostName: "foo",
Port: int32(3001),
},
},
expect: func(t *testing.T, addrs []dfnet.NetAddr) {
assert := assert.New(t)
assert.EqualValues(addrs, []dfnet.NetAddr{
{Type: dfnet.TCP, Addr: "localhost:3000"},
{Type: dfnet.TCP, Addr: "127.0.0.1:3000"},
})
},
},
{
name: "unreachable",
schedulers: []*manager.Scheduler{
{
Ip: "foo",
HostName: "localhost",
Port: int32(3001),
},
{
Ip: "127.0.0.1",
HostName: "foo",
Port: int32(3001),
},
},
expect: func(t *testing.T, addrs []dfnet.NetAddr) {
assert := assert.New(t)
assert.EqualValues(addrs, []dfnet.NetAddr{})
},
},
{
name: "empty schedulers",
schedulers: []*manager.Scheduler{},
expect: func(t *testing.T, addrs []dfnet.NetAddr) {
assert := assert.New(t)
assert.EqualValues(addrs, []dfnet.NetAddr{})
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, schedulersToAvailableNetAddrs(tc.schedulers))
})
}
}
83 changes: 83 additions & 0 deletions pkg/reachable/reachable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reachable

import (
"fmt"
"net"
"strings"
"time"
)

const (
// DefaultPort is the default tcp port
DefaultPort = "80"
// DefaultNetwork is the default network type
DefaultNetwork = "tcp"
// DefaultTimeout is the default dial timeout
DefaultTimeout = 1 * time.Second
)

type Reachable interface {
// Check that the address can be accessed
Check() error
}

type reachable struct {
address string
network string
timeout time.Duration
}

type Config struct {
Address string
Network string
Timeout time.Duration
}

// New returns a new ReachableInterface interface
func New(r *Config) Reachable {
network := DefaultNetwork
if r.Network != "" {
network = r.Network
}

timeout := DefaultTimeout
if r.Timeout != 0 {
timeout = r.Timeout
}

return &reachable{
address: r.Address,
network: network,
timeout: timeout,
}
}

func (r *reachable) Check() error {
if !strings.Contains(r.address, ":") {
r.address = fmt.Sprintf("%s:%s", r.address, DefaultPort)
}

conn, err := net.DialTimeout(r.network, r.address, r.timeout)
if err != nil {
return err
}
conn.Close()

return nil
}
Loading

0 comments on commit 71ab827

Please # to comment.