Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Expand on endpoint selection strategy #31

Merged
merged 1 commit into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/loadtest/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ func buildCLI(cli *CLIConfig, logger logging.Logger) *cobra.Command {
rootCmd.PersistentFlags().IntVarP(&cfg.Count, "count", "N", -1, "The maximum number of transactions to send - set to -1 to turn off this limit")
rootCmd.PersistentFlags().StringVar(&cfg.BroadcastTxMethod, "broadcast-tx-method", "async", "The broadcast_tx method to use when submitting transactions - can be async, sync or commit")
rootCmd.PersistentFlags().StringSliceVar(&cfg.Endpoints, "endpoints", []string{}, "A comma-separated list of URLs indicating Tendermint WebSockets RPC endpoints to which to connect")
rootCmd.PersistentFlags().StringVar(&cfg.EndpointSelectMethod, "endpoint-select-method", SelectGivenEndpoints, "The method by which to select endpoints")
rootCmd.PersistentFlags().StringVar(&cfg.EndpointSelectMethod, "endpoint-select-method", SelectSuppliedEndpoints, "The method by which to select endpoints")
rootCmd.PersistentFlags().IntVar(&cfg.ExpectPeers, "expect-peers", 0, "The minimum number of peers to expect when crawling the P2P network from the specified endpoint(s) prior to waiting for slaves to connect")
rootCmd.PersistentFlags().IntVar(&cfg.MaxEndpoints, "max-endpoints", 0, "The maximum number of endpoints to use for testing, where 0 means unlimited")
rootCmd.PersistentFlags().IntVar(&cfg.PeerConnectTimeout, "peer-connect-timeout", 600, "The number of seconds to wait for all required peers to connect if expect-peers > 0")
rootCmd.PersistentFlags().BoolVarP(&flagVerbose, "verbose", "v", false, "Increase output logging verbosity to DEBUG level")

Expand Down
14 changes: 10 additions & 4 deletions pkg/loadtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
)

const (
SelectGivenEndpoints = "given-endpoints" // Select only the given endpoint(s) for load testing (the default).
SelectCrawledEndpoints = "crawled-endpoints" // Select all crawled endpoint addresses (only valid if ExpectPeers is specified in the configuration).
SelectSuppliedEndpoints = "supplied" // Select only the supplied endpoint(s) for load testing (the default).
SelectDiscoveredEndpoints = "discovered" // Select newly discovered endpoints only (excluding supplied endpoints).
SelectAnyEndpoints = "any" // Select from any of supplied and/or discovered endpoints.
)

var validEndpointSelectMethods = map[string]interface{}{
SelectGivenEndpoints: nil,
SelectCrawledEndpoints: nil,
SelectSuppliedEndpoints: nil,
SelectDiscoveredEndpoints: nil,
SelectAnyEndpoints: nil,
}

// Config represents the configuration for a single client (i.e. standalone or
Expand All @@ -29,6 +31,7 @@ type Config struct {
Endpoints []string `json:"endpoints"` // A list of the Tendermint node endpoints to which to connect for this load test.
EndpointSelectMethod string `json:"endpoint_select_method"` // The method by which to select endpoints for load testing.
ExpectPeers int `json:"expect_peers"` // The minimum number of peers to expect before starting a load test. Set to 0 by default (no minimum).
MaxEndpoints int `json:"max_endpoints"` // The maximum number of endpoints to use for load testing. Set to 0 by default (no maximum).
PeerConnectTimeout int `json:"peer_connect_timeout"` // The maximum time to wait (in seconds) for all peers to connect, if ExpectPeers > 0.
}

