diff --git a/go.mod b/go.mod index a82ffd30f36308c0b5eb08c40c43df8d4276ceb7..5be6b93288ec31779c0c2c63c1252f5129ed834d 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/coreos/go-systemd/v22 v22.3.2 github.com/docker/docker v20.10.8+incompatible github.com/google/cadvisor v0.39.3 - github.com/hashicorp/memberlist v0.1.3 + github.com/hashicorp/memberlist v0.3.1 github.com/opencontainers/runc v1.0.2 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.8.1 @@ -149,7 +149,7 @@ require ( github.com/mdlayher/genetlink v1.2.0 // indirect github.com/mdlayher/netlink v1.6.0 // indirect github.com/mdlayher/socket v0.2.3 // indirect - github.com/miekg/dns v1.0.14 // indirect + github.com/miekg/dns v1.1.26 // indirect github.com/miekg/pkcs11 v1.0.3 // indirect github.com/mindprince/gonvml v0.0.0-20190828220739-9ebdce4bb989 // indirect github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible // indirect diff --git a/go.sum b/go.sum index adab41550843972820e702c2e1fdd0a0bfc65708..7db8219251ec7042ce2d0b0a81e984c51148c4f5 100644 --- a/go.sum +++ b/go.sum @@ -609,8 +609,11 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= -github.com/hashicorp/memberlist v0.1.3 h1:EmmoJme1matNzb+hMpDuR/0sbJSUisxyqBGG676r31M= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= +github.com/hashicorp/memberlist v0.3.0 h1:8+567mCcFDnS5ADl7lrpxPMWiFCElyUEeW0gtj34fMA= +github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= +github.com/hashicorp/memberlist v0.3.1 h1:MXgUXLqva1QvpVEDQW1IQLG0wivQAtmFlHRQ+1vWZfM= +github.com/hashicorp/memberlist v0.3.1/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/heketi/heketi v10.3.0+incompatible h1:X4DBFPzcyWZWhia32d94UhDECQJHH0M5kpRb1gxxUHk= github.com/heketi/heketi v10.3.0+incompatible/go.mod h1:bB9ly3RchcQqsQ9CpyaQwvva7RS5ytVoSoholZQON6o= @@ -711,8 +714,9 @@ github.com/mdlayher/netlink v1.6.0/go.mod h1:0o3PlBmGst1xve7wQ7j/hwpNaFaH4qCRyWC github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs= github.com/mdlayher/socket v0.2.3 h1:XZA2X2TjdOwNoNPVPclRCURoX/hokBY8nkTmRZFEheM= github.com/mdlayher/socket v0.2.3/go.mod h1:bz12/FozYNH/VbvC3q7TRIK/Y6dH1kCKsXaUeXi/FmY= -github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= +github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/pkcs11 v1.0.3 h1:iMwmD7I5225wv84WxIG/bmxz9AXjWvTWIbM/TYHvWtw= github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721 h1:RlZweED6sbSArvlE924+mUcZuXKLBHA35U7LN621Bws= @@ -1084,6 +1088,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -1160,6 +1165,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -1252,6 +1258,8 @@ golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190812073006-9eafafc0a87e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191002063906-3421d5a6bb1c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1371,6 +1379,7 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 448f61d1ae124fdf9d82e78f0f3d37ad64fe0b1f..121f0a509a8d3103d77400d0af89ef41f444ad52 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -29,7 +29,7 @@ func New(name string, nodeName string, nodeIP net.IP, port int, anchors []net.IP serviceNet := types.ULA(name, 112, 3) ml := sml.New(nodeName, nodeIP, port, anchors, key, newMeta) - vpn, err := wg.New("wg0", 7124, 1400, nodeAddress.IPNet()) + vpn, err := wg.New("wg0", 7124, nodeAddress.IPNet()) if err != nil { return nil, err } @@ -52,14 +52,24 @@ func New(name string, nodeName string, nodeIP net.IP, port int, anchors []net.IP func (c *Cluster) Run() error { events := c.ml.Events() + err := c.ml.Start() + instr := c.ml.Instr() + instrUpdates := instr.Updates() + if err != nil { + return err + } go c.ml.Run() for { - c.updateVPN() - <-events + select { + case <-events: + c.updateVPN(instr.MinMTU()) + case <-instrUpdates: + c.updateVPN(instr.MinMTU()) + } } } -func (c *Cluster) updateVPN() { +func (c *Cluster) updateVPN(mtu int) { peers := []wg.Peer{} for _, node := range c.ml.Nodes() { if node.Name == c.nodeName { @@ -75,6 +85,6 @@ func (c *Cluster) updateVPN() { } peers = append(peers, peer) } - logrus.Debugf("Updating the VPN mesh with %d peers", len(peers)) - c.vpn.Update(peers) + logrus.Debugf("Updating the VPN mesh with %d peers, MTU %d", len(peers), mtu) + c.vpn.Update(peers, mtu) } diff --git a/pkg/sml/instrumentation.go b/pkg/sml/instrumentation.go new file mode 100644 index 0000000000000000000000000000000000000000..249595204f4647326be8d75b752bb49e320113dc --- /dev/null +++ b/pkg/sml/instrumentation.go @@ -0,0 +1,9 @@ +package sml + +type Instrumentation interface { + // Instrumentation data was updated + Updates() <-chan struct{} + // Get the current minimum observed path MTU with any other node in the + // cluster, useful for setting cluster-wide MTU + MinMTU() int +} diff --git a/pkg/sml/memberlist.go b/pkg/sml/memberlist.go index 27e4a4eccb0c2f7adbd17c229edcc772c7ab1fc7..dcc564dc7f2131be88a8a7de653d6ede84e3c468 100644 --- a/pkg/sml/memberlist.go +++ b/pkg/sml/memberlist.go @@ -26,6 +26,7 @@ type Memberlist struct { nodeCache []Node nodeChanges chan struct{} chans []chan struct{} + transport *instrumentedTransport } func New(nodeName string, nodeIP net.IP, port int, anchors []net.IP, key []byte, newMeta func() NodeMeta) *Memberlist { @@ -57,17 +58,32 @@ func New(nodeName string, nodeIP net.IP, port int, anchors []net.IP, key []byte, return m } -// Start the memberlist cluster main loop, that awaits cluster changes, maintains -// the cluster state and propagates information to channels -func (m *Memberlist) Run() error { - // Initialize memberlist +// Start the memberlist cluster by listening on main sockets +func (m *Memberlist) Start() error { + logrus.Info("Starting the cluster transport") + tc := &memberlist.NetTransportConfig{ + BindAddrs: []string{m.config.BindAddr}, + BindPort: m.config.BindPort, + } + transport, err := NewTransport(tc) + if err != nil { + return err + } + m.transport = transport + m.config.Transport = transport ml, err := memberlist.Create(m.config) if err != nil { return err } m.ml = ml - ticker := time.Tick(10 * time.Second) go m.join() + return nil +} + +// Run the memberlist cluster main loop, that awaits cluster changes, maintains +// the cluster state and propagates information to channels +func (m *Memberlist) Run() error { + ticker := time.Tick(10 * time.Second) for { select { case <-ticker: @@ -98,6 +114,11 @@ func (m *Memberlist) Nodes() []Node { return m.nodeCache } +// Get the instrumentation interface +func (m *Memberlist) Instr() Instrumentation { + return m.transport +} + // Build a local node representation from an upstream memberlist node, using // empty metadata (before decoding) func (m *Memberlist) newNode(mlNode *memberlist.Node) Node { diff --git a/pkg/sml/transport.go b/pkg/sml/transport.go new file mode 100644 index 0000000000000000000000000000000000000000..dfac892529fdf2e6450166076edaf21711ea1399 --- /dev/null +++ b/pkg/sml/transport.go @@ -0,0 +1,79 @@ +package sml + +import ( + "net" + "time" + + "github.com/hashicorp/memberlist" + "golang.org/x/net/ipv6" +) + +// Network transport for memberlist, instrumented with metrology +// and automatic path MTU discovery +type instrumentedTransport struct { + memberlist.NetTransport + // Map between node address and known path MTU + pmtu map[string]int + // Last evaluated minimum path MTU + minPmtu int + // Instrumentation updates + updates chan struct{} +} + +// Create a new instrumented transport +func NewTransport(config *memberlist.NetTransportConfig) (*instrumentedTransport, error) { + nt, err := memberlist.NewNetTransport(config) + if err != nil { + return nil, err + } + return &instrumentedTransport{ + *nt, + make(map[string]int), + 1500, + make(chan struct{}, 100), + }, nil +} + +// See Transport. +func (t *instrumentedTransport) DialAddressTimeout(a memberlist.Address, timeout time.Duration) (net.Conn, error) { + addr := a.Addr + + dialer := net.Dialer{Timeout: timeout} + conn, err := dialer.Dial("tcp", addr) + if err == nil { + wrapped := ipv6.NewConn(conn) + mtu, err := wrapped.PathMTU() + if err != nil { + prev, _ := t.pmtu[addr] + t.pmtu[addr] = mtu + if prev != mtu { + t.updateMinMTU() + } + } + } + return conn, err +} + +// Instrumented transport implements instrumentation +func (t *instrumentedTransport) Updates() <-chan struct{} { + return t.updates +} + +// Instrumented transport implements instrumentation +func (t *instrumentedTransport) MinMTU() int { + return t.minPmtu +} + +// Update the minimum MTU and notify upstream if required +func (t *instrumentedTransport) updateMinMTU() { + min := 1500 + for _, mtu := range t.pmtu { + if mtu < min { + min = mtu + } + } + if min != t.minPmtu { + t.updates <- struct{}{} + } + t.minPmtu = min +} diff --git a/pkg/wg/wireguard.go b/pkg/wg/wireguard.go index b593f75582c3366f852baf817f2f00347feb3e7d..8820b874280fb63bc7199a9cc59e313409807a4f 100644 --- a/pkg/wg/wireguard.go +++ b/pkg/wg/wireguard.go @@ -19,12 +19,11 @@ type Wireguard struct { ipnet *net.IPNet client *wgctrl.Client port int - mtu int privKey wgtypes.Key PubKey wgtypes.Key } -func New(iface string, port int, mtu int, ipnet *net.IPNet) (*Wireguard, error) { +func New(iface string, port int, ipnet *net.IPNet) (*Wireguard, error) { client, err := wgctrl.New() if err != nil { return nil, errors.Wrap(err, "could not instantiate wireguard client") @@ -40,13 +39,12 @@ func New(iface string, port int, mtu int, ipnet *net.IPNet) (*Wireguard, error) ipnet: ipnet, client: client, port: port, - mtu: mtu, privKey: privKey, PubKey: pubKey, }, nil } -func (w *Wireguard) Update(peers []Peer) error { +func (w *Wireguard) Update(peers []Peer, mtu int) error { err := netlink.LinkAdd(&netlink.Wireguard{ LinkAttrs: netlink.LinkAttrs{ Name: w.iface, @@ -79,7 +77,7 @@ func (w *Wireguard) Update(peers []Peer) error { if err != nil { return err } - err = netlink.LinkSetMTU(link, w.mtu) + err = netlink.LinkSetMTU(link, mtu) if err != nil { return err }