
Add iperf3-based e2e tests to make sure stitching works
This includes an iperf3 upload test that fails when the
"Fix TCP stitching in the UL direction" patch
(2531270) is reverted.
ivan4th committed Nov 4, 2021
1 parent a4673d5 commit a44fd0d
Showing 5 changed files with 261 additions and 21 deletions.
Dockerfile: 2 changes (1 addition, 1 deletion)
@@ -1,6 +1,6 @@
# syntax = docker/dockerfile:experimental
# the following is updated automatically by make update-build-image-tag
FROM quay.io/travelping/upg-build:10f03c8684150c9d0b492f050ca14d1e AS build-stage
FROM quay.io/travelping/upg-build:5a3ed8c846175982e2e70a5edd3db8d4 AS build-stage

ADD vpp /src/vpp
ADD upf /src/upf
Dockerfile.build: 2 changes (1 addition, 1 deletion)
@@ -23,7 +23,7 @@ RUN --mount=target=/var/lib/apt/lists,type=cache,sharing=private \
DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
build-essential sudo git netbase curl ca-certificates \
golang-go iproute2 gdb tcpdump iputils-ping libpcap-dev \
dumb-init && \
dumb-init iperf3 && \
curl -sSL "https://github.com/moby/buildkit/releases/download/${BUILDKIT_VERSION}/buildkit-${BUILDKIT_VERSION}.linux-amd64.tar.gz" | \
tar -xvz -C /usr/local bin/buildctl && \
echo "${BUILDCTL_SHA256} /usr/local/bin/buildctl" | sha256sum -c && \
Dockerfile.devel: 2 changes (1 addition, 1 deletion)
@@ -1,6 +1,6 @@
# syntax = docker/dockerfile:experimental
# the following is updated automatically by make update-build-image-tag
FROM quay.io/travelping/upg-build:10f03c8684150c9d0b492f050ca14d1e AS build-stage
FROM quay.io/travelping/upg-build:5a3ed8c846175982e2e70a5edd3db8d4 AS build-stage

ADD vpp /src/vpp
ADD upf /src/upf
test/e2e/framework/iperf3.go: 108 changes (108 additions, 0 deletions)
@@ -0,0 +1,108 @@
package framework

import (
"bytes"
"context"
"encoding/json"
"math"
"net"
"os/exec"
"strconv"
"time"

"github.com/pkg/errors"
"github.com/travelping/upg-vpp/test/e2e/network"
)

type IPerf3 struct {
ServerMode bool
Duration time.Duration
NS *network.NetNS
ServerIP net.IP
Reverse bool
cmd *exec.Cmd
}

type IPerfResult struct {
Error string `json:"error"`
End IPerfResultEnd `json:"end"`
}

type IPerfResultEnd struct {
SumSent IPerfResultByteStats `json:"sum_sent"`
SumReceived IPerfResultByteStats `json:"sum_received"`
}

type IPerfResultByteStats struct {
Bytes uint64 `json:"bytes"` // 64-bit, since a fast run can exceed 4 GiB
}

func (ipf *IPerf3) Start() error {
args := []string{
"--net=" + ipf.NS.Path(),
"iperf3",
"-J", // JSON output
}

if ipf.ServerMode {
args = append(args, "-s", "-1") // -1 means one-off
} else {
args = append(
args, "-c", ipf.ServerIP.String(),
"-t", strconv.Itoa(int(math.Round(ipf.Duration.Seconds()))))
}

if ipf.Reverse {
args = append(args, "-R")
}

ipf.cmd = exec.Command("nsenter", args...)
ipf.cmd.Stdout = &bytes.Buffer{}
ipf.cmd.Stderr = &bytes.Buffer{}
if err := ipf.cmd.Start(); err != nil {
return errors.Wrap(err, "error starting iperf3")
}

return nil
}

func (ipf *IPerf3) Kill() {
// ProcessState is only set once Wait() has returned, so treat nil as still running
if ipf.cmd.ProcessState == nil || !ipf.cmd.ProcessState.Exited() {
ipf.cmd.Process.Kill()
}
}

func (ipf *IPerf3) Wait(ctx context.Context) (*IPerfResult, error) {
doneCh := make(chan struct{})
defer close(doneCh)
go func() {
select {
case <-ctx.Done():
ipf.Kill()
case <-doneCh:
}
}()

// In JSON mode (-J), iperf3 doesn't print anything on stderr,
// but nsenter may still write an error message there
runErr := ipf.cmd.Wait()
if runErr != nil {
errMsg := ipf.cmd.Stderr.(*bytes.Buffer).Bytes()
if len(errMsg) != 0 {
return nil, errors.Wrapf(runErr, "nsenter/iperf3 failed:\n%s", errMsg)
}
// no error message from stderr, need to parse stdout below
}

out := ipf.cmd.Stdout.(*bytes.Buffer)
var r IPerfResult
if err := json.Unmarshal(out.Bytes(), &r); err != nil {
return nil, errors.Wrapf(err, "error unmarshalling iperf3 result:\n%s", out.Bytes())
}

if runErr != nil {
return nil, errors.Wrapf(runErr, "error running iperf3: %s", r.Error)
}

return &r, nil
}
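For reference, the helper above effectively runs "nsenter --net=<ns-path> iperf3 -J -s -1" for the server and "nsenter --net=<ns-path> iperf3 -J -c <server-ip> -t <seconds> [-R]" for the client, and Wait only decodes a small slice of the JSON that -J produces. A minimal, self-contained sketch of that slice follows; the payload and byte counts are illustrative, not taken from a real run.

package main

import (
	"encoding/json"
	"fmt"
)

// Same fields as IPerfResult above; uint64 is used here to stay safe for large transfers.
type iperfResult struct {
	Error string `json:"error"`
	End   struct {
		SumSent     byteStats `json:"sum_sent"`
		SumReceived byteStats `json:"sum_received"`
	} `json:"end"`
}

type byteStats struct {
	Bytes uint64 `json:"bytes"`
}

func main() {
	// Illustrative fragment of "iperf3 -J" output, trimmed to the fields the
	// framework reads; the numbers are made up.
	sample := []byte(`{
	  "end": {
	    "sum_sent":     {"bytes": 120000000},
	    "sum_received": {"bytes": 118000000}
	  }
	}`)

	var r iperfResult
	if err := json.Unmarshal(sample, &r); err != nil {
		panic(err)
	}
	fmt.Printf("sent=%d received=%d\n", r.End.SumSent.Bytes, r.End.SumReceived.Bytes)
}

On failure, iperf3 also sets the top-level "error" field, which is what Wait reports alongside the process exit error.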
test/e2e/upg_e2e.go: 168 changes (150 additions, 18 deletions)
@@ -177,6 +177,24 @@ func describeMeasurement(f *framework.Framework) {
}
verifyNonAppMeasurement(f, ms, proto, nil)
})

ginkgo.It("works with iperf3", func() {
verifyIPerf3(f, false)
// FIXME: in case of an iperf3 run, e2e traffic measurements may
// be imprecise. We might need to wait longer to make sure
// all of the data are sent
// https://github.com/esnet/iperf/issues/994
// ms = deleteSession(f, seid, true)
// verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
})

ginkgo.It("works with iperf3 [reverse]", func() {
verifyIPerf3(f, true)
// FIXME: possible imprecise measurement that's not a UPG
// bug, see above
// ms = deleteSession(f, seid, true)
// verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
})
})

ginkgo.Context("[ip rules]", func() {
@@ -265,6 +283,58 @@
ginkgo.It("can survive session creation-deletion loop", func() {
verifySessionDeletionLoop(f, &seid)
})

ginkgo.It("works with iperf3", func() {
out, err := f.VPP.Ctl("show upf proxy")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(out).To(gomega.ContainSubstring("Force stitching: on"))

verifyIPerf3(f, false)
// FIXME: possible imprecise measurement that's not a UPG
// bug, see above
// ms = deleteSession(f, seid, true)
// verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
})

ginkgo.It("works with iperf3 [reverse]", func() {
out, err := f.VPP.Ctl("show upf proxy")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(out).To(gomega.ContainSubstring("Force stitching: on"))

verifyIPerf3(f, true)
// FIXME: possible imprecise measurement that's not a UPG
// bug, see above
// ms = deleteSession(f, seid, true)
// verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
})

ginkgo.It("works with iperf3 [no force-stitching]", func() {
_, err := f.VPP.Ctl("set upf proxy force-stitching off")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
out, err := f.VPP.Ctl("show upf proxy")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(out).To(gomega.ContainSubstring("Force stitching: off"))

verifyIPerf3(f, false)
// FIXME: possible imprecise measurement that's not a UPG
// bug, see above
// ms = deleteSession(f, seid, true)
// verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
})

ginkgo.It("works with iperf3 [no force-stitching] [reverse]", func() {
_, err := f.VPP.Ctl("set upf proxy force-stitching off")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
out, err := f.VPP.Ctl("show upf proxy")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(out).To(gomega.ContainSubstring("Force stitching: off"))

verifyIPerf3(f, true)
// FIXME: possible imprecise measurement that's not a UPG
// bug, see above
// ms = deleteSession(f, seid, true)
// verifyNonAppMeasurement(f, ms, layers.IPProtocolTCP, nil)
})
})

sessionContext("[redirects]", framework.SessionConfig{Redirect: true}, func() {
@@ -593,7 +663,8 @@ var _ = ginkgo.Describe("[Reporting]", func() {
gomega.Expect(string(out)).To(gomega.ContainSubstring("Monitoring Time"))

ginkgo.By("Starting some traffic")
tg, clientNS, serverNS := newTrafficGen(f, &traffic.UDPPingConfig{
clientNS, serverNS := getClientAndServerNamespaces(f)
tg := newTrafficGen(f, &traffic.UDPPingConfig{
PacketCount: 50, // 5s
Retry: true,
Delay: 100 * time.Millisecond,
@@ -668,7 +739,8 @@ var _ = ginkgo.Describe("[Reporting]", func() {
gomega.Expect(string(out)).To(gomega.ContainSubstring("Monitoring Time"))

ginkgo.By("Starting some traffic")
tg, clientNS, serverNS := newTrafficGen(f, &traffic.UDPPingConfig{
clientNS, serverNS := getClientAndServerNamespaces(f)
tg := newTrafficGen(f, &traffic.UDPPingConfig{
PacketCount: 180, // 18s
Retry: true,
Delay: 100 * time.Millisecond,
@@ -783,7 +855,8 @@ var _ = ginkgo.Describe("[Reporting]", func() {
gomega.Expect(string(out)).To(gomega.ContainSubstring(seidHex))

ginkgo.By("Starting some traffic")
tg, clientNS, serverNS := newTrafficGen(f, &traffic.UDPPingConfig{
clientNS, serverNS := getClientAndServerNamespaces(f)
tg := newTrafficGen(f, &traffic.UDPPingConfig{
PacketCount: 180, // 30s, but will be stopped when VPP exits
Retry: true,
Delay: 100 * time.Millisecond,
@@ -1227,7 +1300,17 @@ func deleteSessions(f *framework.Framework, seids []pfcp.SEID, showInfo bool) []
return ms
}

func newTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffic.TrafficRec) (*traffic.TrafficGen, *network.NetNS, *network.NetNS) {
func getClientAndServerNamespaces(f *framework.Framework) (*network.NetNS, *network.NetNS) {
var serverNS *network.NetNS
if f.Mode == framework.UPGModeGTPProxy {
serverNS = f.VPP.GetNS("srv")
} else {
serverNS = f.VPP.GetNS("sgi")
}
return f.VPP.GetNS("ue"), serverNS
}

func newTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffic.TrafficRec) *traffic.TrafficGen {
ginkgo.By("starting the traffic generator")
cfg.SetNoLinger(true)
if !cfg.HasServerIP() {
@@ -1242,24 +1325,19 @@ func newTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffi
cfg.AddServerIP(f.AddServerIP())
}
}
clientNS := f.VPP.GetNS("ue")
var serverNS *network.NetNS
if f.Mode == framework.UPGModeGTPProxy {
serverNS = f.VPP.GetNS("srv")
} else {
serverNS = f.VPP.GetNS("sgi")
}
return traffic.NewTrafficGen(cfg, rec), clientNS, serverNS
return traffic.NewTrafficGen(cfg, rec)
}

func runTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffic.TrafficRec) {
tg, clientNS, serverNS := newTrafficGen(f, cfg, rec)
clientNS, serverNS := getClientAndServerNamespaces(f)
tg := newTrafficGen(f, cfg, rec)
framework.ExpectNoError(tg.Run(f.Context, clientNS, serverNS))
}

func verifyConnFlood(f *framework.Framework, netem bool) {
clientNS, serverNS := getClientAndServerNamespaces(f)
rec := &traffic.SimpleTrafficRec{}
tg, clientNS, serverNS := newTrafficGen(f, &traffic.HTTPConfig{
tg := newTrafficGen(f, &traffic.HTTPConfig{
Retry: true,
SimultaneousCount: 400, // TODO: 5000 works with bigger chunks but takes up too much memory
Persist: true,
@@ -1305,8 +1383,9 @@ func verifyConnFlood(f *framework.Framework, netem bool) {
}

// make sure UPG and the session are still alive after the stress test
clientNS, serverNS = getClientAndServerNamespaces(f)
rec = &traffic.SimpleTrafficRec{}
tg, clientNS, serverNS = newTrafficGen(f, &traffic.UDPPingConfig{
tg = newTrafficGen(f, &traffic.UDPPingConfig{
PacketCount: 3,
Retry: true,
}, rec)
@@ -1315,7 +1394,8 @@

func verifySessionDeletionLoop(f *framework.Framework, seid *pfcp.SEID) {
rec := &traffic.SimpleTrafficRec{}
tg, clientNS, serverNS := newTrafficGen(f, &traffic.HTTPConfig{
clientNS, serverNS := getClientAndServerNamespaces(f)
tg := newTrafficGen(f, &traffic.HTTPConfig{
Retry: true,
SimultaneousCount: 400, // TODO: 5000 works with bigger chunks but takes up too much memory
Persist: true,
@@ -1354,16 +1434,18 @@
*seid = startMeasurementSession(f, &framework.SessionConfig{})
}
// make sure UPG and the session are still alive after the stress test
clientNS, serverNS = getClientAndServerNamespaces(f)
rec = &traffic.SimpleTrafficRec{}
tg, clientNS, serverNS = newTrafficGen(f, &traffic.UDPPingConfig{
tg = newTrafficGen(f, &traffic.UDPPingConfig{
PacketCount: 3,
Retry: true,
}, rec)
framework.ExpectNoError(tg.Run(f.Context, clientNS, serverNS))
}

func startTrafficGen(f *framework.Framework, cfg traffic.TrafficConfig, rec traffic.TrafficRec) chan error {
tg, clientNS, serverNS := newTrafficGen(f, cfg, rec)
clientNS, serverNS := getClientAndServerNamespaces(f)
tg := newTrafficGen(f, cfg, rec)
return tg.Start(f.Context, clientNS, serverNS)
}

@@ -1520,3 +1602,53 @@ func verifyPSDBU(m message.Message, numUsageReports int) {
}
}
}

func verifyIPerf3(f *framework.Framework, reverse bool) {
clientNS, serverNS := getClientAndServerNamespaces(f)

serverIPerf3 := &framework.IPerf3{
ServerMode: true,
NS: serverNS,
}
gomega.Expect(serverIPerf3.Start()).To(gomega.Succeed())
defer func() {
serverIPerf3.Kill() // does nothing if the process has exited
}()

clientIPerf3 := &framework.IPerf3{
ServerMode: false,
Duration: 10 * time.Second,
NS: clientNS,
ServerIP: f.ServerIP(),
Reverse: reverse,
}
gomega.Expect(clientIPerf3.Start()).To(gomega.Succeed())
defer func() {
clientIPerf3.Kill()
}()

clientResult, err := clientIPerf3.Wait(f.Context)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

serverResult, err := serverIPerf3.Wait(f.Context)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

framework.Logf("iperf3: %d bytes sent, %d bytes received",
clientResult.End.SumSent.Bytes,
clientResult.End.SumReceived.Bytes)

gomega.Expect(clientResult.End.SumSent.Bytes).
To(gomega.BeNumerically(">", 50000000),
"low iperf3 transfer volume")
gomega.Expect(clientResult.End.SumReceived.Bytes).
To(gomega.BeNumerically(">", clientResult.End.SumSent.Bytes/2),
"high loss reported by iperf3")

if reverse {
gomega.Expect(clientResult.End.SumSent.Bytes).
To(gomega.Equal(serverResult.End.SumSent.Bytes))
} else {
gomega.Expect(clientResult.End.SumReceived.Bytes).
To(gomega.Equal(serverResult.End.SumReceived.Bytes))
}
}
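Taken together, the flow that verifyIPerf3 drives can be condensed as below. This is a sketch rather than part of the commit: it assumes the caller already has a configured framework instance to supply the client/server namespaces and the server IP, and it returns errors instead of using gomega assertions.

package example

import (
	"context"
	"net"
	"time"

	"github.com/travelping/upg-vpp/test/e2e/framework"
	"github.com/travelping/upg-vpp/test/e2e/network"
)

// runIPerf3 mirrors verifyIPerf3 above in condensed form: start a one-off
// iperf3 server in serverNS, run a 10 s client in clientNS (optionally
// reversed), and return the client-side result.
func runIPerf3(ctx context.Context, clientNS, serverNS *network.NetNS, serverIP net.IP, reverse bool) (*framework.IPerfResult, error) {
	server := &framework.IPerf3{ServerMode: true, NS: serverNS}
	if err := server.Start(); err != nil {
		return nil, err
	}
	defer server.Kill() // no-op once the server has exited

	client := &framework.IPerf3{
		ServerMode: false,
		Duration:   10 * time.Second,
		NS:         clientNS,
		ServerIP:   serverIP,
		Reverse:    reverse,
	}
	if err := client.Start(); err != nil {
		return nil, err
	}
	defer client.Kill()

	result, err := client.Wait(ctx)
	if err != nil {
		return nil, err
	}
	if _, err := server.Wait(ctx); err != nil {
		return nil, err
	}
	return result, nil
}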
