wgengine, ipn/ipnlocal, wgengine/magicsock: remove RequestStatus, eventbus-ify things

Updates #17900

Change-Id: Ia53a3f195a82256d8f915c8928d8a775f723259d
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
bradfitz/getstatus
Brad Fitzpatrick
parent 124301fbb6
commit 5b0997536f

@@ -845,6 +845,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/client/local+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/cmd/k8s-operator+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+

@@ -141,6 +141,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/control/controlclient+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/control/controlclient+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+

@@ -168,6 +168,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/control/controlclient+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/control/controlclient+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+

@@ -413,6 +413,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/tka+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/ipn/ipnlocal+
 tailscale.com/util/backoff from tailscale.com/cmd/tailscaled+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+

@@ -250,6 +250,7 @@ tailscale.com/cmd/tsidp dependencies: (generated by github.com/tailscale/depawar
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/client/local+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/control/controlclient+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+

@@ -81,6 +81,7 @@ import (
 	"tailscale.com/types/persist"
 	"tailscale.com/types/preftype"
 	"tailscale.com/types/ptr"
+	"tailscale.com/types/topics"
 	"tailscale.com/types/views"
 	"tailscale.com/util/checkchange"
 	"tailscale.com/util/clientmetric"
@@ -287,7 +288,6 @@ type LocalBackend struct {
 	hostinfo      *tailcfg.Hostinfo      // TODO(nickkhyl): move to nodeBackend
 	nmExpiryTimer tstime.TimerController // for updating netMap on node expiry; can be nil; TODO(nickkhyl): move to nodeBackend
 	activeLogin   string                 // last logged LoginName from netMap; TODO(nickkhyl): move to nodeBackend (or remove? it's in [ipn.LoginProfile]).
-	engineStatus  ipn.EngineStatus
 	endpoints     []tailcfg.Endpoint
 	blocked       bool
 	keyExpired    bool // TODO(nickkhyl): move to nodeBackend
@@ -300,10 +300,11 @@ type LocalBackend struct {
 	peerAPIListeners  []*peerAPIListener
 	loginFlags        controlclient.LoginFlags
 	notifyWatchers    map[string]*watchSession // by session ID
-	lastStatusTime    time.Time // status.AsOf value of the last processed status update
 	componentLogUntil map[string]componentLogState
 	currentUser       ipnauth.Actor
+	liveDERPs         int // number of live DERP connections, per eventbus notification
 	// capForcedNetfilter is the netfilter that control instructs Linux clients
 	// to use, unless overridden locally.
 	capForcedNetfilter string // TODO(nickkhyl): move to nodeBackend
@@ -558,8 +559,6 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
 	b.setTCPPortsIntercepted(nil)
-	b.e.SetStatusCallback(b.setWgengineStatus)
 	b.prevIfState = netMon.InterfaceState()
 	// Call our linkChange code once with the current state.
 	// Following changes are triggered via the eventbus.
@@ -590,6 +589,8 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
 	ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend")
 	b.eventClient = ec
 	eventbus.SubscribeFunc(ec, b.onClientVersion)
+	eventbus.SubscribeFunc(ec, b.onEndpointsChange)
+	eventbus.SubscribeFunc(ec, b.onDERPConnChange)
 	eventbus.SubscribeFunc(ec, func(au controlclient.AutoUpdate) {
 		b.onTailnetDefaultAutoUpdate(au.Value)
 	})
@@ -2251,65 +2252,25 @@ func (b *LocalBackend) resolveExitNodeIPLocked(prefs *ipn.Prefs) (prefsChanged b
 	return prefsChanged
 }
 
-// setWgengineStatus is the callback by the wireguard engine whenever it posts a new status.
-// This updates the endpoints both in the backend and in the control client.
-func (b *LocalBackend) setWgengineStatus(s *wgengine.Status, err error) {
-	if err != nil {
-		b.logf("wgengine status error: %v", err)
-		return
-	}
-	if s == nil {
-		b.logf("[unexpected] non-error wgengine update with status=nil: %v", s)
-		return
-	}
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	// For now, only check this in the callback, but don't check it in setWgengineStatusLocked
-	if s.AsOf.Before(b.lastStatusTime) {
-		// Don't process a status update that is older than the one we have
-		// already processed. (corp#2579)
-		return
-	}
-	b.lastStatusTime = s.AsOf
-	b.setWgengineStatusLocked(s)
-}
-
-// setWgengineStatusLocked updates LocalBackend's view of the engine status and
-// updates the endpoints both in the backend and in the control client.
-//
-// Unlike setWgengineStatus it does not discard out-of-order updates, so
-// statuses sent here are always processed. This is useful for ensuring we don't
-// miss a "we shut down" status during backend shutdown even if other statuses
-// arrive out of order.
-//
-// TODO(zofrex): we should ensure updates actually do arrive in order and move
-// the out-of-order check into this function.
-//
-// b.mu must be held.
-func (b *LocalBackend) setWgengineStatusLocked(s *wgengine.Status) {
-	es := b.parseWgStatusLocked(s)
-	cc := b.cc
-	// TODO(zofrex): the only reason we even write this is to transition from
-	// "Starting" to "Running" in the call to state machine a few lines below
-	// this. Maybe we don't even need to store it at all.
-	b.engineStatus = es
-	needUpdateEndpoints := !slices.Equal(s.LocalAddrs, b.endpoints)
-	if needUpdateEndpoints {
-		b.endpoints = append([]tailcfg.Endpoint{}, s.LocalAddrs...)
-	}
-	if cc != nil {
-		if needUpdateEndpoints {
-			cc.UpdateEndpoints(s.LocalAddrs)
-		}
-		b.stateMachineLocked()
-	}
-	b.sendLocked(ipn.Notify{Engine: &es})
-}
+func (b *LocalBackend) onEndpointsChange(eps topics.EndpointsChanged) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	cc := b.cc
+	if cc != nil {
+		cc.UpdateEndpoints(eps)
+		b.stateMachineLocked()
+	}
+	b.endpoints = append([]tailcfg.Endpoint{}, eps...)
+}
+
+func (b *LocalBackend) onDERPConnChange(c topics.DERPConnChange) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.liveDERPs = c.LiveDERPs
+	if b.state == ipn.Starting {
+		b.stateMachineLocked()
+	}
+}
 
 // SetNotifyCallback sets the function to call when the backend has something to
@@ -3214,15 +3175,27 @@ func appendHealthActions(fn func(roNotify *ipn.Notify) (keepGoing bool)) func(*i
 	}
 }
 
-// pollRequestEngineStatus calls b.e.RequestStatus every 2 seconds until ctx
-// is done.
+// pollRequestEngineStatus polls b.e.GetStatus every 2 seconds until ctx is
+// done, notifying watchers when the status changes.
+//
+// TODO(bradfitz): this is all too heavy and doesn't scale with large numbers of
+// clients. See tailscale/tailscale#1909 and tailscale/tailscale#13392.
 func (b *LocalBackend) pollRequestEngineStatus(ctx context.Context) {
 	ticker, tickerChannel := b.clock.NewTicker(2 * time.Second)
 	defer ticker.Stop()
+	var last *wgengine.Status
 	for {
 		select {
 		case <-tickerChannel:
-			b.e.RequestStatus()
+			st := b.e.GetStatus()
+			if reflect.DeepEqual(last, st) {
+				continue
+			}
+			last = st
+			b.mu.Lock()
+			stBusForm := b.parseWgStatusLocked(st)
+			b.mu.Unlock()
+			b.send(ipn.Notify{Engine: &stBusForm})
 		case <-ctx.Done():
 			return
 		}
@@ -5660,8 +5633,6 @@ func (b *LocalBackend) enterStateLocked(newState ipn.State) {
 		}
 	case ipn.Starting, ipn.NeedsMachineAuth:
 		b.authReconfigLocked()
-		// Needed so that UpdateEndpoints can run
-		b.goTracker.Go(b.e.RequestStatus)
 	case ipn.Running:
 		if feature.CanSystemdStatus {
 			var addrStrs []string
@@ -5703,7 +5674,6 @@ func (b *LocalBackend) nextStateLocked() ipn.State {
 	netMap = cn.NetMap()
 	state = b.state
 	blocked = b.blocked
-	st = b.engineStatus
 	keyExpired = b.keyExpired
 	wantRunning = false
@@ -5754,7 +5724,7 @@ func (b *LocalBackend) nextStateLocked() ipn.State {
 		// (if we get here, we know MachineAuthorized == true)
 		return ipn.Starting
 	case state == ipn.Starting:
-		if st.NumLive > 0 || st.LiveDERPs > 0 {
+		if b.e.NumConfiguredPeers() > 0 || b.liveDERPs > 0 {
 			return ipn.Running
 		} else {
 			return state
@@ -5782,8 +5752,7 @@ func (b *LocalBackend) stateMachineLocked() {
 // b.mu must be held.
 func (b *LocalBackend) stopEngineAndWaitLocked() {
 	b.logf("stopEngineAndWait...")
-	st, _ := b.e.ResetAndStop() // TODO: what should we do if this returns an error?
-	b.setWgengineStatusLocked(st)
+	b.e.ResetAndStop()
 	b.logf("stopEngineAndWait: done.")
 }

@@ -39,6 +39,7 @@ import (
 	"tailscale.com/types/netmap"
 	"tailscale.com/types/persist"
 	"tailscale.com/types/preftype"
+	"tailscale.com/types/topics"
 	"tailscale.com/util/dnsname"
 	"tailscale.com/util/eventbus/eventbustest"
 	"tailscale.com/util/mak"
@@ -1004,7 +1005,7 @@ func runTestStateMachine(t *testing.T, seamless bool) {
 	}
 	notifies.expect(1)
 	// Fake a DERP connection.
-	b.setWgengineStatus(&wgengine.Status{DERPs: 1, AsOf: time.Now()}, nil)
+	b.onDERPConnChange(topics.DERPConnChange{RegionID: 1, Connected: true, LiveDERPs: 1})
 	{
 		nn := notifies.drain(1)
 		cc.assertCalls()
@@ -1144,11 +1145,11 @@ func TestWGEngineStatusRace(t *testing.T) {
 		wg.Add(1)
 		go func(i int) {
 			defer wg.Done()
-			n := 0
 			if i == 0 {
-				n = 1
+				b.onDERPConnChange(topics.DERPConnChange{RegionID: 1, Connected: true, LiveDERPs: 1})
+			} else {
+				b.onDERPConnChange(topics.DERPConnChange{RegionID: 1, Connected: false, LiveDERPs: 0})
 			}
-			b.setWgengineStatus(&wgengine.Status{AsOf: time.Now(), DERPs: n}, nil)
 		}(i)
 	}
 	wg.Wait()
@@ -1615,7 +1616,7 @@ func runTestSendPreservesAuthURL(t *testing.T, seamless bool) {
 	}})
 
 	t.Logf("Running")
-	b.setWgengineStatus(&wgengine.Status{AsOf: time.Now(), DERPs: 1}, nil)
+	b.onDERPConnChange(topics.DERPConnChange{RegionID: 1, Connected: true, LiveDERPs: 1})
 
 	t.Logf("Re-auth (StartLoginInteractive)")
 	b.StartLoginInteractive(t.Context())
@@ -1781,10 +1782,9 @@ type mockEngine struct {
 	cfg       *wgcfg.Config
 	routerCfg *router.Config
 	dnsCfg    *dns.Config
 	status    *wgengine.Status
 
 	filter, jailedFilter *filter.Filter
-	statusCb wgengine.StatusCallback
 }
 func newMockEngine() *mockEngine {

@@ -1805,6 +1805,24 @@ func (e *mockEngine) Reconfig(cfg *wgcfg.Config, routerCfg *router.Config, dnsCf
 	return nil
 }
 
+func (e *mockEngine) GetStatus() *wgengine.Status {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	if e.status == nil {
+		return &wgengine.Status{}
+	}
+	return e.status
+}
+
+func (e *mockEngine) NumConfiguredPeers() int {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	if e.status == nil {
+		return 0
+	}
+	return len(e.status.Peers)
+}
+
 func (e *mockEngine) Config() *wgcfg.Config {
 	e.mu.Lock()
 	defer e.mu.Unlock()
@@ -1851,27 +1869,12 @@ func (e *mockEngine) SetJailedFilter(f *filter.Filter) {
 	e.mu.Unlock()
 }
 
-func (e *mockEngine) SetStatusCallback(cb wgengine.StatusCallback) {
-	e.mu.Lock()
-	e.statusCb = cb
-	e.mu.Unlock()
-}
-
-func (e *mockEngine) RequestStatus() {
-	e.mu.Lock()
-	cb := e.statusCb
-	e.mu.Unlock()
-	if cb != nil {
-		cb(&wgengine.Status{AsOf: time.Now()}, nil)
-	}
-}
-
 func (e *mockEngine) ResetAndStop() (*wgengine.Status, error) {
 	err := e.Reconfig(&wgcfg.Config{}, &router.Config{}, &dns.Config{})
 	if err != nil {
 		return nil, err
 	}
-	return &wgengine.Status{AsOf: time.Now()}, nil
+	return &wgengine.Status{}, nil
 }
 
 func (e *mockEngine) PeerByKey(key.NodePublic) (_ wgint.Peer, ok bool) {

@@ -245,6 +245,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware)
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/client/local+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/control/controlclient+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+

@@ -0,0 +1,31 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+// Package topics defines event types used with the eventbus.
+package topics
+
+import (
+	"tailscale.com/tailcfg"
+	"tailscale.com/types/key"
+)
+
+// DERPConnChange is published when the set of DERP connections changes.
+type DERPConnChange struct {
+	RegionID  int  // DERP region ID
+	Connected bool // true for connected, false for disconnected
+	LiveDERPs int  // total number of live DERP connections after this change
+}
+
+// EndpointsChanged is published when magicsock's endpoints change.
+type EndpointsChanged []tailcfg.Endpoint
+
+// TUNStatusChange is published when the TUN device goes up or down.
+type TUNStatusChange struct {
+	Up bool // true if TUN is up, false if down
+}
+
+// PeerRecvActivity is published when a packet is received from a peer.
+// It is published no more than once every 10 seconds per peer.
+type PeerRecvActivity struct {
+	PeerKey key.NodePublic
+}
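
A minimal sketch (not part of this commit) of how these topic types travel over the eventbus, using the same eventbus.Publish and eventbus.SubscribeFunc calls that appear in the hunks below; the bus setup and client names here are illustrative:

	bus := eventbus.New() // tailscale.com/util/eventbus

	// Publisher side, as magicsock.Conn does in this commit:
	pub := eventbus.Publish[topics.DERPConnChange](bus.Client("example.pub"))
	pub.Publish(topics.DERPConnChange{RegionID: 1, Connected: true, LiveDERPs: 1})

	// Subscriber side, as ipnlocal.LocalBackend does in this commit:
	eventbus.SubscribeFunc(bus.Client("example.sub"), func(c topics.DERPConnChange) {
		// Track DERP liveness, e.g. store c.LiveDERPs as LocalBackend
		// does in onDERPConnChange.
	})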

@@ -27,6 +27,7 @@ import (
 	"tailscale.com/tstime/mono"
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
+	"tailscale.com/types/topics"
 	"tailscale.com/util/backoff"
 	"tailscale.com/util/mak"
 	"tailscale.com/util/rands"
@@ -396,6 +397,11 @@ func (c *Conn) derpWriteChanForRegion(regionID int, peer key.NodePublic) chan de
 	*ad.lastWrite = time.Now()
 	ad.createTime = time.Now()
 	c.activeDerp[regionID] = ad
+	c.derpConnChangePub.Publish(topics.DERPConnChange{
+		RegionID:  regionID,
+		Connected: true,
+		LiveDERPs: len(c.activeDerp),
+	})
 	metricNumDERPConns.Set(int64(len(c.activeDerp)))
 	c.logActiveDerpLocked()
 	c.setPeerLastDerpLocked(peer, regionID, regionID)

@@ -424,8 +430,6 @@ func (c *Conn) derpWriteChanForRegion(regionID int, peer key.NodePublic) chan de
 	go c.runDerpReader(ctx, regionID, dc, wg, startGate)
 	go c.runDerpWriter(ctx, dc, ch, wg, startGate)
 
-	go c.derpActiveFunc()
-
 	return ad.writeCh
 }
@@ -874,6 +878,11 @@ func (c *Conn) closeDerpLocked(regionID int, why string) {
 		go ad.c.Close()
 		ad.cancel()
 		delete(c.activeDerp, regionID)
+		c.derpConnChangePub.Publish(topics.DERPConnChange{
+			RegionID:  regionID,
+			Connected: false,
+			LiveDERPs: len(c.activeDerp),
+		})
 		metricNumDERPConns.Set(int64(len(c.activeDerp)))
 	}
 }

@@ -58,6 +58,7 @@ import (
 	"tailscale.com/types/netlogfunc"
 	"tailscale.com/types/netmap"
 	"tailscale.com/types/nettype"
+	"tailscale.com/types/topics"
 	"tailscale.com/types/views"
 	"tailscale.com/util/clientmetric"
 	"tailscale.com/util/eventbus"
@@ -158,8 +159,6 @@ type Conn struct {
 	eventBus    *eventbus.Bus
 	eventClient *eventbus.Client
 	logf        logger.Logf
-	epFunc         func([]tailcfg.Endpoint)
-	derpActiveFunc func()
 	idleFunc               func() time.Duration // nil means unknown
 	testOnlyPacketListener nettype.PacketListener
 	noteRecvActivity       func(key.NodePublic) // or nil, see Options.NoteRecvActivity
@@ -181,6 +180,8 @@ type Conn struct {
 	syncPub               *eventbus.Publisher[syncPoint]
 	allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq]
 	portUpdatePub         *eventbus.Publisher[router.PortUpdate]
+	derpConnChangePub     *eventbus.Publisher[topics.DERPConnChange]
+	epChangePub           *eventbus.Publisher[topics.EndpointsChanged]
 
 	// pconn4 and pconn6 are the underlying UDP sockets used to
 	// send/receive packets for wireguard and other magicsock
@@ -446,14 +447,6 @@ type Options struct {
 	// Zero means to pick one automatically.
 	Port uint16
 
-	// EndpointsFunc optionally provides a func to be called when
-	// endpoints change. The called func does not own the slice.
-	EndpointsFunc func([]tailcfg.Endpoint)
-
-	// DERPActiveFunc optionally provides a func to be called when
-	// a connection is made to a DERP server.
-	DERPActiveFunc func()
-
 	// IdleFunc optionally provides a func to return how long
 	// it's been since a TUN packet was sent or received.
 	IdleFunc func() time.Duration
@@ -507,20 +500,6 @@ func (o *Options) logf() logger.Logf {
 	return o.Logf
 }
 
-func (o *Options) endpointsFunc() func([]tailcfg.Endpoint) {
-	if o == nil || o.EndpointsFunc == nil {
-		return func([]tailcfg.Endpoint) {}
-	}
-	return o.EndpointsFunc
-}
-
-func (o *Options) derpActiveFunc() func() {
-	if o == nil || o.DERPActiveFunc == nil {
-		return func() {}
-	}
-	return o.DERPActiveFunc
-}
-
 // NodeViewsUpdate represents an update event of [tailcfg.NodeView] for all
 // nodes. This event is published over an [eventbus.Bus]. It may be published
 // with an invalid SelfNode, and/or zero/nil Peers. [magicsock.Conn] is the sole
@@ -686,8 +665,6 @@ func NewConn(opts Options) (*Conn, error) {
 	c.eventBus = opts.EventBus
 	c.port.Store(uint32(opts.Port))
 	c.controlKnobs = opts.ControlKnobs
-	c.epFunc = opts.endpointsFunc()
-	c.derpActiveFunc = opts.derpActiveFunc()
 	c.idleFunc = opts.IdleFunc
 	c.testOnlyPacketListener = opts.TestOnlyPacketListener
 	c.noteRecvActivity = opts.NoteRecvActivity
@@ -699,6 +676,8 @@ func NewConn(opts Options) (*Conn, error) {
 	c.syncPub = eventbus.Publish[syncPoint](ec)
 	c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec)
 	c.portUpdatePub = eventbus.Publish[router.PortUpdate](ec)
+	c.derpConnChangePub = eventbus.Publish[topics.DERPConnChange](ec)
+	c.epChangePub = eventbus.Publish[topics.EndpointsChanged](ec)
 	eventbus.SubscribeFunc(ec, c.onPortMapChanged)
 	eventbus.SubscribeFunc(ec, c.onFilterUpdate)
 	eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate)
@@ -973,7 +952,7 @@ func (c *Conn) updateEndpoints(why string) {
 	if c.setEndpoints(endpoints) {
 		c.logEndpointChange(endpoints)
-		c.epFunc(endpoints)
+		c.epChangePub.Publish(topics.EndpointsChanged(endpoints))
 	}
 }

@@ -15,7 +15,6 @@ import (
 	"net/netip"
 	"reflect"
 	"runtime"
-	"slices"
 	"strings"
 	"sync"
 	"time"
@@ -46,6 +45,7 @@ import (
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
 	"tailscale.com/types/netmap"
+	"tailscale.com/types/topics"
 	"tailscale.com/types/views"
 	"tailscale.com/util/backoff"
 	"tailscale.com/util/checkchange"
@@ -97,6 +97,8 @@ type userspaceEngine struct {
 	// eventBus will eventually become required, but for now may be nil.
 	eventBus            *eventbus.Bus
 	eventClient         *eventbus.Client
+	tunStatusPub        *eventbus.Publisher[topics.TUNStatusChange]
+	peerRecvActivityPub *eventbus.Publisher[topics.PeerRecvActivity]
 	logf     logger.Logf
 	wgLogger *wglog.Logger // a wireguard-go logging wrapper
@@ -145,7 +147,6 @@ type userspaceEngine struct {
 	mu             sync.Mutex         // guards following; see lock order comment below
 	netMap         *netmap.NetworkMap // or nil
 	closing        bool               // Close was called (even if we're still closing)
-	statusCallback StatusCallback
 	peerSequence   views.Slice[key.NodePublic]
 	endpoints      []tailcfg.Endpoint
 	pendOpen       map[flowtrackTuple]*pendingOpenFlow // see pendopen.go
@@ -391,19 +392,10 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 	logf("link state: %+v", e.netMon.InterfaceState())
 
-	endpointsFn := func(endpoints []tailcfg.Endpoint) {
-		e.mu.Lock()
-		e.endpoints = append(e.endpoints[:0], endpoints...)
-		e.mu.Unlock()
-		e.RequestStatus()
-	}
 	magicsockOpts := magicsock.Options{
 		EventBus: e.eventBus,
 		Logf:     logf,
 		Port:     conf.ListenPort,
-		EndpointsFunc:  endpointsFn,
-		DERPActiveFunc: e.RequestStatus,
 		IdleFunc: e.tundev.IdleDuration,
 		NetMon:   e.netMon,
 		HealthTracker: e.health,
@@ -476,22 +468,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 		}
 	})
 
-	go func() {
-		up := false
-		for event := range e.tundev.EventsUpDown() {
-			if event&tun.EventUp != 0 && !up {
-				e.logf("external route: up")
-				e.RequestStatus()
-				up = true
-			}
-			if event&tun.EventDown != 0 && up {
-				e.logf("external route: down")
-				e.RequestStatus()
-				up = false
-			}
-		}
-	}()
-
 	go func() {
 		select {
 		case <-e.wgdev.Wait():
@@ -547,10 +523,29 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
 		e.linkChange(&cd)
 	})
 	e.eventClient = ec
+	e.tunStatusPub = eventbus.Publish[topics.TUNStatusChange](ec)
+	e.peerRecvActivityPub = eventbus.Publish[topics.PeerRecvActivity](ec)
+	go e.publishTUNStatusLoop()
 
 	e.logf("Engine created.")
 	return e, nil
 }
 
+func (e *userspaceEngine) publishTUNStatusLoop() {
+	up := false
+	for event := range e.tundev.EventsUpDown() {
+		if event&tun.EventUp != 0 && !up {
+			e.logf("external route: up")
+			e.tunStatusPub.Publish(topics.TUNStatusChange{Up: true})
+			up = true
+		}
+		if event&tun.EventDown != 0 && up {
+			e.logf("external route: down")
+			e.tunStatusPub.Publish(topics.TUNStatusChange{Up: false})
+			up = false
+		}
+	}
+}
+
 // echoRespondToAll is an inbound post-filter responding to all echo requests.
 func echoRespondToAll(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) {
 	if p.IsEchoRequest() {
@@ -673,7 +668,7 @@ func (e *userspaceEngine) noteRecvActivity(nk key.NodePublic) {
 	// tailscaled alone did not, hence this.
 	if e.lastStatusPollTime.IsZero() || now.Sub(e.lastStatusPollTime) >= statusPollInterval {
 		e.lastStatusPollTime = now
-		go e.RequestStatus()
+		e.peerRecvActivityPub.Publish(topics.PeerRecvActivity{PeerKey: nk})
 	}
 
 	// If the last activity time jumped a bunch (say, at least
@@ -940,10 +935,7 @@ func (e *userspaceEngine) ResetAndStop() (*Status, error) {
 	}
 	bo := backoff.NewBackoff("UserspaceEngineResetAndStop", e.logf, 1*time.Second)
 	for {
-		st, err := e.getStatus()
-		if err != nil {
-			return nil, err
-		}
+		st := e.GetStatus()
 		if len(st.Peers) == 0 && st.DERPs == 0 {
 			return st, nil
 		}
@@ -1180,18 +1172,6 @@ func (e *userspaceEngine) SetJailedFilter(filt *filter.Filter) {
 	e.tundev.SetJailedFilter(filt)
 }
 
-func (e *userspaceEngine) SetStatusCallback(cb StatusCallback) {
-	e.mu.Lock()
-	defer e.mu.Unlock()
-	e.statusCallback = cb
-}
-
-func (e *userspaceEngine) getStatusCallback() StatusCallback {
-	e.mu.Lock()
-	defer e.mu.Unlock()
-	return e.statusCallback
-}
-
 var ErrEngineClosing = errors.New("engine closing; no status")
 
 func (e *userspaceEngine) PeerByKey(pubKey key.NodePublic) (_ wgint.Peer, ok bool) {
@@ -1221,7 +1201,13 @@ func (e *userspaceEngine) getPeerStatusLite(pk key.NodePublic) (status ipnstate.
 	return status, true
 }
 
-func (e *userspaceEngine) getStatus() (*Status, error) {
+func (e *userspaceEngine) NumConfiguredPeers() int {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	return e.peerSequence.Len()
+}
+
+func (e *userspaceEngine) GetStatus() *Status {
 	// Grab derpConns before acquiring wgLock to not violate lock ordering;
 	// the DERPs method acquires magicsock.Conn.mu.
 	// (See comment in userspaceEngine's declaration.)

@@ -1230,60 +1216,22 @@ func (e *userspaceEngine) GetStatus() *Status {
 	e.mu.Lock()
 	closing := e.closing
 	peerKeys := e.peerSequence
-	localAddrs := slices.Clone(e.endpoints)
 	e.mu.Unlock()
 
 	if closing {
-		return nil, ErrEngineClosing
-	}
-
-	peers := make([]ipnstate.PeerStatusLite, 0, peerKeys.Len())
-	for _, key := range peerKeys.All() {
-		if status, ok := e.getPeerStatusLite(key); ok {
-			peers = append(peers, status)
-		}
-	}
-
-	return &Status{
-		AsOf:       time.Now(),
-		LocalAddrs: localAddrs,
-		Peers:      peers,
-		DERPs:      derpConns,
-	}, nil
-}
-
-func (e *userspaceEngine) RequestStatus() {
-	// This is slightly tricky. e.getStatus() can theoretically get
-	// blocked inside wireguard for a while, and RequestStatus() is
-	// sometimes called from a goroutine, so we don't want a lot of
-	// them hanging around. On the other hand, requesting multiple
-	// status updates simultaneously is pointless anyway; they will
-	// all say the same thing.
-
-	// Enqueue at most one request. If one is in progress already, this
-	// adds one more to the queue. If one has been requested but not
-	// started, it is a no-op.
-	select {
-	case e.reqCh <- struct{}{}:
-	default:
-	}
-
-	// Dequeue at most one request. Another thread may have already
-	// dequeued the request we enqueued above, which is fine, since the
-	// information is guaranteed to be at least as recent as the current
-	// call to RequestStatus().
-	select {
-	case <-e.reqCh:
-		s, err := e.getStatus()
-		if s == nil && err == nil {
-			e.logf("[unexpected] RequestStatus: both s and err are nil")
-			return
-		}
-		if cb := e.getStatusCallback(); cb != nil {
-			cb(s, err)
-		}
-	default:
-	}
+		return new(Status)
+	}
+
+	st := &Status{
+		DERPs: derpConns,
+		Peers: make([]ipnstate.PeerStatusLite, 0, peerKeys.Len()),
+	}
+	for _, key := range peerKeys.All() {
+		if status, ok := e.getPeerStatusLite(key); ok {
+			st.Peers = append(st.Peers, status)
+		}
+	}
+	return st
 }
 
 func (e *userspaceEngine) Close() {
@@ -1388,11 +1336,7 @@ func (e *userspaceEngine) SetNetworkMap(nm *netmap.NetworkMap) {
 }
 
 func (e *userspaceEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {
-	st, err := e.getStatus()
-	if err != nil {
-		e.logf("wgengine: getStatus: %v", err)
-		return
-	}
+	st := e.GetStatus()
 	if sb.WantPeers {
 		for _, ps := range st.Peers {
 			sb.AddPeer(ps.NodeKey, &ipnstate.PeerStatus{

@@ -142,14 +142,16 @@ func (e *watchdogEngine) GetJailedFilter() *filter.Filter {
 func (e *watchdogEngine) SetJailedFilter(filt *filter.Filter) {
 	e.watchdog("SetJailedFilter", func() { e.wrap.SetJailedFilter(filt) })
 }
-func (e *watchdogEngine) SetStatusCallback(cb StatusCallback) {
-	e.watchdog("SetStatusCallback", func() { e.wrap.SetStatusCallback(cb) })
-}
 func (e *watchdogEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {
 	e.watchdog("UpdateStatus", func() { e.wrap.UpdateStatus(sb) })
 }
-func (e *watchdogEngine) RequestStatus() {
-	e.watchdog("RequestStatus", func() { e.wrap.RequestStatus() })
+func (e *watchdogEngine) GetStatus() (st *Status) {
+	e.watchdog("GetStatus", func() { st = e.wrap.GetStatus() })
+	return st
+}
+func (e *watchdogEngine) NumConfiguredPeers() (n int) {
+	e.watchdog("NumConfiguredPeers", func() { n = e.wrap.NumConfiguredPeers() })
+	return n
 }
 func (e *watchdogEngine) SetNetworkMap(nm *netmap.NetworkMap) {
 	e.watchdog("SetNetworkMap", func() { e.wrap.SetNetworkMap(nm) })

@@ -7,7 +7,6 @@ package wgengine
 import (
 	"errors"
 	"net/netip"
-	"time"
 
 	"tailscale.com/ipn/ipnstate"
 	"tailscale.com/net/dns"
@@ -24,19 +23,12 @@ import (
 // Status is the Engine status.
 //
 // TODO(bradfitz): remove this, subset of ipnstate? Need to migrate users.
+// TODO(bradfitz): at least view-ify this?
 type Status struct {
-	AsOf       time.Time // the time at which the status was calculated
 	Peers      []ipnstate.PeerStatusLite
-	LocalAddrs []tailcfg.Endpoint // the set of possible endpoints for the magic conn
 	DERPs      int // number of active DERP connections
 }
 
-// StatusCallback is the type of status callbacks used by
-// Engine.SetStatusCallback.
-//
-// Exactly one of Status or error is non-nil.
-type StatusCallback func(*Status, error)
-
 // NetworkMapCallback is the type used by callbacks that hook
 // into network map updates.
 type NetworkMapCallback func(*netmap.NetworkMap)
@@ -93,13 +85,12 @@ type Engine interface {
 	// SetJailedFilter updates the packet filter for jailed nodes.
 	SetJailedFilter(*filter.Filter)
 
-	// SetStatusCallback sets the function to call when the
-	// WireGuard status changes.
-	SetStatusCallback(StatusCallback)
+	// GetStatus returns the current Engine status.
+	GetStatus() *Status
 
-	// RequestStatus requests a WireGuard status update right
-	// away, sent to the callback registered via SetStatusCallback.
-	RequestStatus()
+	// NumConfiguredPeers returns the number of currently configured peers,
+	// regardless of activity.
+	NumConfiguredPeers() int
 
 	// PeerByKey returns the WireGuard status of the provided peer.
 	// If the peer is not found, ok is false.