Expand Down Expand Up @@ -93,6 +96,9 @@ func (c Config) Validate() error {
if c.ExpectPeers > 0 && c.PeerConnectTimeout < 1 {
return fmt.Errorf("peer-connect-timeout must be at least 1 if expect-peers is non-zero, but got %d", c.PeerConnectTimeout)
}
if c.MaxEndpoints < 0 {
return fmt.Errorf("invalid value for max-endpoints: %d", c.MaxEndpoints)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/loadtest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func testConfig() loadtest.Config {
Count: totalTxsPerSlave,
BroadcastTxMethod: "async",
Endpoints: []string{getRPCAddress()},
EndpointSelectMethod: loadtest.SelectGivenEndpoints,
EndpointSelectMethod: loadtest.SelectSuppliedEndpoints,
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/loadtest/loadtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ func executeLoadTest(cfg Config) error {
if cfg.ExpectPeers > 0 {
peers, err := waitForTendermintNetworkPeers(
cfg.Endpoints,
cfg.EndpointSelectMethod,
cfg.ExpectPeers,
cfg.MaxEndpoints,
time.Duration(cfg.PeerConnectTimeout)*time.Second,
logger,
)
if err != nil {
logger.Error("Failed while waiting for peers to connect", "err", err)
return err
}
if cfg.EndpointSelectMethod == SelectCrawledEndpoints {
if cfg.EndpointSelectMethod == SelectDiscoveredEndpoints {
cfg.Endpoints = peers
}
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/loadtest/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type Master struct {
totalTxsPerSlave map[string]int // The number of transactions sent by each slave.

// Prometheus metrics
stateMetric prometheus.Gauge // A code-based status metric for representing the master's current state.
totalTxsMetric prometheus.Gauge // The total number of transactions sent by all slaves.
txRateMetric prometheus.Gauge // The transaction throughput rate (tx/sec) as measured by the master since the last metrics update.
stateMetric prometheus.Gauge // A code-based status metric for representing the master's current state.
totalTxsMetric prometheus.Gauge // The total number of transactions sent by all slaves.
txRateMetric prometheus.Gauge // The transaction throughput rate (tx/sec) as measured by the master since the last metrics update.
overallTxRateMetric prometheus.Gauge // The overall transaction throughput rate (tx/sec) as measured by the master since the beginning of the load test.
}

Expand Down Expand Up @@ -208,14 +208,16 @@ func (m *Master) waitForPeers() error {
m.stateMetric.Set(masterWaitingForPeers)
peers, err := waitForTendermintNetworkPeers(
m.cfg.Endpoints,
m.cfg.EndpointSelectMethod,
m.cfg.ExpectPeers,
m.cfg.MaxEndpoints,
time.Duration(m.cfg.PeerConnectTimeout)*time.Second,
m.logger,
)
if err != nil {
return err
}
if m.cfg.EndpointSelectMethod == SelectCrawledEndpoints {
if m.cfg.EndpointSelectMethod == SelectDiscoveredEndpoints {
m.cfg.Endpoints = peers
}
return nil
Expand Down
46 changes: 37 additions & 9 deletions pkg/loadtest/tm_network_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,39 @@ type tendermintPeerInfo struct {
// must be present when polled repeatedly for a period of time).
func waitForTendermintNetworkPeers(
startingPeerAddrs []string,
minPeers int,
selectionMethod string,
minDiscoveredPeers int,
maxReturnedPeers int,
timeout time.Duration,
logger logging.Logger,
) ([]string, error) {
logger.Info(
"Waiting for peers to connect",
"minPeers", minPeers,
"minDiscoveredPeers", minDiscoveredPeers,
"maxReturnedPeers", maxReturnedPeers,
"timeout", fmt.Sprintf("%.2f seconds", timeout.Seconds()),
"selectionMethod", selectionMethod,
)

startTime := time.Now()
peers := make(map[string]*tendermintPeerInfo)
suppliedPeers := make(map[string]*tendermintPeerInfo)
for _, peerURL := range startingPeerAddrs {
u, err := url.Parse(peerURL)
if err != nil {
return nil, fmt.Errorf("failed to parse peer URL %s: %s", peerURL, err)
}
peerAddr := fmt.Sprintf("http://%s:26657", u.Hostname())
peers[peerAddr] = &tendermintPeerInfo{
suppliedPeers[peerAddr] = &tendermintPeerInfo{
Addr: peerAddr,
Client: client.NewHTTP(peerAddr, "/websocket"),
PeerAddrs: make([]string, 0),
}
}
peers := make(map[string]*tendermintPeerInfo)
for a, p := range suppliedPeers {
pc := *p
peers[a] = &pc
}
for {
remainingTimeout := timeout - time.Since(startTime)
if remainingTimeout < 0 {
Expand All @@ -64,10 +73,10 @@ func waitForTendermintNetworkPeers(
if len(newPeers) > len(peers) {
peers = newPeers
}
if len(peers) >= minPeers {
if len(peers) >= minDiscoveredPeers {
logger.Info("All required peers connected", "count", len(peers))
// we're done here
return peerMapToList(peers, minPeers), nil
return filterTendermintPeerMap(suppliedPeers, peers, selectionMethod, maxReturnedPeers), nil
} else {
logger.Debug("Peers discovered so far", "count", len(peers), "peers", peers)
time.Sleep(1 * time.Second)
Expand Down Expand Up @@ -150,10 +159,29 @@ func resolveTendermintPeerMap(peers map[string]*tendermintPeerInfo) map[string]*
return result
}

func peerMapToList(peers map[string]*tendermintPeerInfo, maxCount int) []string {
func filterTendermintPeerMap(suppliedPeers, newPeers map[string]*tendermintPeerInfo, selectionMethod string, maxCount int) []string {
result := make([]string, 0)
for _, peer := range peers {
result = append(result, peer.Addr)
for peerAddr := range newPeers {
u, err := url.Parse(peerAddr)
if err != nil {
continue
}
addr := fmt.Sprintf("ws://%s:26657/websocket", u.Hostname())
switch selectionMethod {
case SelectSuppliedEndpoints:
// only add it to the result if it was in the original list
if _, ok := suppliedPeers[peerAddr]; ok {
result = append(result, addr)
}
case SelectDiscoveredEndpoints:
// only add it to the result if it wasn't in the original list
if _, ok := suppliedPeers[peerAddr]; !ok {
result = append(result, addr)
}
default:
// otherwise, always add it
result = append(result, addr)
}
if len(result) >= maxCount {
break
}
Expand Down