diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index c37ffb246..dacf0ec57 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -177,7 +177,7 @@ type Wrapper struct { // for packets from the local system. This filter is populated by netstack to hook // packets that should be handled by netstack. If set, this filter runs before // PreFilterFromTunToEngine. - PreFilterPacketOutboundToWireGuardNetstackIntercept FilterFunc + PreFilterPacketOutboundToWireGuardNetstackIntercept GROFilterFunc // PreFilterPacketOutboundToWireGuardEngineIntercept is a filter function that runs before the main filter // for packets from the local system. This filter is populated by wgengine to hook // packets which it handles internally. If both this and PreFilterFromTunToNetstack @@ -811,7 +811,7 @@ var ( magicDNSIPPortv6 = netip.AddrPortFrom(tsaddr.TailscaleServiceIPv6(), 0) ) -func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConfigTable) filter.Response { +func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConfigTable, gro *gro.GRO) (filter.Response, *gro.GRO) { // Fake ICMP echo responses to MagicDNS (100.100.100.100). if p.IsEchoRequest() { switch p.Dst { @@ -820,13 +820,13 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf header.ToResponse() outp := packet.Generate(&header, p.Payload()) t.InjectInboundCopy(outp) - return filter.DropSilently // don't pass on to OS; already handled + return filter.DropSilently, gro // don't pass on to OS; already handled case magicDNSIPPortv6: header := p.ICMP6Header() header.ToResponse() outp := packet.Generate(&header, p.Payload()) t.InjectInboundCopy(outp) - return filter.DropSilently // don't pass on to OS; already handled + return filter.DropSilently, gro // don't pass on to OS; already handled } } @@ -838,20 +838,22 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf t.isSelfDisco(p) { t.limitedLogf("[unexpected] received self disco out packet over tstun; dropping") metricPacketOutDropSelfDisco.Add(1) - return filter.DropSilently + return filter.DropSilently, gro } if t.PreFilterPacketOutboundToWireGuardNetstackIntercept != nil { - if res := t.PreFilterPacketOutboundToWireGuardNetstackIntercept(p, t); res.IsDrop() { + var res filter.Response + res, gro = t.PreFilterPacketOutboundToWireGuardNetstackIntercept(p, t, gro) + if res.IsDrop() { // Handled by netstack.Impl.handleLocalPackets (quad-100 DNS primarily) - return res + return res, gro } } if t.PreFilterPacketOutboundToWireGuardEngineIntercept != nil { if res := t.PreFilterPacketOutboundToWireGuardEngineIntercept(p, t); res.IsDrop() { // Handled by userspaceEngine.handleLocalPackets (primarily handles // quad-100 if netstack is not installed). - return res + return res, gro } } @@ -864,7 +866,7 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf filt = t.filter.Load() } if filt == nil { - return filter.Drop + return filter.Drop, gro } if filt.RunOut(p, t.filterFlags) != filter.Accept { @@ -872,15 +874,15 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf metricOutboundDroppedPacketsTotal.Add(dropPacketLabel{ Reason: DropReasonACL, }, 1) - return filter.Drop + return filter.Drop, gro } if t.PostFilterPacketOutboundToWireGuard != nil { if res := t.PostFilterPacketOutboundToWireGuard(p, t); res.IsDrop() { - return res + return res, gro } } - return filter.Accept + return filter.Accept, gro } // noteActivity records that there was a read or write at the current time. @@ -919,6 +921,7 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) { defer parsedPacketPool.Put(p) captHook := t.captureHook.Load() pc := t.peerConfig.Load() + var buffsGRO *gro.GRO for _, data := range res.data { p.Decode(data[res.dataOffset:]) @@ -931,7 +934,8 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) { captHook(capture.FromLocal, t.now(), p.Buffer(), p.CaptureMeta) } if !t.disableFilter { - response := t.filterPacketOutboundToWireGuard(p, pc) + var response filter.Response + response, buffsGRO = t.filterPacketOutboundToWireGuard(p, pc, buffsGRO) if response != filter.Accept { metricPacketOutDrop.Add(1) continue @@ -951,6 +955,9 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) { } buffsPos++ } + if buffsGRO != nil { + buffsGRO.Flush() + } // t.vectorBuffer has a fixed location in memory. // TODO(raggi): add an explicit field and possibly method to the tunVectorReadResult diff --git a/net/tstun/wrap_test.go b/net/tstun/wrap_test.go index 9cb2ad550..f93192102 100644 --- a/net/tstun/wrap_test.go +++ b/net/tstun/wrap_test.go @@ -615,7 +615,7 @@ func TestFilterDiscoLoop(t *testing.T) { memLog.Reset() pp := new(packet.Parsed) pp.Decode(pkt) - got = tw.filterPacketOutboundToWireGuard(pp, nil) + got, _ = tw.filterPacketOutboundToWireGuard(pp, nil, nil) if got != filter.DropSilently { t.Errorf("got %v; want DropSilently", got) } diff --git a/wgengine/netstack/netstack.go b/wgengine/netstack/netstack.go index 2ab40e810..3f49bd5a9 100644 --- a/wgengine/netstack/netstack.go +++ b/wgengine/netstack/netstack.go @@ -725,9 +725,9 @@ func (ns *Impl) isLoopbackPort(port uint16) bool { // handleLocalPackets is hooked into the tun datapath for packets leaving // the host and arriving at tailscaled. This method returns filter.DropSilently // to intercept a packet for handling, for instance traffic to quad-100. -func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Response { +func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) { if ns.ctx.Err() != nil { - return filter.DropSilently + return filter.DropSilently, gro } // Determine if we care about this local packet. @@ -741,11 +741,11 @@ func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Re switch p.IPProto { case ipproto.TCP: if port := p.Dst.Port(); port != 53 && port != 80 && port != 8080 && !ns.isLoopbackPort(port) { - return filter.Accept + return filter.Accept, gro } case ipproto.UDP: if port := p.Dst.Port(); port != 53 && !ns.isLoopbackPort(port) { - return filter.Accept + return filter.Accept, gro } } case viaRange.Contains(dst): @@ -759,7 +759,7 @@ func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Re if !shouldHandle { // Unhandled means that we let the regular processing // occur without doing anything ourselves. - return filter.Accept + return filter.Accept, gro } if debugNetstack() { @@ -785,7 +785,7 @@ func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Re } go ns.userPing(pingIP, pong, userPingDirectionInbound) - return filter.DropSilently + return filter.DropSilently, gro } // Fall through to writing inbound so netstack handles the @@ -794,14 +794,14 @@ func (ns *Impl) handleLocalPackets(p *packet.Parsed, t *tstun.Wrapper) filter.Re default: // Not traffic to the service IP or a 4via6 IP, so we don't // care about the packet; resume processing. - return filter.Accept + return filter.Accept, gro } if debugPackets { ns.logf("[v2] service packet in (from %v): % x", p.Src, p.Buffer()) } - ns.linkEP.injectInbound(p) - return filter.DropSilently + gro = ns.linkEP.gro(p, gro) + return filter.DropSilently, gro } func (ns *Impl) DialContextTCP(ctx context.Context, ipp netip.AddrPort) (*gonet.TCPConn, error) { diff --git a/wgengine/netstack/netstack_test.go b/wgengine/netstack/netstack_test.go index fe28efdd5..6be61cd58 100644 --- a/wgengine/netstack/netstack_test.go +++ b/wgengine/netstack/netstack_test.go @@ -750,7 +750,7 @@ func TestHandleLocalPackets(t *testing.T) { Dst: netip.MustParseAddrPort("100.100.100.100:53"), TCPFlags: packet.TCPSyn, } - resp := impl.handleLocalPackets(pkt, impl.tundev) + resp, _ := impl.handleLocalPackets(pkt, impl.tundev, nil) if resp != filter.DropSilently { t.Errorf("got filter outcome %v, want filter.DropSilently", resp) } @@ -767,7 +767,7 @@ func TestHandleLocalPackets(t *testing.T) { Dst: netip.MustParseAddrPort("[fd7a:115c:a1e0:b1a:0:7:a01:109]:5678"), TCPFlags: packet.TCPSyn, } - resp := impl.handleLocalPackets(pkt, impl.tundev) + resp, _ := impl.handleLocalPackets(pkt, impl.tundev, nil) // DropSilently is the outcome we expected, since we actually // handled this packet by injecting it into netstack, which @@ -789,7 +789,7 @@ func TestHandleLocalPackets(t *testing.T) { Dst: netip.MustParseAddrPort("[fd7a:115c:a1e0:b1a:0:63:a01:109]:5678"), TCPFlags: packet.TCPSyn, } - resp := impl.handleLocalPackets(pkt, impl.tundev) + resp, _ := impl.handleLocalPackets(pkt, impl.tundev, nil) // Accept means that handleLocalPackets does not handle this // packet, we "accept" it to continue further processing,