diff --git a/net/packet/tsmp.go b/net/packet/tsmp.go index 0ea321e84..8fad1d503 100644 --- a/net/packet/tsmp.go +++ b/net/packet/tsmp.go @@ -15,7 +15,9 @@ import ( "fmt" "net/netip" + "go4.org/mem" "tailscale.com/types/ipproto" + "tailscale.com/types/key" ) const minTSMPSize = 7 // the rejected body is 7 bytes @@ -72,6 +74,9 @@ const ( // TSMPTypePong is the type byte for a TailscalePongResponse. TSMPTypePong TSMPType = 'o' + + // TSPMTypeDiscoAdvertisement is the type byte for sending disco keys + TSMPTypeDiscoAdvertisement TSMPType = 'a' ) type TailscaleRejectReason byte @@ -259,3 +264,53 @@ func (h TSMPPongReply) Marshal(buf []byte) error { binary.BigEndian.PutUint16(buf[9:11], h.PeerAPIPort) return nil } + +// TSMPDiscoKeyAdvertisement is a TSMP message that's used for distributing Disco Keys. +// +// On the wire, after the IP header, it's currently 33 bytes: +// - 'a' (TSMPTypeDiscoAdvertisement) +// - 32 disco key bytes +type TSMPDiscoKeyAdvertisement struct { + Src, Dst netip.Addr + Key key.DiscoPublic +} + +func (ka *TSMPDiscoKeyAdvertisement) Marshal() ([]byte, error) { + var iph Header + if ka.Src.Is4() { + iph = IP4Header{ + IPProto: ipproto.TSMP, + Src: ka.Src, + Dst: ka.Dst, + } + } else { + iph = IP6Header{ + IPProto: ipproto.TSMP, + Src: ka.Src, + Dst: ka.Dst, + } + } + payload := make([]byte, 0, 33) + payload = append(payload, byte(TSMPTypeDiscoAdvertisement)) + payload = ka.Key.AppendTo(payload) + if len(payload) != 33 { + // Mostly to safeguard against ourselves changing this in the future. + return []byte{}, fmt.Errorf("expected payload length 33, got %d", len(payload)) + } + + return Generate(iph, payload), nil +} + +func (pp *Parsed) AsTSMPDiscoAdvertisement() (tka TSMPDiscoKeyAdvertisement, ok bool) { + if pp.IPProto != ipproto.TSMP { + return + } + p := pp.Payload() + if len(p) < 33 || p[0] != byte(TSMPTypeDiscoAdvertisement) { + return + } + tka.Src = pp.Src.Addr() + tka.Key = key.DiscoPublicFromRaw32(mem.B(p[1:33])) + + return tka, true +} diff --git a/net/packet/tsmp_test.go b/net/packet/tsmp_test.go index e261e6a41..d8f1d38d5 100644 --- a/net/packet/tsmp_test.go +++ b/net/packet/tsmp_test.go @@ -4,8 +4,14 @@ package packet import ( + "bytes" + "encoding/hex" "net/netip" + "slices" "testing" + + "go4.org/mem" + "tailscale.com/types/key" ) func TestTailscaleRejectedHeader(t *testing.T) { @@ -71,3 +77,62 @@ func TestTailscaleRejectedHeader(t *testing.T) { } } } + +func TestTSMPDiscoKeyAdvertisementMarshal(t *testing.T) { + var ( + // IPv4: Ver(4)Len(5), TOS, Len(53), ID, Flags, TTL(64), Proto(99), Cksum + headerV4, _ = hex.DecodeString("45000035000000004063705d") + // IPv6: Ver(6)TCFlow, Len(33), NextHdr(99), HopLim(64) + headerV6, _ = hex.DecodeString("6000000000216340") + + packetType = []byte{'a'} + testKey = bytes.Repeat([]byte{'a'}, 32) + + // IPs + srcV4 = netip.MustParseAddr("1.2.3.4") + dstV4 = netip.MustParseAddr("4.3.2.1") + srcV6 = netip.MustParseAddr("2001:db8::1") + dstV6 = netip.MustParseAddr("2001:db8::2") + ) + + join := func(parts ...[]byte) []byte { + return bytes.Join(parts, nil) + } + + tests := []struct { + name string + tka TSMPDiscoKeyAdvertisement + want []byte + }{ + { + name: "v4Header", + tka: TSMPDiscoKeyAdvertisement{ + Src: srcV4, + Dst: dstV4, + Key: key.DiscoPublicFromRaw32(mem.B(testKey)), + }, + want: join(headerV4, srcV4.AsSlice(), dstV4.AsSlice(), packetType, testKey), + }, + { + name: "v6Header", + tka: TSMPDiscoKeyAdvertisement{ + Src: srcV6, + Dst: dstV6, + Key: key.DiscoPublicFromRaw32(mem.B(testKey)), + }, + want: join(headerV6, srcV6.AsSlice(), dstV6.AsSlice(), packetType, testKey), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.tka.Marshal() + if err != nil { + t.Errorf("error mashalling TSMPDiscoAdvertisement: %s", err) + } + if !slices.Equal(got, tt.want) { + t.Errorf("error mashalling TSMPDiscoAdvertisement, expected: \n%x, \ngot:\n%x", tt.want, got) + } + }) + } +} diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index db4f689bf..6e07c7a3d 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -34,6 +34,7 @@ import ( "tailscale.com/types/logger" "tailscale.com/types/netlogfunc" "tailscale.com/util/clientmetric" + "tailscale.com/util/eventbus" "tailscale.com/util/usermetric" "tailscale.com/wgengine/filter" "tailscale.com/wgengine/netstack/gro" @@ -209,6 +210,9 @@ type Wrapper struct { captureHook syncs.AtomicValue[packet.CaptureCallback] metrics *metrics + + eventClient *eventbus.Client + discoKeyAdvertisementPub *eventbus.Publisher[DiscoKeyAdvertisement] } type metrics struct { @@ -254,15 +258,15 @@ func (w *Wrapper) Start() { close(w.startCh) } -func WrapTAP(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper { - return wrap(logf, tdev, true, m) +func WrapTAP(logf logger.Logf, tdev tun.Device, m *usermetric.Registry, bus *eventbus.Bus) *Wrapper { + return wrap(logf, tdev, true, m, bus) } -func Wrap(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper { - return wrap(logf, tdev, false, m) +func Wrap(logf logger.Logf, tdev tun.Device, m *usermetric.Registry, bus *eventbus.Bus) *Wrapper { + return wrap(logf, tdev, false, m, bus) } -func wrap(logf logger.Logf, tdev tun.Device, isTAP bool, m *usermetric.Registry) *Wrapper { +func wrap(logf logger.Logf, tdev tun.Device, isTAP bool, m *usermetric.Registry, bus *eventbus.Bus) *Wrapper { logf = logger.WithPrefix(logf, "tstun: ") w := &Wrapper{ logf: logf, @@ -283,6 +287,9 @@ func wrap(logf logger.Logf, tdev tun.Device, isTAP bool, m *usermetric.Registry) metrics: registerMetrics(m), } + w.eventClient = bus.Client("net.tstun") + w.discoKeyAdvertisementPub = eventbus.Publish[DiscoKeyAdvertisement](w.eventClient) + w.vectorBuffer = make([][]byte, tdev.BatchSize()) for i := range w.vectorBuffer { w.vectorBuffer[i] = make([]byte, maxBufferSize) @@ -357,6 +364,7 @@ func (t *Wrapper) Close() error { close(t.vectorOutbound) t.outboundMu.Unlock() err = t.tdev.Close() + t.eventClient.Close() }) return err } @@ -1118,6 +1126,11 @@ func (t *Wrapper) injectedRead(res tunInjectedRead, outBuffs [][]byte, sizes []i return n, err } +type DiscoKeyAdvertisement struct { + Src netip.Addr + Key key.DiscoPublic +} + func (t *Wrapper) filterPacketInboundFromWireGuard(p *packet.Parsed, captHook packet.CaptureCallback, pc *peerConfigTable, gro *gro.GRO) (filter.Response, *gro.GRO) { if captHook != nil { captHook(packet.FromPeer, t.now(), p.Buffer(), p.CaptureMeta) @@ -1128,6 +1141,12 @@ func (t *Wrapper) filterPacketInboundFromWireGuard(p *packet.Parsed, captHook pa t.noteActivity() t.injectOutboundPong(p, pingReq) return filter.DropSilently, gro + } else if discoKeyAdvert, ok := p.AsTSMPDiscoAdvertisement(); ok { + t.discoKeyAdvertisementPub.Publish(DiscoKeyAdvertisement{ + Src: discoKeyAdvert.Src, + Key: discoKeyAdvert.Key, + }) + return filter.DropSilently, gro } else if data, ok := p.AsTSMPPong(); ok { if f := t.OnTSMPPongReceived; f != nil { f(data) diff --git a/net/tstun/wrap_test.go b/net/tstun/wrap_test.go index 75cf5afb2..c7d0708df 100644 --- a/net/tstun/wrap_test.go +++ b/net/tstun/wrap_test.go @@ -36,6 +36,8 @@ import ( "tailscale.com/types/netlogtype" "tailscale.com/types/ptr" "tailscale.com/types/views" + "tailscale.com/util/eventbus" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/must" "tailscale.com/util/usermetric" "tailscale.com/wgengine/filter" @@ -170,10 +172,10 @@ func setfilter(logf logger.Logf, tun *Wrapper) { tun.SetFilter(filter.New(matches, nil, ipSet, ipSet, nil, logf)) } -func newChannelTUN(logf logger.Logf, secure bool) (*tuntest.ChannelTUN, *Wrapper) { +func newChannelTUN(logf logger.Logf, bus *eventbus.Bus, secure bool) (*tuntest.ChannelTUN, *Wrapper) { chtun := tuntest.NewChannelTUN() reg := new(usermetric.Registry) - tun := Wrap(logf, chtun.TUN(), reg) + tun := Wrap(logf, chtun.TUN(), reg, bus) if secure { setfilter(logf, tun) } else { @@ -183,10 +185,10 @@ func newChannelTUN(logf logger.Logf, secure bool) (*tuntest.ChannelTUN, *Wrapper return chtun, tun } -func newFakeTUN(logf logger.Logf, secure bool) (*fakeTUN, *Wrapper) { +func newFakeTUN(logf logger.Logf, bus *eventbus.Bus, secure bool) (*fakeTUN, *Wrapper) { ftun := NewFake() reg := new(usermetric.Registry) - tun := Wrap(logf, ftun, reg) + tun := Wrap(logf, ftun, reg, bus) if secure { setfilter(logf, tun) } else { @@ -196,7 +198,8 @@ func newFakeTUN(logf logger.Logf, secure bool) (*fakeTUN, *Wrapper) { } func TestReadAndInject(t *testing.T) { - chtun, tun := newChannelTUN(t.Logf, false) + bus := eventbustest.NewBus(t) + chtun, tun := newChannelTUN(t.Logf, bus, false) defer tun.Close() const size = 2 // all payloads have this size @@ -221,7 +224,7 @@ func TestReadAndInject(t *testing.T) { } var buf [MaxPacketSize]byte - var seen = make(map[string]bool) + seen := make(map[string]bool) sizes := make([]int, 1) // We expect the same packets back, in no particular order. for i := range len(written) + len(injected) { @@ -257,7 +260,8 @@ func TestReadAndInject(t *testing.T) { } func TestWriteAndInject(t *testing.T) { - chtun, tun := newChannelTUN(t.Logf, false) + bus := eventbustest.NewBus(t) + chtun, tun := newChannelTUN(t.Logf, bus, false) defer tun.Close() written := []string{"w0", "w1"} @@ -316,8 +320,8 @@ func mustHexDecode(s string) []byte { } func TestFilter(t *testing.T) { - - chtun, tun := newChannelTUN(t.Logf, true) + bus := eventbustest.NewBus(t) + chtun, tun := newChannelTUN(t.Logf, bus, true) defer tun.Close() // Reset the metrics before test. These are global @@ -462,7 +466,8 @@ func assertMetricPackets(t *testing.T, metricName string, want, got int64) { } func TestAllocs(t *testing.T) { - ftun, tun := newFakeTUN(t.Logf, false) + bus := eventbustest.NewBus(t) + ftun, tun := newFakeTUN(t.Logf, bus, false) defer tun.Close() buf := [][]byte{{0x00}} @@ -473,14 +478,14 @@ func TestAllocs(t *testing.T) { return } }) - if err != nil { t.Error(err) } } func TestClose(t *testing.T) { - ftun, tun := newFakeTUN(t.Logf, false) + bus := eventbustest.NewBus(t) + ftun, tun := newFakeTUN(t.Logf, bus, false) data := [][]byte{udp4("1.2.3.4", "5.6.7.8", 98, 98)} _, err := ftun.Write(data, 0) @@ -497,7 +502,8 @@ func TestClose(t *testing.T) { func BenchmarkWrite(b *testing.B) { b.ReportAllocs() - ftun, tun := newFakeTUN(b.Logf, true) + bus := eventbustest.NewBus(b) + ftun, tun := newFakeTUN(b.Logf, bus, true) defer tun.Close() packet := [][]byte{udp4("5.6.7.8", "1.2.3.4", 89, 89)} @@ -887,7 +893,8 @@ func TestCaptureHook(t *testing.T) { now := time.Unix(1682085856, 0) - _, w := newFakeTUN(t.Logf, true) + bus := eventbustest.NewBus(t) + _, w := newFakeTUN(t.Logf, bus, true) w.timeNow = func() time.Time { return now } @@ -957,3 +964,30 @@ func TestCaptureHook(t *testing.T) { captured, want) } } + +func TestTSMPDisco(t *testing.T) { + t.Run("IPv6DiscoAdvert", func(t *testing.T) { + src := netip.MustParseAddr("2001:db8::1") + dst := netip.MustParseAddr("2001:db8::2") + discoKey := key.NewDisco() + buf, _ := (&packet.TSMPDiscoKeyAdvertisement{ + Src: src, + Dst: dst, + Key: discoKey.Public(), + }).Marshal() + + var p packet.Parsed + p.Decode(buf) + + tda, ok := p.AsTSMPDiscoAdvertisement() + if !ok { + t.Error("Unable to parse message as TSMPDiscoAdversitement") + } + if tda.Src != src { + t.Errorf("Src address did not match, expected %v, got %v", src, tda.Src) + } + if !reflect.DeepEqual(tda.Key, discoKey.Public()) { + t.Errorf("Key did not match, expected %q, got %q", discoKey.Public(), tda.Key) + } + }) +} diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 7ae422906..4e1024886 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -211,7 +211,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, ln nettype.PacketListe } tun := tuntest.NewChannelTUN() - tsTun := tstun.Wrap(logf, tun.TUN(), ®) + tsTun := tstun.Wrap(logf, tun.TUN(), ®, bus) tsTun.SetFilter(filter.NewAllowAllForTest(logf)) tsTun.Start() @@ -1771,7 +1771,6 @@ func TestEndpointSetsEqual(t *testing.T) { t.Errorf("%q vs %q = %v; want %v", tt.a, tt.b, got, tt.want) } } - } func TestBetterAddr(t *testing.T) { @@ -1915,7 +1914,6 @@ func TestBetterAddr(t *testing.T) { t.Errorf("[%d] betterAddr(%+v, %+v) and betterAddr(%+v, %+v) both unexpectedly true", i, tt.a, tt.b, tt.b, tt.a) } } - } func epFromTyped(eps []tailcfg.Endpoint) (ret []netip.AddrPort) { @@ -3138,7 +3136,6 @@ func TestMaybeRebindOnError(t *testing.T) { t.Errorf("expected at least 5 seconds between %s and %s", lastRebindTime, newTime) } } - }) }) } diff --git a/wgengine/userspace.go b/wgengine/userspace.go index e4c99ded2..a369fa343 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -323,9 +323,9 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) var tsTUNDev *tstun.Wrapper if conf.IsTAP { - tsTUNDev = tstun.WrapTAP(logf, conf.Tun, conf.Metrics) + tsTUNDev = tstun.WrapTAP(logf, conf.Tun, conf.Metrics, conf.EventBus) } else { - tsTUNDev = tstun.Wrap(logf, conf.Tun, conf.Metrics) + tsTUNDev = tstun.Wrap(logf, conf.Tun, conf.Metrics, conf.EventBus) } closePool.add(tsTUNDev) @@ -1436,6 +1436,7 @@ func (e *userspaceEngine) Ping(ip netip.Addr, pingType tailcfg.PingType, size in e.magicConn.Ping(peer, res, size, cb) case "TSMP": e.sendTSMPPing(ip, peer, res, cb) + e.sendTSMPDiscoAdvertisement(ip) case "ICMP": e.sendICMPEchoRequest(ip, peer, res, cb) } @@ -1556,6 +1557,29 @@ func (e *userspaceEngine) sendTSMPPing(ip netip.Addr, peer tailcfg.NodeView, res e.tundev.InjectOutbound(tsmpPing) } +func (e *userspaceEngine) sendTSMPDiscoAdvertisement(ip netip.Addr) { + srcIP, err := e.mySelfIPMatchingFamily(ip) + if err != nil { + e.logf("getting matching node: %s", err) + return + } + tdka := packet.TSMPDiscoKeyAdvertisement{ + Src: srcIP, + Dst: ip, + Key: e.magicConn.DiscoPublicKey(), + } + payload, err := tdka.Marshal() + if err != nil { + e.logf("error generating TSMP Advertisement: %s", err) + metricTSMPDiscoKeyAdvertisementError.Add(1) + } else if err := e.tundev.InjectOutbound(payload); err != nil { + e.logf("error sending TSMP Advertisement: %s", err) + metricTSMPDiscoKeyAdvertisementError.Add(1) + } else { + metricTSMPDiscoKeyAdvertisementSent.Add(1) + } +} + func (e *userspaceEngine) setTSMPPongCallback(data [8]byte, cb func(packet.TSMPPongReply)) { e.mu.Lock() defer e.mu.Unlock() @@ -1722,6 +1746,9 @@ var ( metricNumMajorChanges = clientmetric.NewCounter("wgengine_major_changes") metricNumMinorChanges = clientmetric.NewCounter("wgengine_minor_changes") + + metricTSMPDiscoKeyAdvertisementSent = clientmetric.NewCounter("magicsock_tsmp_disco_key_advertisement_sent") + metricTSMPDiscoKeyAdvertisementError = clientmetric.NewCounter("magicsock_tsmp_disco_key_advertisement_error") ) func (e *userspaceEngine) InstallCaptureHook(cb packet.CaptureCallback) { diff --git a/wgengine/userspace_test.go b/wgengine/userspace_test.go index 89d75b98a..0a1d2924d 100644 --- a/wgengine/userspace_test.go +++ b/wgengine/userspace_test.go @@ -325,6 +325,64 @@ func TestUserspaceEnginePeerMTUReconfig(t *testing.T) { } } +func TestTSMPKeyAdvertisement(t *testing.T) { + var knobs controlknobs.Knobs + + bus := eventbustest.NewBus(t) + ht := health.NewTracker(bus) + reg := new(usermetric.Registry) + e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg, bus) + if err != nil { + t.Fatal(err) + } + t.Cleanup(e.Close) + ue := e.(*userspaceEngine) + routerCfg := &router.Config{} + nodeKey := nkFromHex("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + nm := &netmap.NetworkMap{ + Peers: nodeViews([]*tailcfg.Node{ + { + ID: 1, + Key: nodeKey, + }, + }), + SelfNode: (&tailcfg.Node{ + StableID: "TESTCTRL00000001", + Name: "test-node.test.ts.net", + Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32"), netip.MustParsePrefix("fd7a:115c:a1e0:ab12:4843:cd96:0:1/128")}, + }).View(), + } + cfg := &wgcfg.Config{ + Peers: []wgcfg.Peer{ + { + PublicKey: nodeKey, + AllowedIPs: []netip.Prefix{ + netip.PrefixFrom(netaddr.IPv4(100, 100, 99, 1), 32), + }, + }, + }, + } + + ue.SetNetworkMap(nm) + err = ue.Reconfig(cfg, routerCfg, &dns.Config{}) + if err != nil { + t.Fatal(err) + } + + addr := netip.MustParseAddr("100.100.99.1") + previousValue := metricTSMPDiscoKeyAdvertisementSent.Value() + ue.sendTSMPDiscoAdvertisement(addr) + if val := metricTSMPDiscoKeyAdvertisementSent.Value(); val <= previousValue { + errs := metricTSMPDiscoKeyAdvertisementError.Value() + t.Errorf("Expected 1 disco key advert, got %d, errors %d", val, errs) + } + // Remove config to have the engine shut down more consistently + err = ue.Reconfig(&wgcfg.Config{}, &router.Config{}, &dns.Config{}) + if err != nil { + t.Fatal(err) + } +} + func nkFromHex(hex string) key.NodePublic { if len(hex) != 64 { panic(fmt.Sprintf("%q is len %d; want 64", hex, len(hex)))