derp: include src IPs in mesh watch messages

Updates tailscale/corp#13945

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/8907/head
Brad Fitzpatrick 11 months ago committed by Brad Fitzpatrick
parent 7ed3681cbe
commit 6c791f7d60

@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"log" "log"
"net" "net"
"net/netip"
"strings" "strings"
"time" "time"
@ -67,7 +68,7 @@ func startMeshWithHost(s *derp.Server, host string) error {
return d.DialContext(ctx, network, addr) return d.DialContext(ctx, network, addr)
}) })
add := func(k key.NodePublic) { s.AddPacketForwarder(k, c) } add := func(k key.NodePublic, _ netip.AddrPort) { s.AddPacketForwarder(k, c) }
remove := func(k key.NodePublic) { s.RemovePacketForwarder(k, c) } remove := func(k key.NodePublic) { s.RemovePacketForwarder(k, c) }
go c.RunWatchConnectionLoop(context.Background(), s.PublicKey(), logf, add, remove) go c.RunWatchConnectionLoop(context.Background(), s.PublicKey(), logf, add, remove)
return nil return nil

@ -85,7 +85,7 @@ const (
// framePeerPresent is like framePeerGone, but for other // framePeerPresent is like framePeerGone, but for other
// members of the DERP region when they're meshed up together. // members of the DERP region when they're meshed up together.
framePeerPresent = frameType(0x09) // 32B pub key of peer that's connected framePeerPresent = frameType(0x09) // 32B pub key of peer that's connected + optional 18B ip:port (16 byte IP + 2 byte BE uint16 port)
// frameWatchConns is how one DERP node in a regional mesh // frameWatchConns is how one DERP node in a regional mesh
// subscribes to the others in the region. // subscribes to the others in the region.

@ -363,7 +363,12 @@ func (PeerGoneMessage) msg() {}
// PeerPresentMessage is a ReceivedMessage that indicates that the client // PeerPresentMessage is a ReceivedMessage that indicates that the client
// is connected to the server. (Only used by trusted mesh clients) // is connected to the server. (Only used by trusted mesh clients)
type PeerPresentMessage key.NodePublic type PeerPresentMessage struct {
// Key is the public key of the client.
Key key.NodePublic
// IPPort is the remote IP and port of the client.
IPPort netip.AddrPort
}
func (PeerPresentMessage) msg() {} func (PeerPresentMessage) msg() {}
@ -546,8 +551,15 @@ func (c *Client) recvTimeout(timeout time.Duration) (m ReceivedMessage, err erro
c.logf("[unexpected] dropping short peerPresent frame from DERP server") c.logf("[unexpected] dropping short peerPresent frame from DERP server")
continue continue
} }
pg := PeerPresentMessage(key.NodePublicFromRaw32(mem.B(b[:keyLen]))) var msg PeerPresentMessage
return pg, nil msg.Key = key.NodePublicFromRaw32(mem.B(b[:keyLen]))
if n >= keyLen+16+2 {
msg.IPPort = netip.AddrPortFrom(
netip.AddrFrom16([16]byte(b[keyLen:keyLen+16])).Unmap(),
binary.BigEndian.Uint16(b[keyLen+16:keyLen+16+2]),
)
}
return msg, nil
case frameRecvPacket: case frameRecvPacket:
var rp ReceivedPacket var rp ReceivedPacket

