Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-10981] Move bandwidth code to pkg/bandwidth #39

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ require (
github.com/godbus/dbus/v5 v5.0.4
github.com/j-keck/arping v1.0.2
github.com/mattn/go-shellwords v1.0.12
github.com/networkplumbing/go-nft v0.2.0
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.15.0
github.com/safchain/ethtool v0.0.0-20210803160452-9aa261dae9b1
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5
github.com/networkplumbing/go-nft v0.4.0
github.com/onsi/ginkgo v1.13.0
github.com/onsi/gomega v1.33.1
github.com/safchain/ethtool v0.4.1
github.com/vishvananda/netlink v1.3.0
golang.org/x/sys v0.21.0
)

Expand All @@ -29,13 +29,15 @@ require (
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo/v2 v2.19.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect
github.com/vishvananda/netns v0.0.4 // indirect
go.opencensus.io v0.22.3 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/text v0.15.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
65 changes: 33 additions & 32 deletions go.sum

Large diffs are not rendered by default.

138 changes: 138 additions & 0 deletions pkg/bandwidth/bandwidth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2025 CNI authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bandwidth

import (
"fmt"
"math"

"github.com/vishvananda/netlink"

"github.com/containernetworking/cni/pkg/types"
current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/plugins/pkg/ip"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/containernetworking/plugins/pkg/utils"
)

// Naming parameters for the ifb device created per container; both are
// consumed by GetIfbDeviceName.
const (
// MaxIfbDeviceLength is the maximum length passed to the hashed-name helper
// when building an ifb device name.
// NOTE(review): 15 presumably mirrors the Linux interface-name limit
// (IFNAMSIZ minus the trailing NUL) — confirm.
MaxIfbDeviceLength = 15
// IfbDevicePrefix is the prefix given to every generated ifb device name.
IfbDevicePrefix = "bwp"
)

// BandwidthEntry corresponds to a single entry in the bandwidth argument,
// see CONVENTIONS.md.
//
// Rates are expressed in bits per second and bursts in bits. A value of 0
// means "no limit". Rate and burst must be set together for a given
// direction; see ValidateRateAndBurst.
type BandwidthEntry struct {
	IngressRate  uint64 `json:"ingressRate"`  // Bandwidth rate in bps for traffic through container. 0 for no limit. If ingressRate is set, ingressBurst must also be set
	IngressBurst uint64 `json:"ingressBurst"` // Bandwidth burst in bits for traffic through container. 0 for no limit. If ingressBurst is set, ingressRate must also be set

	EgressRate  uint64 `json:"egressRate"`  // Bandwidth rate in bps for traffic through container. 0 for no limit. If egressRate is set, egressBurst must also be set
	EgressBurst uint64 `json:"egressBurst"` // Bandwidth burst in bits for traffic through container. 0 for no limit. If egressBurst is set, egressRate must also be set
}

// IsZero reports whether the entry configures no limit at all, i.e. every
// rate and burst field is 0.
//
// A nil receiver is treated as "no limit" and yields true. GetBandwidth
// returns nil when no bandwidth is configured, so this keeps the common
// GetBandwidth(conf).IsZero() call from panicking.
func (bw *BandwidthEntry) IsZero() bool {
	if bw == nil {
		return true
	}
	return bw.IngressBurst == 0 && bw.IngressRate == 0 && bw.EgressBurst == 0 && bw.EgressRate == 0
}

// PluginConf is the bandwidth plugin's network configuration. It combines the
// standard CNI NetConf fields with two possible sources of bandwidth limits;
// GetBandwidth picks between them.
type PluginConf struct {
types.NetConf

// RuntimeConfig holds values under the "runtimeConfig" JSON key.
// Bandwidth is only consulted by GetBandwidth when the embedded
// BandwidthEntry below is nil.
RuntimeConfig struct {
Bandwidth *BandwidthEntry `json:"bandwidth,omitempty"`
} `json:"runtimeConfig,omitempty"`

// Embedded bandwidth limits; being embedded, its JSON fields are read from
// the top level of the configuration. When non-nil it takes precedence over
// RuntimeConfig.Bandwidth in GetBandwidth.
*BandwidthEntry
}

// SafeQdiscList lists the qdiscs attached to link, leaving out any
// pfifo_fast entries. Those are dropped because older kernels do not report
// them, so skipping them gives the same view regardless of kernel version.
//
// On success the returned slice is always non-nil (possibly empty). Any error
// from netlink.QdiscList is returned unchanged with a nil slice.
func SafeQdiscList(link netlink.Link) ([]netlink.Qdisc, error) {
	all, listErr := netlink.QdiscList(link)
	if listErr != nil {
		return nil, listErr
	}

	kept := []netlink.Qdisc{}
	for i := range all {
		if _, isPfifoFast := all[i].(*netlink.PfifoFast); isPfifoFast {
			// skip: not reported consistently across kernels
			continue
		}
		kept = append(kept, all[i])
	}
	return kept, nil
}

// ValidateRateAndBurst checks a rate/burst pair (both in bits) for
// consistency. It returns an error when:
//   - rate is set but burst is 0,
//   - burst is set but rate is 0,
//   - burst, converted to bytes, is math.MaxUint32 or larger.
//
// A pair where both values are 0 is valid and yields nil.
func ValidateRateAndBurst(rate, burst uint64) error {
	rateSet, burstSet := rate != 0, burst != 0

	if rateSet && !burstSet {
		return fmt.Errorf("if rate is set, burst must also be set")
	}
	if burstSet && !rateSet {
		return fmt.Errorf("if burst is set, rate must also be set")
	}
	// The burst is later handled as a 32-bit byte count, so it has to fit.
	if burstInBytes := burst / 8; burstInBytes >= math.MaxUint32 {
		return fmt.Errorf("burst cannot be more than 4GB")
	}
	return nil
}

// GetBandwidth selects the bandwidth limits that apply to conf. The entry
// embedded directly in the configuration wins when present; otherwise the
// runtimeConfig entry is used. The result is nil when neither is set.
func GetBandwidth(conf *PluginConf) *BandwidthEntry {
	if embedded := conf.BandwidthEntry; embedded != nil {
		return embedded
	}
	return conf.RuntimeConfig.Bandwidth
}

// GetIfbDeviceName derives the ifb device name for a container on a network.
// The name is built by utils.MustFormatHashWithPrefix from the concatenation
// of networkName and containerID, using IfbDevicePrefix as the prefix and
// MaxIfbDeviceLength as the length argument.
// NOTE(review): the "Must" helper presumably panics on invalid
// length/prefix — confirm against pkg/utils.
func GetIfbDeviceName(networkName string, containerID string) string {
	hashInput := networkName + containerID
	return utils.MustFormatHashWithPrefix(MaxIfbDeviceLength, IfbDevicePrefix, hashInput)
}

// GetMTU returns the MTU of the link named deviceName, looked up in the
// current network namespace. If the link cannot be found, it returns -1 and
// the lookup error.
func GetMTU(deviceName string) (int, error) {
	dev, lookupErr := netlink.LinkByName(deviceName)
	if lookupErr != nil {
		return -1, lookupErr
	}

	attrs := dev.Attrs()
	return attrs.MTU, nil
}

// GetHostInterface returns the entry of interfaces that describes the veth
// peer, in the host namespace, of the container interface containerIfName.
//
// The peer's interface index is looked up from inside netns, the link with
// that index is then resolved in the current (host) namespace, and the result
// is matched by name against the entries of interfaces that have an empty
// Sandbox.
//
// An error is returned when interfaces is empty, when the namespace cannot be
// entered or the peer index cannot be determined, when no link with that
// index exists in the host namespace, or when no matching entry is found.
func GetHostInterface(interfaces []*current.Interface, containerIfName string, netns ns.NetNS) (*current.Interface, error) {
	if len(interfaces) == 0 {
		return nil, fmt.Errorf("no interfaces provided")
	}

	// get veth peer index of container interface
	var peerIndex int
	err := netns.Do(func(_ ns.NetNS) error {
		var lookupErr error
		_, peerIndex, lookupErr = ip.GetVethPeerIfindex(containerIfName)
		return lookupErr
	})
	// err also covers a failure of netns.Do itself (e.g. the namespace could
	// not be entered), which would otherwise be reported with a nil cause.
	if err != nil || peerIndex <= 0 {
		return nil, fmt.Errorf("container interface %s has no veth peer: %v", containerIfName, err)
	}

	// find host interface by index
	link, err := netlink.LinkByIndex(peerIndex)
	if err != nil {
		return nil, fmt.Errorf("veth peer with index %d is not in host ns: %w", peerIndex, err)
	}
	hostName := link.Attrs().Name
	for _, iface := range interfaces {
		// An empty Sandbox marks an interface that lives in the host namespace.
		if iface.Sandbox == "" && iface.Name == hostName {
			return iface, nil
		}
	}

	return nil, fmt.Errorf("no veth peer of container interface found in host ns")
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package main
package bandwidth

import (
"fmt"
Expand All @@ -24,7 +24,7 @@ import (
"github.com/vishvananda/netlink"
)

const latencyInMillis = 25
const LatencyInMillis = 25

func CreateIfb(ifbDeviceName string, mtu int) error {
err := netlink.LinkAdd(&netlink.Ifb{
Expand Down Expand Up @@ -58,7 +58,7 @@ func CreateIngressQdisc(rateInBits, burstInBits uint64, hostDeviceName string) e
return createTBF(rateInBits, burstInBits, hostDevice.Attrs().Index)
}

func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, ifbDeviceName string) error {
func CreateEgressQdisc(qdiscType string, rateInBits, burstInBits uint64, hostDeviceName string, ifbDeviceName string) error {
ifbDevice, err := netlink.LinkByName(ifbDeviceName)
if err != nil {
return fmt.Errorf("get ifb device: %s", err)
Expand All @@ -68,41 +68,52 @@ func CreateEgressQdisc(rateInBits, burstInBits uint64, hostDeviceName string, if
return fmt.Errorf("get host device: %s", err)
}

// add qdisc ingress on host device
ingress := &netlink.Ingress{
QdiscAttrs: netlink.QdiscAttrs{
LinkIndex: hostDevice.Attrs().Index,
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
Parent: netlink.HANDLE_INGRESS,
},
var qdisc netlink.Qdisc
var filterHandle uint32
switch qdiscType {
case "ingress":
// add ingress qdisc on host device
qdisc = &netlink.Ingress{
QdiscAttrs: netlink.QdiscAttrs{
LinkIndex: hostDevice.Attrs().Index,
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
Parent: netlink.HANDLE_INGRESS,
},
}
filterHandle = qdisc.Attrs().Handle
case "clsact":
// add clsact qdisc on host device
qdisc = &netlink.Clsact{
QdiscAttrs: netlink.QdiscAttrs{
LinkIndex: hostDevice.Attrs().Index,
Handle: netlink.MakeHandle(0xffff, 0), // ffff:
Parent: netlink.HANDLE_CLSACT,
},
}
filterHandle = netlink.HANDLE_MIN_EGRESS
default:
return fmt.Errorf("unknown qdisc type: %s", qdiscType)
}

err = netlink.QdiscAdd(ingress)
err = netlink.QdiscAdd(qdisc)
if err != nil {
return fmt.Errorf("create ingress qdisc: %s", err)
return fmt.Errorf("create %s qdisc: %s", qdisc.Type(), err)
}

// add filter on host device to mirror traffic to ifb device
filter := &netlink.U32{
FilterAttrs: netlink.FilterAttrs{
LinkIndex: hostDevice.Attrs().Index,
Parent: ingress.QdiscAttrs.Handle,
Parent: filterHandle,
Priority: 1,
Protocol: syscall.ETH_P_ALL,
},
ClassId: netlink.MakeHandle(1, 1),
RedirIndex: ifbDevice.Attrs().Index,
Actions: []netlink.Action{
&netlink.MirredAction{
ActionAttrs: netlink.ActionAttrs{},
MirredAction: netlink.TCA_EGRESS_REDIR,
Ifindex: ifbDevice.Attrs().Index,
},
},
Actions: []netlink.Action{netlink.NewMirredAction(ifbDevice.Attrs().Index)},
}
err = netlink.FilterAdd(filter)
if err != nil {
return fmt.Errorf("add filter: %s", err)
return fmt.Errorf("add egress filter: %s", err)
}

// throttle traffic on ifb device
Expand All @@ -126,9 +137,9 @@ func createTBF(rateInBits, burstInBits uint64, linkIndex int) error {
}
rateInBytes := rateInBits / 8
burstInBytes := burstInBits / 8
bufferInBytes := buffer(uint64(rateInBytes), uint32(burstInBytes))
latency := latencyInUsec(latencyInMillis)
limitInBytes := limit(uint64(rateInBytes), latency, uint32(burstInBytes))
bufferInBytes := Buffer(rateInBytes, uint32(burstInBytes))
latency := LatencyInUsec(LatencyInMillis)
limitInBytes := Limit(rateInBytes, latency, uint32(burstInBytes))

qdisc := &netlink.Tbf{
QdiscAttrs: netlink.QdiscAttrs{
Expand All @@ -155,14 +166,14 @@ func time2Tick(time uint32) uint32 {
return uint32(float64(time) * float64(netlink.TickInUsec()))
}

func buffer(rate uint64, burst uint32) uint32 {
func Buffer(rate uint64, burst uint32) uint32 {
return time2Tick(uint32(float64(burst) * float64(netlink.TIME_UNITS_PER_SEC) / float64(rate)))
}

func limit(rate uint64, latency float64, buffer uint32) uint32 {
func Limit(rate uint64, latency float64, buffer uint32) uint32 {
return uint32(float64(rate)*latency/float64(netlink.TIME_UNITS_PER_SEC)) + buffer
}

func latencyInUsec(latencyInMillis float64) float64 {
func LatencyInUsec(latencyInMillis float64) float64 {
return float64(netlink.TIME_UNITS_PER_SEC) * (latencyInMillis / 1000.0)
}
13 changes: 13 additions & 0 deletions pkg/ip/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,16 @@ func AddDefaultRoute(gw net.IP, dev netlink.Link) error {
_, defNet, _ := net.ParseCIDR("0.0.0.0/0")
return AddRoute(defNet, gw, dev)
}

// IsIPNetZero reports whether ipnet is the default destination, i.e.
// "0.0.0.0/0" or "::/0". A nil ipnet is also treated as zero.
// This is needed as go-netlink replaces nil Dst with a '0' IPNet since
// https://github.com/vishvananda/netlink/commit/acdc658b8613655ddb69f978e9fb4cf413e2b830
func IsIPNetZero(ipnet *net.IPNet) bool {
	if ipnet == nil {
		return true
	}

	prefixLen, _ := ipnet.Mask.Size()
	if prefixLen != 0 {
		return false
	}

	// Zero-length prefix: the address itself must be all zeroes.
	for _, zeroAddr := range []net.IP{net.IPv4zero, net.IPv6zero} {
		if ipnet.IP.Equal(zeroAddr) {
			return true
		}
	}
	return false
}
4 changes: 2 additions & 2 deletions pkg/utils/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func DeleteConntrackEntriesForDstIP(dstIP string, protocol uint8) error {
filter.AddIP(netlink.ConntrackOrigDstIP, ip)
filter.AddProtocol(protocol)

_, err := netlink.ConntrackDeleteFilter(netlink.ConntrackTable, family, filter)
_, err := netlink.ConntrackDeleteFilters(netlink.ConntrackTable, family, filter)
if err != nil {
return fmt.Errorf("error deleting connection tracking state for protocol: %d IP: %s, error: %v", protocol, ip, err)
}
Expand All @@ -65,7 +65,7 @@ func DeleteConntrackEntriesForDstPort(port uint16, protocol uint8, family netlin
filter.AddProtocol(protocol)
filter.AddPort(netlink.ConntrackOrigDstPort, port)

_, err := netlink.ConntrackDeleteFilter(netlink.ConntrackTable, family, filter)
_, err := netlink.ConntrackDeleteFilters(netlink.ConntrackTable, family, filter)
if err != nil {
return fmt.Errorf("error deleting connection tracking state for protocol: %d Port: %d, error: %v", protocol, port, err)
}
Expand Down
12 changes: 6 additions & 6 deletions plugins/main/bridge/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func (tester *testerV10x) cmdAddTest(tc testCase, dataDir string) (types.Result,
continue
}
for _, route := range routes {
*found = (route.Dst == nil && route.Src == nil && route.Gw.Equal(gwIP))
*found = (ip.IsIPNetZero(route.Dst) && route.Src == nil && route.Gw.Equal(gwIP))
if *found {
break
}
Expand Down Expand Up @@ -689,7 +689,7 @@ func (tester *testerV10x) cmdCheckTest(tc testCase, conf *Net, dataDir string) {
continue
}
for _, route := range routes {
*found = (route.Dst == nil && route.Src == nil && route.Gw.Equal(gwIP))
*found = (ip.IsIPNetZero(route.Dst) && route.Src == nil && route.Gw.Equal(gwIP))
if *found {
break
}
Expand Down Expand Up @@ -913,7 +913,7 @@ func (tester *testerV04x) cmdAddTest(tc testCase, dataDir string) (types.Result,
continue
}
for _, route := range routes {
*found = (route.Dst == nil && route.Src == nil && route.Gw.Equal(gwIP))
*found = (ip.IsIPNetZero(route.Dst) && route.Src == nil && route.Gw.Equal(gwIP))
if *found {
break
}
Expand Down Expand Up @@ -989,7 +989,7 @@ func (tester *testerV04x) cmdCheckTest(tc testCase, conf *Net, dataDir string) {
continue
}
for _, route := range routes {
*found = (route.Dst == nil && route.Src == nil && route.Gw.Equal(gwIP))
*found = (ip.IsIPNetZero(route.Dst) && route.Src == nil && route.Gw.Equal(gwIP))
if *found {
break
}
Expand Down Expand Up @@ -1212,7 +1212,7 @@ func (tester *testerV03x) cmdAddTest(tc testCase, dataDir string) (types.Result,
continue
}
for _, route := range routes {
*found = (route.Dst == nil && route.Src == nil && route.Gw.Equal(gwIP))
*found = (ip.IsIPNetZero(route.Dst) && route.Src == nil && route.Gw.Equal(gwIP))
if *found {
break
}
Expand Down Expand Up @@ -1431,7 +1431,7 @@ func (tester *testerV01xOr02x) cmdAddTest(tc testCase, dataDir string) (types.Re
continue
}
for _, route := range routes {
*found = (route.Dst == nil && route.Src == nil && route.Gw.Equal(gwIP))
*found = (ip.IsIPNetZero(route.Dst) && route.Src == nil && route.Gw.Equal(gwIP))
if *found {
break
}
Expand Down
Loading
Loading