Change-Id: Ib12ed1559436e2a6afc1181151d19e8499b15546
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
bradfitz/disco_change_remove_sync
Brad Fitzpatrick 1 week ago
parent 4d4cf8f5ca
commit 0698ac8362

@ -8,7 +8,9 @@ import (
"errors"
"fmt"
"maps"
"os"
"reflect"
"runtime"
"slices"
"strings"
"sync"
@ -570,6 +572,9 @@ func (h *ExtensionHost) shutdownWorkQueue() {
// for in-flight callbacks associated with those operations to finish.
if err := h.workQueue.Wait(ctx); err != nil {
h.logf("work queue shutdown failed: %v", err)
b := make([]byte, 2<<20)
n := runtime.Stack(b, true)
os.WriteFile("/tmp/shutdown-hang-stacks.txt", b[:n], 0644)
}
}

@ -521,6 +521,8 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
b.currentNodeAtomic.Store(nb)
nb.ready()
sys.Engine.Get().SetPeerByIPLookupFunc(b.lookupPeerByIP)
if sys.InitialConfig != nil {
if err := b.initPrefsFromConfig(sys.InitialConfig); err != nil {
return nil, err
@ -652,6 +654,25 @@ func (b *LocalBackend) currentNode() *nodeBackend {
return b.currentNodeAtomic.Load()
}
func (b *LocalBackend) lookupPeerByIP(ip netip.Addr) (peerKey key.NodePublic, ok bool) {
nb := b.currentNode()
nb.mu.Lock()
defer nb.mu.Unlock()
nid, ok := nb.nodeByAddr[ip]
if !ok {
log.Printf("lookupPeerByIP: %v -> no node ID", ip)
return key.NodePublic{}, false
}
peer, ok := nb.peers[nid]
if !ok {
log.Printf("lookupPeerByIP: no node ID %v", nid)
return key.NodePublic{}, false
}
log.Printf("lookupPeerByIP: %v -> %v (%v)", ip, peer.Name(), peer.Key())
return peer.Key(), true
}
// FindExtensionByName returns an active extension with the given name,
// or nil if no such extension exists.
func (b *LocalBackend) FindExtensionByName(name string) any {

@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"io"
"log"
"math"
"net/netip"
"runtime"
@ -1081,8 +1082,8 @@ func (e *userspaceEngine) PeerByKey(pubKey key.NodePublic) (_ wgint.Peer, ok boo
if dev == nil {
return wgint.Peer{}, false
}
peer := dev.LookupPeer(pubKey.Raw32())
if peer == nil {
peer, ok := dev.LookupActivePeer(pubKey.Raw32())
if !ok || peer == nil {
return wgint.Peer{}, false
}
return wgint.PeerOf(peer), true
@ -1614,3 +1615,30 @@ func (e *userspaceEngine) reconfigureVPNIfNecessary() error {
}
return e.reconfigureVPN()
}
func (e *userspaceEngine) SetPeerByIPLookupFunc(fn func(netip.Addr) (key.NodePublic, bool)) {
e.wgdev.SetPeerByIPLookupFunc(func(addr netip.Addr) (_ *device.Peer, ok bool) {
pk, ok := fn(addr)
if !ok {
return nil, false
}
// TODO(bradfitz): optimize this LookupPeer map lookup on each packet;
// store it in the leaf of the bart lookup.
if peer, ok := e.wgdev.LookupActivePeer(pk.Raw32()); ok {
log.Printf("XXX active peer for %v found in LookupActivePeer", pk.ShortString())
return peer, true
}
peer := e.wgdev.LookupPeer(pk.Raw32())
if peer == nil {
return nil, false
}
log.Printf("XXX making new peer for %v", pk.ShortString())
ep, err := e.magicConn.ParseEndpoint(fmt.Sprintf("%02x", pk.Raw32()))
if err != nil {
return nil, false
}
peer.SetEndpointFromPacket(ep)
return peer, peer != nil
})
}

@ -179,3 +179,7 @@ func (e *watchdogEngine) InstallCaptureHook(cb packet.CaptureCallback) {
func (e *watchdogEngine) PeerByKey(pubKey key.NodePublic) (_ wgint.Peer, ok bool) {
return e.wrap.PeerByKey(pubKey)
}
func (e *watchdogEngine) SetPeerByIPLookupFunc(fn func(netip.Addr) (key.NodePublic, bool)) {
e.wrap.SetPeerByIPLookupFunc(fn)
}

@ -4,8 +4,12 @@
package wgcfg
import (
"fmt"
"log"
"net/netip"
"runtime"
"sync/atomic"
"time"
"github.com/tailscale/wireguard-go/conn"
"github.com/tailscale/wireguard-go/device"
@ -40,10 +44,32 @@ func ReconfigDevice(d *device.Device, cfg *Config, logf logger.Logf) (err error)
return !exists
})
d.SetPeerLookupFunc(func(pubk device.NoisePublicKey) []netip.Prefix {
var lastStack atomic.Int64
d.SetPeerLookupFunc(func(pubk device.NoisePublicKey) (_ *device.NewPeerConfig, ok bool) {
allowedIPs, ok := peers[pubk]
log.Printf("XXX wgcfg.ReconfigDevice: lookup for peer %v, found=%v => %v", pubk, ok, allowedIPs)
return allowedIPs
if !ok {
return nil, false
}
var buf []byte
now := time.Now().Unix()
if lastStack.Swap(now) != now {
buf = make([]byte, 4<<10)
buf = buf[:runtime.Stack(buf, false)]
}
log.Printf("XXX wgcfg.ReconfigDevice: lookup for peer %v, found=%v => %v, stack: %s", pubk, ok, allowedIPs, buf)
bind := d.Bind()
ep, err := bind.ParseEndpoint(fmt.Sprintf("%02x", pubk[:]))
if err != nil {
logf("wgcfg: failed to parse endpoint for peer %v: %v", pubk, err)
return nil, false
}
return &device.NewPeerConfig{
AllowedIPs: allowedIPs,
Endpoint: ep,
}, ok
})
return nil

@ -97,6 +97,8 @@ type Engine interface {
// WireGuard status changes.
SetStatusCallback(StatusCallback)
SetPeerByIPLookupFunc(func(netip.Addr) (key.NodePublic, bool))
// RequestStatus requests a WireGuard status update right
// away, sent to the callback registered via SetStatusCallback.
RequestStatus()

Loading…
Cancel
Save