@ -12,6 +12,7 @@ import (
crand "crypto/rand" crand "crypto/rand"
"crypto/x509" "crypto/x509"
"crypto/x509/pkix" "crypto/x509/pkix"
"encoding/binary"
"encoding/json" "encoding/json"
"errors" "errors"
"expvar" "expvar"
@ -43,6 +44,7 @@ import (
"tailscale.com/tstime/rate" "tailscale.com/tstime/rate"
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/set"
"tailscale.com/version" "tailscale.com/version"
) )
@ -150,7 +152,7 @@ type Server struct {
closed bool closed bool
netConns map[Conn]chan struct{} // chan is closed when conn closes netConns map[Conn]chan struct{} // chan is closed when conn closes
clients map[key.NodePublic]clientSet clients map[key.NodePublic]clientSet
watchers map[*sclient]bool // mesh peer -> true watchers set.Set[*sclient] // mesh peers
// clientsMesh tracks all clients in the cluster, both locally // clientsMesh tracks all clients in the cluster, both locally
// and to mesh peers. If the value is nil, that means the // and to mesh peers. If the value is nil, that means the
// peer is only local (and thus in the clients Map, but not // peer is only local (and thus in the clients Map, but not
@ -219,8 +221,7 @@ func (s singleClient) ForeachClient(f func(*sclient)) { f(s.c) }
// All fields are guarded by Server.mu. // All fields are guarded by Server.mu.
type dupClientSet struct { type dupClientSet struct {
// set is the set of connected clients for sclient.key. // set is the set of connected clients for sclient.key.
// The values are all true. set set.Set[*sclient]
set map[*sclient]bool
// last is the most recent addition to set, or nil if the most // last is the most recent addition to set, or nil if the most
// recent one has since disconnected and nobody else has send // recent one has since disconnected and nobody else has send
@ -261,7 +262,7 @@ func (s *dupClientSet) removeClient(c *sclient) bool {
trim := s.sendHistory[:0] trim := s.sendHistory[:0]
for _, v := range s.sendHistory { for _, v := range s.sendHistory {
if s.set[v] && (len(trim) == 0 || trim[len(trim)-1] != v) { if s.set.Contains(v) && (len(trim) == 0 || trim[len(trim)-1] != v) {
trim = append(trim, v) trim = append(trim, v)
} }
} }
@ -316,7 +317,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
clientsMesh: map[key.NodePublic]PacketForwarder{}, clientsMesh: map[key.NodePublic]PacketForwarder{},
netConns: map[Conn]chan struct{}{}, netConns: map[Conn]chan struct{}{},
memSys0: ms.Sys, memSys0: ms.Sys,
watchers: map[*sclient]bool{}, watchers: set.Set[*sclient]{},
sentTo: map[key.NodePublic]map[key.NodePublic]int64{}, sentTo: map[key.NodePublic]map[key.NodePublic]int64{},
avgQueueDuration: new(uint64), avgQueueDuration: new(uint64),
tcpRtt: metrics.LabelMap{Label: "le"}, tcpRtt: metrics.LabelMap{Label: "le"},
@ -498,8 +499,8 @@ func (s *Server) registerClient(c *sclient) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
set := s.clients[c.key] curSet := s.clients[c.key]
switch set := set.(type) { switch curSet := curSet.(type) {
case nil: case nil:
s.clients[c.key] = singleClient{c} s.clients[c.key] = singleClient{c}
c.debugLogf("register single client") c.debugLogf("register single client")
@ -507,14 +508,14 @@ func (s *Server) registerClient(c *sclient) {
s.dupClientKeys.Add(1) s.dupClientKeys.Add(1)
s.dupClientConns.Add(2) // both old and new count s.dupClientConns.Add(2) // both old and new count
s.dupClientConnTotal.Add(1) s.dupClientConnTotal.Add(1)
old := set.ActiveClient() old := curSet.ActiveClient()
old.isDup.Store(true) old.isDup.Store(true)
c.isDup.Store(true) c.isDup.Store(true)
s.clients[c.key] = &dupClientSet{ s.clients[c.key] = &dupClientSet{
last: c, last: c,
set: map[*sclient]bool{ set: set.Set[*sclient]{
old: true, old: struct{}{},
c: true, c: struct{}{},
}, },
sendHistory: []*sclient{old}, sendHistory: []*sclient{old},
} }
@ -523,9 +524,9 @@ func (s *Server) registerClient(c *sclient) {
s.dupClientConns.Add(1) // the gauge s.dupClientConns.Add(1) // the gauge
s.dupClientConnTotal.Add(1) // the counter s.dupClientConnTotal.Add(1) // the counter
c.isDup.Store(true) c.isDup.Store(true)
set.set[c] = true curSet.set.Add(c)
set.last = c curSet.last = c
set.sendHistory = append(set.sendHistory, c) curSet.sendHistory = append(curSet.sendHistory, c)
c.debugLogf("register another duplicate client") c.debugLogf("register another duplicate client")
} }
@ -534,7 +535,7 @@ func (s *Server) registerClient(c *sclient) {
} }
s.keyOfAddr[c.remoteIPPort] = c.key s.keyOfAddr[c.remoteIPPort] = c.key
s.curClients.Add(1) s.curClients.Add(1)
s.broadcastPeerStateChangeLocked(c.key, true) s.broadcastPeerStateChangeLocked(c.key, c.remoteIPPort, true)
} }
// broadcastPeerStateChangeLocked enqueues a message to all watchers // broadcastPeerStateChangeLocked enqueues a message to all watchers
@ -542,9 +543,13 @@ func (s *Server) registerClient(c *sclient) {
// presence changed. // presence changed.
// //
// s.mu must be held. // s.mu must be held.
func (s *Server) broadcastPeerStateChangeLocked(peer key.NodePublic, present bool) { func (s *Server) broadcastPeerStateChangeLocked(peer key.NodePublic, ipPort netip.AddrPort, present bool) {
for w := range s.watchers { for w := range s.watchers {
w.peerStateChange = append(w.peerStateChange, peerConnState{peer: peer, present: present}) w.peerStateChange = append(w.peerStateChange, peerConnState{
peer: peer,
present: present,
ipPort: ipPort,
})
go w.requestMeshUpdate() go w.requestMeshUpdate()
} }
} }
@ -565,7 +570,7 @@ func (s *Server) unregisterClient(c *sclient) {
delete(s.clientsMesh, c.key) delete(s.clientsMesh, c.key)
s.notePeerGoneFromRegionLocked(c.key) s.notePeerGoneFromRegionLocked(c.key)
} }
s.broadcastPeerStateChangeLocked(c.key, false) s.broadcastPeerStateChangeLocked(c.key, netip.AddrPort{}, false)
case *dupClientSet: case *dupClientSet:
c.debugLogf("removed duplicate client") c.debugLogf("removed duplicate client")
if set.removeClient(c) { if set.removeClient(c) {
@ -655,13 +660,21 @@ func (s *Server) addWatcher(c *sclient) {
defer s.mu.Unlock() defer s.mu.Unlock()
// Queue messages for each already-connected client. // Queue messages for each already-connected client.
for peer := range s.clients { for peer, clientSet := range s.clients {
c.peerStateChange = append(c.peerStateChange, peerConnState{peer: peer, present: true}) ac := clientSet.ActiveClient()
if ac == nil {
continue
}
c.peerStateChange = append(c.peerStateChange, peerConnState{
peer: peer,
present: true,
ipPort: ac.remoteIPPort,
})
} }
// And enroll the watcher in future updates (of both // And enroll the watcher in future updates (of both
// connections & disconnections). // connections & disconnections).
s.watchers[c] = true s.watchers.Add(c)
go c.requestMeshUpdate() go c.requestMeshUpdate()
} }
@ -1349,6 +1362,7 @@ type sclient struct {
type peerConnState struct { type peerConnState struct {
peer key.NodePublic peer key.NodePublic
present bool present bool
ipPort netip.AddrPort // if present, the peer's IP:port
} }
// pkt is a request to write a data frame to an sclient. // pkt is a request to write a data frame to an sclient.
@ -1542,12 +1556,18 @@ func (c *sclient) sendPeerGone(peer key.NodePublic, reason PeerGoneReasonType) e
} }
// sendPeerPresent sends a peerPresent frame, without flushing. // sendPeerPresent sends a peerPresent frame, without flushing.
func (c *sclient) sendPeerPresent(peer key.NodePublic) error { func (c *sclient) sendPeerPresent(peer key.NodePublic, ipPort netip.AddrPort) error {
c.setWriteDeadline() c.setWriteDeadline()
if err := writeFrameHeader(c.bw.bw(), framePeerPresent, keyLen); err != nil { const frameLen = keyLen + 16 + 2
if err := writeFrameHeader(c.bw.bw(), framePeerPresent, frameLen); err != nil {
return err return err
} }
_, err := c.bw.Write(peer.AppendTo(nil)) payload := make([]byte, frameLen)
_ = peer.AppendTo(payload[:0])
a16 := ipPort.Addr().As16()
copy(payload[keyLen:], a16[:])
binary.BigEndian.PutUint16(payload[keyLen+16:], ipPort.Port())
_, err := c.bw.Write(payload)
return err return err
} }
@ -1566,7 +1586,7 @@ func (c *sclient) sendMeshUpdates() error {
} }
var err error var err error
if pcs.present { if pcs.present {
err = c.sendPeerPresent(pcs.peer) err = c.sendPeerPresent(pcs.peer, pcs.ipPort)
} else { } else {
err = c.sendPeerGone(pcs.peer, PeerGoneReasonDisconnected) err = c.sendPeerGone(pcs.peer, PeerGoneReasonDisconnected)
} }

@ -92,7 +92,7 @@ func TestSendRecv(t *testing.T) {
defer cancel() defer cancel()
brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin)) brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin))
go s.Accept(ctx, cin, brwServer, fmt.Sprintf("test-client-%d", i)) go s.Accept(ctx, cin, brwServer, fmt.Sprintf("[abc::def]:%v", i))
key := clientPrivateKeys[i] key := clientPrivateKeys[i]
brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout)) brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout))
@ -528,7 +528,7 @@ func newTestServer(t *testing.T, ctx context.Context) *testServer {
// TODO: register c in ts so Close also closes it? // TODO: register c in ts so Close also closes it?
go func(i int) { go func(i int) {
brwServer := bufio.NewReadWriter(bufio.NewReader(c), bufio.NewWriter(c)) brwServer := bufio.NewReadWriter(bufio.NewReader(c), bufio.NewWriter(c))
go s.Accept(ctx, c, brwServer, fmt.Sprintf("test-client-%d", i)) go s.Accept(ctx, c, brwServer, c.RemoteAddr().String())
}(i) }(i)
} }
}() }()
@ -615,7 +615,7 @@ func (tc *testClient) wantPresent(t *testing.T, peers ...key.NodePublic) {
} }
switch m := m.(type) { switch m := m.(type) {
case PeerPresentMessage: case PeerPresentMessage:
got := key.NodePublic(m) got := m.Key
if !want[got] { if !want[got] {
t.Fatalf("got peer present for %v; want present for %v", tc.ts.keyName(got), logger.ArgWriter(func(bw *bufio.Writer) { t.Fatalf("got peer present for %v; want present for %v", tc.ts.keyName(got), logger.ArgWriter(func(bw *bufio.Writer) {
for _, pub := range peers { for _, pub := range peers {
@ -623,6 +623,7 @@ func (tc *testClient) wantPresent(t *testing.T, peers ...key.NodePublic) {
} }
})) }))
} }
t.Logf("got present with IP %v", m.IPPort)
delete(want, got) delete(want, got)
if len(want) == 0 { if len(want) == 0 {
return return

@ -5,6 +5,7 @@ package derphttp
import ( import (
"context" "context"
"net/netip"
"sync" "sync"
"time" "time"
@ -26,7 +27,7 @@ import (
// //
// To force RunWatchConnectionLoop to return quickly, its ctx needs to // To force RunWatchConnectionLoop to return quickly, its ctx needs to
// be closed, and c itself needs to be closed. // be closed, and c itself needs to be closed.
func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add, remove func(key.NodePublic)) { func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add func(key.NodePublic, netip.AddrPort), remove func(key.NodePublic)) {
if infoLogf == nil { if infoLogf == nil {
infoLogf = logger.Discard infoLogf = logger.Discard
} }
@ -68,9 +69,9 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
}) })
defer timer.Stop() defer timer.Stop()
updatePeer := func(k key.NodePublic, isPresent bool) { updatePeer := func(k key.NodePublic, ipPort netip.AddrPort, isPresent bool) {
if isPresent { if isPresent {
add(k) add(k, ipPort)
} else { } else {
remove(k) remove(k)
} }
@ -126,7 +127,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
} }
switch m := m.(type) { switch m := m.(type) {
case derp.PeerPresentMessage: case derp.PeerPresentMessage:
updatePeer(key.NodePublic(m), true) updatePeer(m.Key, m.IPPort, true)
case derp.PeerGoneMessage: case derp.PeerGoneMessage:
switch m.Reason { switch m.Reason {
case derp.PeerGoneReasonDisconnected: case derp.PeerGoneReasonDisconnected:
@ -138,7 +139,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
logf("Recv: peer %s not at server %s for unknown reason %v", logf("Recv: peer %s not at server %s for unknown reason %v",
key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString(), m.Reason) key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString(), m.Reason)
} }
updatePeer(key.NodePublic(m.Peer), false) updatePeer(key.NodePublic(m.Peer), netip.AddrPort{}, false)
default: default:
continue continue
} }

Loading…
Cancel
Save