net/tstun,wgengine/netstack: implement TCP GRO for local services (#13315)

Throughput improves substantially when measured via netstack loopback
(TS_DEBUG_NETSTACK_LOOPBACK_PORT).

Before (d21ebc2):
jwhited@i5-12400-2:~$ iperf3 -V -c 100.100.100.100
Starting Test: protocol: TCP, 1 streams, 131072 byte blocks
Test Complete. Summary Results:
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  5.77 GBytes  4.95 Gbits/sec    0 sender
[  5]   0.00-10.01  sec  5.77 GBytes  4.95 Gbits/sec      receiver

After:
jwhited@i5-12400-2:~$ iperf3 -V -c 100.100.100.100
Starting Test: protocol: TCP, 1 streams, 131072 byte blocks
Test Complete. Summary Results:
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  12.7 GBytes  10.9 Gbits/sec    0 sender
[  5]   0.00-10.00  sec  12.7 GBytes  10.9 Gbits/sec      receiver

Updates tailscale/corp#22754

Signed-off-by: Jordan Whited <jordan@tailscale.com>
pull/13225/head
Jordan Whited 3 weeks ago committed by GitHub
parent 71acf87830
commit 0926954cf5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -177,7 +177,7 @@ type Wrapper struct {
// for packets from the local system. This filter is populated by netstack to hook // for packets from the local system. This filter is populated by netstack to hook
// packets that should be handled by netstack. If set, this filter runs before // packets that should be handled by netstack. If set, this filter runs before
// PreFilterFromTunToEngine. // PreFilterFromTunToEngine.
PreFilterPacketOutboundToWireGuardNetstackIntercept FilterFunc PreFilterPacketOutboundToWireGuardNetstackIntercept GROFilterFunc
// PreFilterPacketOutboundToWireGuardEngineIntercept is a filter function that runs before the main filter // PreFilterPacketOutboundToWireGuardEngineIntercept is a filter function that runs before the main filter
// for packets from the local system. This filter is populated by wgengine to hook // for packets from the local system. This filter is populated by wgengine to hook
// packets which it handles internally. If both this and PreFilterFromTunToNetstack // packets which it handles internally. If both this and PreFilterFromTunToNetstack
@ -811,7 +811,7 @@ var (
magicDNSIPPortv6 = netip.AddrPortFrom(tsaddr.TailscaleServiceIPv6(), 0) magicDNSIPPortv6 = netip.AddrPortFrom(tsaddr.TailscaleServiceIPv6(), 0)
) )
func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConfigTable) filter.Response { func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConfigTable, gro *gro.GRO) (filter.Response, *gro.GRO) {
// Fake ICMP echo responses to MagicDNS (100.100.100.100). // Fake ICMP echo responses to MagicDNS (100.100.100.100).
if p.IsEchoRequest() { if p.IsEchoRequest() {
switch p.Dst { switch p.Dst {
@ -820,13 +820,13 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf
header.ToResponse() header.ToResponse()
outp := packet.Generate(&header, p.Payload()) outp := packet.Generate(&header, p.Payload())
t.InjectInboundCopy(outp) t.InjectInboundCopy(outp)
return filter.DropSilently // don't pass on to OS; already handled return filter.DropSilently, gro // don't pass on to OS; already handled
case magicDNSIPPortv6: case magicDNSIPPortv6:
header := p.ICMP6Header() header := p.ICMP6Header()
header.ToResponse() header.ToResponse()
outp := packet.Generate(&header, p.Payload()) outp := packet.Generate(&header, p.Payload())
t.InjectInboundCopy(outp) t.InjectInboundCopy(outp)
return filter.DropSilently // don't pass on to OS; already handled return filter.DropSilently, gro // don't pass on to OS; already handled
} }
} }
@ -838,20 +838,22 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf
t.isSelfDisco(p) { t.isSelfDisco(p) {
t.limitedLogf("[unexpected] received self disco out packet over tstun; dropping") t.limitedLogf("[unexpected] received self disco out packet over tstun; dropping")
metricPacketOutDropSelfDisco.Add(1) metricPacketOutDropSelfDisco.Add(1)
return filter.DropSilently return filter.DropSilently, gro
} }
if t.PreFilterPacketOutboundToWireGuardNetstackIntercept != nil { if t.PreFilterPacketOutboundToWireGuardNetstackIntercept != nil {
if res := t.PreFilterPacketOutboundToWireGuardNetstackIntercept(p, t); res.IsDrop() { var res filter.Response
res, gro = t.PreFilterPacketOutboundToWireGuardNetstackIntercept(p, t, gro)
if res.IsDrop() {
// Handled by netstack.Impl.handleLocalPackets (quad-100 DNS primarily) // Handled by netstack.Impl.handleLocalPackets (quad-100 DNS primarily)
return res return res, gro
} }
} }
if t.PreFilterPacketOutboundToWireGuardEngineIntercept != nil { if t.PreFilterPacketOutboundToWireGuardEngineIntercept != nil {
if res := t.PreFilterPacketOutboundToWireGuardEngineIntercept(p, t); res.IsDrop() { if res := t.PreFilterPacketOutboundToWireGuardEngineIntercept(p, t); res.IsDrop() {
// Handled by userspaceEngine.handleLocalPackets (primarily handles // Handled by userspaceEngine.handleLocalPackets (primarily handles
// quad-100 if netstack is not installed). // quad-100 if netstack is not installed).
return res return res, gro
} }
} }
@ -864,7 +866,7 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf
filt = t.filter.Load() filt = t.filter.Load()
} }
if filt == nil { if filt == nil {
return filter.Drop return filter.Drop, gro
} }
if filt.RunOut(p, t.filterFlags) != filter.Accept { if filt.RunOut(p, t.filterFlags) != filter.Accept {
@ -872,15 +874,15 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf
metricOutboundDroppedPacketsTotal.Add(dropPacketLabel{ metricOutboundDroppedPacketsTotal.Add(dropPacketLabel{
Reason: DropReasonACL, Reason: DropReasonACL,
}, 1) }, 1)
return filter.Drop return filter.Drop, gro
} }
if t.PostFilterPacketOutboundToWireGuard != nil { if t.PostFilterPacketOutboundToWireGuard != nil {
if res := t.PostFilterPacketOutboundToWireGuard(p, t); res.IsDrop() { if res := t.PostFilterPacketOutboundToWireGuard(p, t); res.IsDrop() {
return res return res, gro
} }
} }
return filter.Accept return filter.Accept, gro
} }
// noteActivity records that there was a read or write at the current time. // noteActivity records that there was a read or write at the current time.
@ -919,6 +921,7 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) {
defer parsedPacketPool.Put(p) defer parsedPacketPool.Put(p)
captHook := t.captureHook.Load() captHook := t.captureHook.Load()
pc := t.peerConfig.Load() pc := t.peerConfig.Load()
var buffsGRO *gro.GRO
for _, data := range res.data { for _, data := range res.data {
p.Decode(data[res.dataOffset:]) p.Decode(data[res.dataOffset:])
@ -931,7 +934,8 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) {
captHook(capture.FromLocal, t.now(), p.Buffer(), p.CaptureMeta) captHook(capture.FromLocal, t.now(), p.Buffer(), p.CaptureMeta)
} }
if !t.disableFilter { if !t.disableFilter {
response := t.filterPacketOutboundToWireGuard(p, pc) var response filter.Response
response, buffsGRO = t.filterPacketOutboundToWireGuard(p, pc, buffsGRO)
if response != filter.Accept { if response != filter.Accept {
metricPacketOutDrop.Add(1) metricPacketOutDrop.Add(1)
continue continue
@ -951,6 +955,9 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) {
} }
buffsPos++ buffsPos++
} }
if buffsGRO != nil {
buffsGRO.Flush()
}
// t.vectorBuffer has a fixed location in memory. // t.vectorBuffer has a fixed location in memory.
// TODO(raggi): add an explicit field and possibly method to the tunVectorReadResult // TODO(raggi): add an explicit field and possibly method to the tunVectorReadResult

@ -615,7 +615,7 @@ func TestFilterDiscoLoop(t *testing.T) {
memLog.Reset() memLog.Reset()
pp := new(packet.Parsed) pp := new(packet.Parsed)
pp.Decode(pkt) pp.Decode(pkt)
got = tw.filterPacketOutboundToWireGuard(pp, nil) got, _ = tw.filterPacketOutboundToWireGuard(pp, nil, nil)
if got != filter.DropSilently { if got != filter.DropSilently {
t.Errorf("got %v; want DropSilently", got) t.Errorf("got %v; want DropSilently", got)
} }

@ -725,9 +725,9 @@ func (ns *Impl) isLoopbackPort(port uint16) bool {
// handleLocalPackets is hooked into the tun datapath for packets leaving // handleLocalPackets is hooked into the tun datapath for packets leaving
// the host and arriving at tailscaled. This method returns filter.DropSilently // the host and arriving at tailscaled. This method returns filter.DropSilently
// to intercept a packet for handling, for instance traffic to quad-100. // to intercept a packet for handling, for instance traffic to quad-100.
func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Response { func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) {
if ns.ctx.Err() != nil { if ns.ctx.Err() != nil {
return filter.DropSilently return filter.DropSilently, gro
} }
// Determine if we care about this local packet. // Determine if we care about this local packet.
@ -741,11 +741,11 @@ func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Re
switch p.IPProto { switch p.IPProto {
case ipproto.TCP: case ipproto.TCP:
if port := p.Dst.Port(); port != 53 && port != 80 && port != 8080 && !ns.isLoopbackPort(port) { if port := p.Dst.Port(); port != 53 && port != 80 && port != 8080 && !ns.isLoopbackPort(port) {
return filter.Accept return filter.Accept, gro
} }
case ipproto.UDP: case ipproto.UDP:
if port := p.Dst.Port(); port != 53 && !ns.isLoopbackPort(port) { if port := p.Dst.Port(); port != 53 && !ns.isLoopbackPort(port) {
return filter.Accept return filter.Accept, gro
} }
} }
case viaRange.Contains(dst): case viaRange.Contains(dst):
@ -759,7 +759,7 @@ func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Re
if !shouldHandle { if !shouldHandle {
// Unhandled means that we let the regular processing // Unhandled means that we let the regular processing
// occur without doing anything ourselves. // occur without doing anything ourselves.
return filter.Accept return filter.Accept, gro
} }
if debugNetstack() { if debugNetstack() {
@ -785,7 +785,7 @@ func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Re
} }
go ns.userPing(pingIP, pong, userPingDirectionInbound) go ns.userPing(pingIP, pong, userPingDirectionInbound)
return filter.DropSilently return filter.DropSilently, gro
} }
// Fall through to writing inbound so netstack handles the // Fall through to writing inbound so netstack handles the
@ -794,14 +794,14 @@ func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Re
default: default:
// Not traffic to the service IP or a 4via6 IP, so we don't // Not traffic to the service IP or a 4via6 IP, so we don't
// care about the packet; resume processing. // care about the packet; resume processing.
return filter.Accept return filter.Accept, gro
} }
if debugPackets { if debugPackets {
ns.logf("[v2] service packet in (from %v): % x", p.Src, p.Buffer()) ns.logf("[v2] service packet in (from %v): % x", p.Src, p.Buffer())
} }
ns.linkEP.injectInbound(p) gro = ns.linkEP.gro(p, gro)
return filter.DropSilently return filter.DropSilently, gro
} }
func (ns *Impl) DialContextTCP(ctx context.Context, ipp netip.AddrPort) (*gonet.TCPConn, error) { func (ns *Impl) DialContextTCP(ctx context.Context, ipp netip.AddrPort) (*gonet.TCPConn, error) {

@ -750,7 +750,7 @@ func TestHandleLocalPackets(t *testing.T) {
Dst: netip.MustParseAddrPort("100.100.100.100:53"), Dst: netip.MustParseAddrPort("100.100.100.100:53"),
TCPFlags: packet.TCPSyn, TCPFlags: packet.TCPSyn,
} }
resp := impl.handleLocalPackets(pkt, impl.tundev) resp, _ := impl.handleLocalPackets(pkt, impl.tundev, nil)
if resp != filter.DropSilently { if resp != filter.DropSilently {
t.Errorf("got filter outcome %v, want filter.DropSilently", resp) t.Errorf("got filter outcome %v, want filter.DropSilently", resp)
} }
@ -767,7 +767,7 @@ func TestHandleLocalPackets(t *testing.T) {
Dst: netip.MustParseAddrPort("[fd7a:115c:a1e0:b1a:0:7:a01:109]:5678"), Dst: netip.MustParseAddrPort("[fd7a:115c:a1e0:b1a:0:7:a01:109]:5678"),
TCPFlags: packet.TCPSyn, TCPFlags: packet.TCPSyn,
} }
resp := impl.handleLocalPackets(pkt, impl.tundev) resp, _ := impl.handleLocalPackets(pkt, impl.tundev, nil)
// DropSilently is the outcome we expected, since we actually // DropSilently is the outcome we expected, since we actually
// handled this packet by injecting it into netstack, which // handled this packet by injecting it into netstack, which
@ -789,7 +789,7 @@ func TestHandleLocalPackets(t *testing.T) {
Dst: netip.MustParseAddrPort("[fd7a:115c:a1e0:b1a:0:63:a01:109]:5678"), Dst: netip.MustParseAddrPort("[fd7a:115c:a1e0:b1a:0:63:a01:109]:5678"),
TCPFlags: packet.TCPSyn, TCPFlags: packet.TCPSyn,
} }
resp := impl.handleLocalPackets(pkt, impl.tundev) resp, _ := impl.handleLocalPackets(pkt, impl.tundev, nil)
// Accept means that handleLocalPackets does not handle this // Accept means that handleLocalPackets does not handle this
// packet, we "accept" it to continue further processing, // packet, we "accept" it to continue further processing,

Loading…
Cancel
Save