diff --git a/cmd/k8s-operator/depaware.txt b/cmd/k8s-operator/depaware.txt
index b800b78c6..344369979 100644
--- a/cmd/k8s-operator/depaware.txt
+++ b/cmd/k8s-operator/depaware.txt
@@ -845,6 +845,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/client/local+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/cmd/k8s-operator+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+
diff --git a/cmd/tailscaled/depaware-min.txt b/cmd/tailscaled/depaware-min.txt
index e750f86e6..9f70ecb59 100644
--- a/cmd/tailscaled/depaware-min.txt
+++ b/cmd/tailscaled/depaware-min.txt
@@ -141,6 +141,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/control/controlclient+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/control/controlclient+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+
diff --git a/cmd/tailscaled/depaware-minbox.txt b/cmd/tailscaled/depaware-minbox.txt
index 17f1a22b2..3f2c8c966 100644
--- a/cmd/tailscaled/depaware-minbox.txt
+++ b/cmd/tailscaled/depaware-minbox.txt
@@ -168,6 +168,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/control/controlclient+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/control/controlclient+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+
diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt
index 1b5bdab91..4f1797832 100644
--- a/cmd/tailscaled/depaware.txt
+++ b/cmd/tailscaled/depaware.txt
@@ -413,6 +413,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/tka+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/ipn/ipnlocal+
 tailscale.com/util/backoff from tailscale.com/cmd/tailscaled+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+
diff --git a/cmd/tsidp/depaware.txt b/cmd/tsidp/depaware.txt
index 21ca122c4..d053bafdf 100644
--- a/cmd/tsidp/depaware.txt
+++ b/cmd/tsidp/depaware.txt
@@ -250,6 +250,7 @@ tailscale.com/cmd/tsidp dependencies: (generated by github.com/tailscale/depawar
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/client/local+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/control/controlclient+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+
diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go
index f0a77531b..0b065e7cb 100644
--- a/ipn/ipnlocal/local.go
+++ b/ipn/ipnlocal/local.go
@@ -81,6 +81,7 @@ import (
     "tailscale.com/types/persist"
     "tailscale.com/types/preftype"
     "tailscale.com/types/ptr"
+    "tailscale.com/types/topics"
     "tailscale.com/types/views"
     "tailscale.com/util/checkchange"
     "tailscale.com/util/clientmetric"
@@ -287,7 +288,6 @@ type LocalBackend struct {
     hostinfo      *tailcfg.Hostinfo      // TODO(nickkhyl): move to nodeBackend
     nmExpiryTimer tstime.TimerController // for updating netMap on node expiry; can be nil; TODO(nickkhyl): move to nodeBackend
     activeLogin   string                 // last logged LoginName from netMap; TODO(nickkhyl): move to nodeBackend (or remove? it's in [ipn.LoginProfile]).
-    engineStatus  ipn.EngineStatus
     endpoints     []tailcfg.Endpoint
     blocked       bool
     keyExpired    bool // TODO(nickkhyl): move to nodeBackend
@@ -300,10 +300,11 @@ type LocalBackend struct {
     peerAPIListeners  []*peerAPIListener
     loginFlags        controlclient.LoginFlags
     notifyWatchers    map[string]*watchSession // by session ID
-    lastStatusTime    time.Time                // status.AsOf value of the last processed status update
    componentLogUntil map[string]componentLogState
     currentUser       ipnauth.Actor

+    liveDERPs int // number of live DERP connections, per eventbus notification
+
     // capForcedNetfilter is the netfilter that control instructs Linux clients
     // to use, unless overridden locally.
     capForcedNetfilter string // TODO(nickkhyl): move to nodeBackend
@@ -558,8 +559,6 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
     b.setTCPPortsIntercepted(nil)

-    b.e.SetStatusCallback(b.setWgengineStatus)
-
     b.prevIfState = netMon.InterfaceState()
     // Call our linkChange code once with the current state.
     // Following changes are triggered via the eventbus.
@@ -590,6 +589,8 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
     ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend")
     b.eventClient = ec
     eventbus.SubscribeFunc(ec, b.onClientVersion)
+    eventbus.SubscribeFunc(ec, b.onEndpointsChange)
+    eventbus.SubscribeFunc(ec, b.onDERPConnChange)
     eventbus.SubscribeFunc(ec, func(au controlclient.AutoUpdate) {
         b.onTailnetDefaultAutoUpdate(au.Value)
     })
@@ -2251,65 +2252,25 @@ func (b *LocalBackend) resolveExitNodeIPLocked(prefs *ipn.Prefs) (prefsChanged b
     return prefsChanged
 }

-// setWgengineStatus is the callback by the wireguard engine whenever it posts a new status.
-// This updates the endpoints both in the backend and in the control client.
-func (b *LocalBackend) setWgengineStatus(s *wgengine.Status, err error) {
-    if err != nil {
-        b.logf("wgengine status error: %v", err)
-        return
-    }
-    if s == nil {
-        b.logf("[unexpected] non-error wgengine update with status=nil: %v", s)
-        return
-    }
-
+func (b *LocalBackend) onEndpointsChange(eps topics.EndpointsChanged) {
     b.mu.Lock()
     defer b.mu.Unlock()
-    // For now, only check this in the callback, but don't check it in setWgengineStatusLocked
-    if s.AsOf.Before(b.lastStatusTime) {
-        // Don't process a status update that is older than the one we have
-        // already processed. (corp#2579)
-        return
-    }
-    b.lastStatusTime = s.AsOf
-
-    b.setWgengineStatusLocked(s)
-}
-
-// setWgengineStatusLocked updates LocalBackend's view of the engine status and
-// updates the endpoints both in the backend and in the control client.
-//
-// Unlike setWgengineStatus it does not discard out-of-order updates, so
-// statuses sent here are always processed. This is useful for ensuring we don't
-// miss a "we shut down" status during backend shutdown even if other statuses
-// arrive out of order.
-//
-// TODO(zofrex): we should ensure updates actually do arrive in order and move
-// the out-of-order check into this function.
-//
-// b.mu must be held.
-func (b *LocalBackend) setWgengineStatusLocked(s *wgengine.Status) {
-    es := b.parseWgStatusLocked(s)
     cc := b.cc
-
-    // TODO(zofrex): the only reason we even write this is to transition from
-    // "Starting" to "Running" in the call to state machine a few lines below
-    // this. Maybe we don't even need to store it at all.
-    b.engineStatus = es
-
-    needUpdateEndpoints := !slices.Equal(s.LocalAddrs, b.endpoints)
-    if needUpdateEndpoints {
-        b.endpoints = append([]tailcfg.Endpoint{}, s.LocalAddrs...)
+    if cc != nil {
+        cc.UpdateEndpoints(eps)
+        b.stateMachineLocked()
+        b.endpoints = append([]tailcfg.Endpoint{}, eps...)
     }
+}

-    if cc != nil {
-        if needUpdateEndpoints {
-            cc.UpdateEndpoints(s.LocalAddrs)
-        }
+func (b *LocalBackend) onDERPConnChange(c topics.DERPConnChange) {
+    b.mu.Lock()
+    defer b.mu.Unlock()
+    b.liveDERPs = c.LiveDERPs
+    if b.state == ipn.Starting {
         b.stateMachineLocked()
     }
-    b.sendLocked(ipn.Notify{Engine: &es})
 }

 // SetNotifyCallback sets the function to call when the backend has something to
@@ -3214,15 +3175,27 @@ func appendHealthActions(fn func(roNotify *ipn.Notify) (keepGoing bool)) func(*i
     }
 }

-// pollRequestEngineStatus calls b.e.RequestStatus every 2 seconds until ctx
-// is done.
+// pollRequestEngineStatus polls b.e.GetStatus every 2 seconds until ctx is
+// done.
+//
+// TODO(bradfitz): this is all too heavy and doesn't scale with large numbers of
+// clients. See tailscale/tailscale#1909 and tailscale/tailscale#13392.
 func (b *LocalBackend) pollRequestEngineStatus(ctx context.Context) {
     ticker, tickerChannel := b.clock.NewTicker(2 * time.Second)
     defer ticker.Stop()
+    var last *wgengine.Status
     for {
         select {
         case <-tickerChannel:
-            b.e.RequestStatus()
+            st := b.e.GetStatus()
+            if reflect.DeepEqual(last, st) {
+                continue
+            }
+            last = st
+            b.mu.Lock()
+            stBusForm := b.parseWgStatusLocked(st)
+            b.mu.Unlock()
+            b.send(ipn.Notify{Engine: &stBusForm})
         case <-ctx.Done():
             return
         }
@@ -5660,8 +5633,6 @@ func (b *LocalBackend) enterStateLocked(newState ipn.State) {
         }
     case ipn.Starting, ipn.NeedsMachineAuth:
         b.authReconfigLocked()
-        // Needed so that UpdateEndpoints can run
-        b.goTracker.Go(b.e.RequestStatus)
     case ipn.Running:
         if feature.CanSystemdStatus {
             var addrStrs []string
@@ -5703,7 +5674,6 @@ func (b *LocalBackend) nextStateLocked() ipn.State {
         netMap     = cn.NetMap()
         state      = b.state
         blocked    = b.blocked
-        st         = b.engineStatus
         keyExpired = b.keyExpired

         wantRunning = false
@@ -5754,7 +5724,7 @@ func (b *LocalBackend) nextStateLocked() ipn.State {
         // (if we get here, we know MachineAuthorized == true)
         return ipn.Starting
     case state == ipn.Starting:
-        if st.NumLive > 0 || st.LiveDERPs > 0 {
+        if b.e.NumConfiguredPeers() > 0 || b.liveDERPs > 0 {
             return ipn.Running
         } else {
             return state
@@ -5782,8 +5752,7 @@ func (b *LocalBackend) stateMachineLocked() {

 // b.mu must be held.
 func (b *LocalBackend) stopEngineAndWaitLocked() {
     b.logf("stopEngineAndWait...")
-    st, _ := b.e.ResetAndStop() // TODO: what should we do if this returns an error?
-    b.setWgengineStatusLocked(st)
+    b.e.ResetAndStop()
     b.logf("stopEngineAndWait: done.")
 }
diff --git a/ipn/ipnlocal/state_test.go b/ipn/ipnlocal/state_test.go
index 2197112b2..54fb9402b 100644
--- a/ipn/ipnlocal/state_test.go
+++ b/ipn/ipnlocal/state_test.go
@@ -39,6 +39,7 @@ import (
     "tailscale.com/types/netmap"
     "tailscale.com/types/persist"
     "tailscale.com/types/preftype"
+    "tailscale.com/types/topics"
     "tailscale.com/util/dnsname"
     "tailscale.com/util/eventbus/eventbustest"
     "tailscale.com/util/mak"
@@ -1004,7 +1005,7 @@ func runTestStateMachine(t *testing.T, seamless bool) {
     }
     notifies.expect(1)
     // Fake a DERP connection.
-    b.setWgengineStatus(&wgengine.Status{DERPs: 1, AsOf: time.Now()}, nil)
+    b.onDERPConnChange(topics.DERPConnChange{RegionID: 1, Connected: true, LiveDERPs: 1})
     {
         nn := notifies.drain(1)
         cc.assertCalls()
@@ -1144,11 +1145,11 @@ func TestWGEngineStatusRace(t *testing.T) {
         wg.Add(1)
         go func(i int) {
             defer wg.Done()
-            n := 0
             if i == 0 {
-                n = 1
+                b.onDERPConnChange(topics.DERPConnChange{RegionID: 1, Connected: true, LiveDERPs: 1})
+            } else {
+                b.onDERPConnChange(topics.DERPConnChange{RegionID: 1, Connected: false, LiveDERPs: 0})
             }
-            b.setWgengineStatus(&wgengine.Status{AsOf: time.Now(), DERPs: n}, nil)
         }(i)
     }
     wg.Wait()
@@ -1615,7 +1616,7 @@ func runTestSendPreservesAuthURL(t *testing.T, seamless bool) {
     }})

     t.Logf("Running")
-    b.setWgengineStatus(&wgengine.Status{AsOf: time.Now(), DERPs: 1}, nil)
+    b.onDERPConnChange(topics.DERPConnChange{RegionID: 1, Connected: true, LiveDERPs: 1})

     t.Logf("Re-auth (StartLoginInteractive)")
     b.StartLoginInteractive(t.Context())
@@ -1781,10 +1782,9 @@ type mockEngine struct {
     cfg       *wgcfg.Config
     routerCfg *router.Config
     dnsCfg    *dns.Config
+    status    *wgengine.Status

     filter, jailedFilter *filter.Filter
-
-    statusCb wgengine.StatusCallback
 }

 func newMockEngine() *mockEngine {
@@ -1805,6 +1805,24 @@ func (e *mockEngine) Reconfig(cfg *wgcfg.Config, routerCfg *router.Config, dnsCf
     return nil
 }

+func (e *mockEngine) GetStatus() *wgengine.Status {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    if e.status == nil {
+        return &wgengine.Status{}
+    }
+    return e.status
+}
+
+func (e *mockEngine) NumConfiguredPeers() int {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    if e.status == nil {
+        return 0
+    }
+    return len(e.status.Peers)
+}
+
 func (e *mockEngine) Config() *wgcfg.Config {
     e.mu.Lock()
     defer e.mu.Unlock()
@@ -1851,27 +1869,12 @@ func (e *mockEngine) SetJailedFilter(f *filter.Filter) {
     e.mu.Unlock()
 }

-func (e *mockEngine) SetStatusCallback(cb wgengine.StatusCallback) {
-    e.mu.Lock()
-    e.statusCb = cb
-    e.mu.Unlock()
-}
-
-func (e *mockEngine) RequestStatus() {
-    e.mu.Lock()
-    cb := e.statusCb
-    e.mu.Unlock()
-    if cb != nil {
-        cb(&wgengine.Status{AsOf: time.Now()}, nil)
-    }
-}
-
 func (e *mockEngine) ResetAndStop() (*wgengine.Status, error) {
     err := e.Reconfig(&wgcfg.Config{}, &router.Config{}, &dns.Config{})
     if err != nil {
         return nil, err
     }
-    return &wgengine.Status{AsOf: time.Now()}, nil
+    return &wgengine.Status{}, nil
 }

 func (e *mockEngine) PeerByKey(key.NodePublic) (_ wgint.Peer, ok bool) {
diff --git a/tsnet/depaware.txt b/tsnet/depaware.txt
index cf91aa483..692c126af 100644
--- a/tsnet/depaware.txt
+++ b/tsnet/depaware.txt
@@ -245,6 +245,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware)
 tailscale.com/types/result from tailscale.com/util/lineiter
 tailscale.com/types/structs from tailscale.com/control/controlclient+
 tailscale.com/types/tkatype from tailscale.com/client/local+
+tailscale.com/types/topics from tailscale.com/ipn/ipnlocal+
 tailscale.com/types/views from tailscale.com/appc+
 tailscale.com/util/backoff from tailscale.com/control/controlclient+
 tailscale.com/util/checkchange from tailscale.com/ipn/ipnlocal+
diff --git a/types/topics/topics.go b/types/topics/topics.go
new file mode 100644
index 000000000..4080ad53b
--- /dev/null
+++ b/types/topics/topics.go
@@ -0,0 +1,31 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+// Package topics defines event types used with the eventbus.
+package topics
+
+import (
+    "tailscale.com/tailcfg"
+    "tailscale.com/types/key"
+)
+
+// DERPConnChange is published when the set of DERP connections changes.
+type DERPConnChange struct {
+    RegionID  int  // DERP region ID
+    Connected bool // true for connected, false for disconnected
+    LiveDERPs int  // total number of live DERP connections after this change
+}
+
+// EndpointsChanged is published when magicsock's endpoints change.
+type EndpointsChanged []tailcfg.Endpoint
+
+// TUNStatusChange is published when the TUN device goes up or down.
+type TUNStatusChange struct {
+    Up bool // true if TUN is up, false if down
+}
+
+// PeerRecvActivity is published when a packet is received from a peer. It is
+// published no more than once every 10 seconds per peer.
+type PeerRecvActivity struct {
+    PeerKey key.NodePublic
+}
diff --git a/wgengine/magicsock/derp.go b/wgengine/magicsock/derp.go
index 37a4f1a64..8f44d3977 100644
--- a/wgengine/magicsock/derp.go
+++ b/wgengine/magicsock/derp.go
@@ -27,6 +27,7 @@ import (
     "tailscale.com/tstime/mono"
     "tailscale.com/types/key"
     "tailscale.com/types/logger"
+    "tailscale.com/types/topics"
     "tailscale.com/util/backoff"
     "tailscale.com/util/mak"
     "tailscale.com/util/rands"
@@ -396,6 +397,11 @@ func (c *Conn) derpWriteChanForRegion(regionID int, peer key.NodePublic) chan de
     *ad.lastWrite = time.Now()
     ad.createTime = time.Now()
     c.activeDerp[regionID] = ad
+    c.derpConnChangePub.Publish(topics.DERPConnChange{
+        RegionID:  regionID,
+        Connected: true,
+        LiveDERPs: len(c.activeDerp),
+    })
     metricNumDERPConns.Set(int64(len(c.activeDerp)))
     c.logActiveDerpLocked()
     c.setPeerLastDerpLocked(peer, regionID, regionID)
@@ -424,8 +430,6 @@ func (c *Conn) derpWriteChanForRegion(regionID int, peer key.NodePublic) chan de
     go c.runDerpReader(ctx, regionID, dc, wg, startGate)
     go c.runDerpWriter(ctx, dc, ch, wg, startGate)

-    go c.derpActiveFunc()
-
     return ad.writeCh
 }
@@ -874,6 +878,11 @@ func (c *Conn) closeDerpLocked(regionID int, why string) {
         go ad.c.Close()
         ad.cancel()
         delete(c.activeDerp, regionID)
+        c.derpConnChangePub.Publish(topics.DERPConnChange{
+            RegionID:  regionID,
+            Connected: false,
+            LiveDERPs: len(c.activeDerp),
+        })
         metricNumDERPConns.Set(int64(len(c.activeDerp)))
     }
 }
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index d44cf1c11..00ea623a2 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -58,6 +58,7 @@ import (
     "tailscale.com/types/netlogfunc"
     "tailscale.com/types/netmap"
     "tailscale.com/types/nettype"
+    "tailscale.com/types/topics"
     "tailscale.com/types/views"
     "tailscale.com/util/clientmetric"
     "tailscale.com/util/eventbus"
@@ -158,8 +159,6 @@ type Conn struct {
     eventBus    *eventbus.Bus
     eventClient *eventbus.Client
     logf        logger.Logf
-    epFunc         func([]tailcfg.Endpoint)
-    derpActiveFunc func()
     idleFunc               func() time.Duration // nil means unknown
     testOnlyPacketListener nettype.PacketListener
     noteRecvActivity       func(key.NodePublic) // or nil, see Options.NoteRecvActivity
@@ -181,6 +180,8 @@ type Conn struct {
     syncPub               *eventbus.Publisher[syncPoint]
     allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq]
     portUpdatePub         *eventbus.Publisher[router.PortUpdate]
+    derpConnChangePub     *eventbus.Publisher[topics.DERPConnChange]
+    epChangePub           *eventbus.Publisher[topics.EndpointsChanged]

     // pconn4 and pconn6 are the underlying UDP sockets used to
     // send/receive packets for wireguard and other magicsock
@@ -446,14 +447,6 @@ type Options struct {
     // Zero means to pick one automatically.
     Port uint16

-    // EndpointsFunc optionally provides a func to be called when
-    // endpoints change. The called func does not own the slice.
-    EndpointsFunc func([]tailcfg.Endpoint)
-
-    // DERPActiveFunc optionally provides a func to be called when
-    // a connection is made to a DERP server.
-    DERPActiveFunc func()
-
     // IdleFunc optionally provides a func to return how long
     // it's been since a TUN packet was sent or received.
     IdleFunc func() time.Duration
@@ -507,20 +500,6 @@ func (o *Options) logf() logger.Logf {
     return o.Logf
 }

-func (o *Options) endpointsFunc() func([]tailcfg.Endpoint) {
-    if o == nil || o.EndpointsFunc == nil {
-        return func([]tailcfg.Endpoint) {}
-    }
-    return o.EndpointsFunc
-}
-
-func (o *Options) derpActiveFunc() func() {
-    if o == nil || o.DERPActiveFunc == nil {
-        return func() {}
-    }
-    return o.DERPActiveFunc
-}
-
 // NodeViewsUpdate represents an update event of [tailcfg.NodeView] for all
 // nodes. This event is published over an [eventbus.Bus]. It may be published
 // with an invalid SelfNode, and/or zero/nil Peers. [magicsock.Conn] is the sole
@@ -686,8 +665,6 @@ func NewConn(opts Options) (*Conn, error) {
     c.eventBus = opts.EventBus
     c.port.Store(uint32(opts.Port))
     c.controlKnobs = opts.ControlKnobs
-    c.epFunc = opts.endpointsFunc()
-    c.derpActiveFunc = opts.derpActiveFunc()
     c.idleFunc = opts.IdleFunc
     c.testOnlyPacketListener = opts.TestOnlyPacketListener
     c.noteRecvActivity = opts.NoteRecvActivity
@@ -699,6 +676,8 @@ func NewConn(opts Options) (*Conn, error) {
     c.syncPub = eventbus.Publish[syncPoint](ec)
     c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec)
     c.portUpdatePub = eventbus.Publish[router.PortUpdate](ec)
+    c.derpConnChangePub = eventbus.Publish[topics.DERPConnChange](ec)
+    c.epChangePub = eventbus.Publish[topics.EndpointsChanged](ec)
     eventbus.SubscribeFunc(ec, c.onPortMapChanged)
     eventbus.SubscribeFunc(ec, c.onFilterUpdate)
     eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate)
@@ -973,7 +952,7 @@ func (c *Conn) updateEndpoints(why string) {

     if c.setEndpoints(endpoints) {
         c.logEndpointChange(endpoints)
-        c.epFunc(endpoints)
+        c.epChangePub.Publish(topics.EndpointsChanged(endpoints))
     }
 }
diff --git a/wgengine/userspace.go b/wgengine/userspace.go
index 8ad771fc5..9d7449336 100644
--- a/wgengine/userspace.go
+++ b/wgengine/userspace.go
@@ -15,7 +15,6 @@ import (
     "net/netip"
     "reflect"
     "runtime"
-    "slices"
     "strings"
     "sync"
     "time"
@@ -46,6 +45,7 @@ import (
     "tailscale.com/types/key"
     "tailscale.com/types/logger"
     "tailscale.com/types/netmap"
+    "tailscale.com/types/topics"
     "tailscale.com/types/views"
     "tailscale.com/util/backoff"
     "tailscale.com/util/checkchange"
@@ -95,8 +95,10 @@ const networkLoggerUploadTimeout = 5 * time.Second

 type userspaceEngine struct {
     // eventBus will eventually become required, but for now may be nil.
-    eventBus    *eventbus.Bus
-    eventClient *eventbus.Client
+    eventBus            *eventbus.Bus
+    eventClient         *eventbus.Client
+    tunStatusPub        *eventbus.Publisher[topics.TUNStatusChange]
+    peerRecvActivityPub *eventbus.Publisher[topics.PeerRecvActivity]

     logf     logger.Logf
     wgLogger *wglog.Logger // a wireguard-go logging wrapper
@@ -142,13 +144,12 @@ type userspaceEngine struct {
     lastStatusPollTime mono.Time    // last time we polled the engine status
     reconfigureVPN     func() error // or nil

-    mu             sync.Mutex         // guards following; see lock order comment below
-    netMap         *netmap.NetworkMap // or nil
-    closing        bool               // Close was called (even if we're still closing)
-    statusCallback StatusCallback
-    peerSequence   views.Slice[key.NodePublic]
-    endpoints      []tailcfg.Endpoint
-    pendOpen       map[flowtrackTuple]*pendingOpenFlow // see pendopen.go
+    mu           sync.Mutex         // guards following; see lock order comment below
+    netMap       *netmap.NetworkMap // or nil
+    closing      bool               // Close was called (even if we're still closing)
+    peerSequence views.Slice[key.NodePublic]
+    endpoints    []tailcfg.Endpoint
+    pendOpen     map[flowtrackTuple]*pendingOpenFlow // see pendopen.go

     // pongCallback is the map of response handlers waiting for disco or TSMP
     // pong callbacks. The map key is a random slice of bytes.
@@ -391,25 +392,16 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)

     logf("link state: %+v", e.netMon.InterfaceState())

-    endpointsFn := func(endpoints []tailcfg.Endpoint) {
-        e.mu.Lock()
-        e.endpoints = append(e.endpoints[:0], endpoints...)
-        e.mu.Unlock()
-
-        e.RequestStatus()
-    }
     magicsockOpts := magicsock.Options{
-        EventBus:       e.eventBus,
-        Logf:           logf,
-        Port:           conf.ListenPort,
-        EndpointsFunc:  endpointsFn,
-        DERPActiveFunc: e.RequestStatus,
-        IdleFunc:       e.tundev.IdleDuration,
-        NetMon:         e.netMon,
-        HealthTracker:  e.health,
-        Metrics:        conf.Metrics,
-        ControlKnobs:   conf.ControlKnobs,
-        PeerByKeyFunc:  e.PeerByKey,
+        EventBus:      e.eventBus,
+        Logf:          logf,
+        Port:          conf.ListenPort,
+        IdleFunc:      e.tundev.IdleDuration,
+        NetMon:        e.netMon,
+        HealthTracker: e.health,
+        Metrics:       conf.Metrics,
+        ControlKnobs:  conf.ControlKnobs,
+        PeerByKeyFunc: e.PeerByKey,
     }
     if buildfeatures.HasLazyWG {
         magicsockOpts.NoteRecvActivity = e.noteRecvActivity
@@ -476,22 +468,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
         }
     })

-    go func() {
-        up := false
-        for event := range e.tundev.EventsUpDown() {
-            if event&tun.EventUp != 0 && !up {
-                e.logf("external route: up")
-                e.RequestStatus()
-                up = true
-            }
-            if event&tun.EventDown != 0 && up {
-                e.logf("external route: down")
-                e.RequestStatus()
-                up = false
-            }
-        }
-    }()
-
     go func() {
         select {
         case <-e.wgdev.Wait():
@@ -547,10 +523,29 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
         e.linkChange(&cd)
     })
     e.eventClient = ec
+    e.tunStatusPub = eventbus.Publish[topics.TUNStatusChange](ec)
+    e.peerRecvActivityPub = eventbus.Publish[topics.PeerRecvActivity](ec)
+    go e.publishTUNStatusLoop()
     e.logf("Engine created.")
     return e, nil
 }

+func (e *userspaceEngine) publishTUNStatusLoop() {
+    up := false
+    for event := range e.tundev.EventsUpDown() {
+        if event&tun.EventUp != 0 && !up {
+            e.logf("external route: up")
+            e.tunStatusPub.Publish(topics.TUNStatusChange{Up: true})
+            up = true
+        }
+        if event&tun.EventDown != 0 && up {
+            e.logf("external route: down")
+            e.tunStatusPub.Publish(topics.TUNStatusChange{Up: false})
+            up = false
+        }
+    }
+}
+
 // echoRespondToAll is an inbound post-filter responding to all echo requests.
 func echoRespondToAll(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) {
     if p.IsEchoRequest() {
@@ -673,7 +668,7 @@ func (e *userspaceEngine) noteRecvActivity(nk key.NodePublic) {
     // tailscaled alone did not, hence this.
     if e.lastStatusPollTime.IsZero() || now.Sub(e.lastStatusPollTime) >= statusPollInterval {
         e.lastStatusPollTime = now
-        go e.RequestStatus()
+        e.peerRecvActivityPub.Publish(topics.PeerRecvActivity{PeerKey: nk})
     }

     // If the last activity time jumped a bunch (say, at least
@@ -940,10 +935,7 @@ func (e *userspaceEngine) ResetAndStop() (*Status, error) {
     }
     bo := backoff.NewBackoff("UserspaceEngineResetAndStop", e.logf, 1*time.Second)
     for {
-        st, err := e.getStatus()
-        if err != nil {
-            return nil, err
-        }
+        st := e.GetStatus()
         if len(st.Peers) == 0 && st.DERPs == 0 {
             return st, nil
         }
@@ -1180,18 +1172,6 @@ func (e *userspaceEngine) SetJailedFilter(filt *filter.Filter) {
     e.tundev.SetJailedFilter(filt)
 }

-func (e *userspaceEngine) SetStatusCallback(cb StatusCallback) {
-    e.mu.Lock()
-    defer e.mu.Unlock()
-    e.statusCallback = cb
-}
-
-func (e *userspaceEngine) getStatusCallback() StatusCallback {
-    e.mu.Lock()
-    defer e.mu.Unlock()
-    return e.statusCallback
-}
-
 var ErrEngineClosing = errors.New("engine closing; no status")

 func (e *userspaceEngine) PeerByKey(pubKey key.NodePublic) (_ wgint.Peer, ok bool) {
@@ -1221,7 +1201,13 @@ func (e *userspaceEngine) getPeerStatusLite(pk key.NodePublic) (status ipnstate.
     return status, true
 }

-func (e *userspaceEngine) getStatus() (*Status, error) {
+func (e *userspaceEngine) NumConfiguredPeers() int {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    return e.peerSequence.Len()
+}
+
+func (e *userspaceEngine) GetStatus() *Status {
     // Grab derpConns before acquiring wgLock to not violate lock ordering;
     // the DERPs method acquires magicsock.Conn.mu.
     // (See comment in userspaceEngine's declaration.)
@@ -1230,60 +1216,22 @@ func (e *userspaceEngine) getStatus() (*Status, error) {
     e.mu.Lock()
     closing := e.closing
     peerKeys := e.peerSequence
-    localAddrs := slices.Clone(e.endpoints)
     e.mu.Unlock()

     if closing {
-        return nil, ErrEngineClosing
+        return new(Status)
     }

-    peers := make([]ipnstate.PeerStatusLite, 0, peerKeys.Len())
+    st := &Status{
+        DERPs: derpConns,
+        Peers: make([]ipnstate.PeerStatusLite, 0, peerKeys.Len()),
+    }
     for _, key := range peerKeys.All() {
         if status, ok := e.getPeerStatusLite(key); ok {
-            peers = append(peers, status)
-        }
-    }
-
-    return &Status{
-        AsOf:       time.Now(),
-        LocalAddrs: localAddrs,
-        Peers:      peers,
-        DERPs:      derpConns,
-    }, nil
-}
-
-func (e *userspaceEngine) RequestStatus() {
-    // This is slightly tricky. e.getStatus() can theoretically get
-    // blocked inside wireguard for a while, and RequestStatus() is
-    // sometimes called from a goroutine, so we don't want a lot of
-    // them hanging around. On the other hand, requesting multiple
-    // status updates simultaneously is pointless anyway; they will
-    // all say the same thing.
-
-    // Enqueue at most one request. If one is in progress already, this
-    // adds one more to the queue. If one has been requested but not
-    // started, it is a no-op.
-    select {
-    case e.reqCh <- struct{}{}:
-    default:
-    }
-
-    // Dequeue at most one request. Another thread may have already
-    // dequeued the request we enqueued above, which is fine, since the
-    // information is guaranteed to be at least as recent as the current
-    // call to RequestStatus().
-    select {
-    case <-e.reqCh:
-        s, err := e.getStatus()
-        if s == nil && err == nil {
-            e.logf("[unexpected] RequestStatus: both s and err are nil")
-            return
-        }
-        if cb := e.getStatusCallback(); cb != nil {
-            cb(s, err)
+            st.Peers = append(st.Peers, status)
         }
-    default:
     }
+    return st
 }

 func (e *userspaceEngine) Close() {
@@ -1388,11 +1336,7 @@ func (e *userspaceEngine) SetNetworkMap(nm *netmap.NetworkMap) {
 }

 func (e *userspaceEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {
-    st, err := e.getStatus()
-    if err != nil {
-        e.logf("wgengine: getStatus: %v", err)
-        return
-    }
+    st := e.GetStatus()
     if sb.WantPeers {
         for _, ps := range st.Peers {
             sb.AddPeer(ps.NodeKey, &ipnstate.PeerStatus{
diff --git a/wgengine/watchdog.go b/wgengine/watchdog.go
index 9cc4ed3b5..e3e9a024c 100644
--- a/wgengine/watchdog.go
+++ b/wgengine/watchdog.go
@@ -142,14 +142,16 @@ func (e *watchdogEngine) GetJailedFilter() *filter.Filter {
 func (e *watchdogEngine) SetJailedFilter(filt *filter.Filter) {
     e.watchdog("SetJailedFilter", func() { e.wrap.SetJailedFilter(filt) })
 }
-func (e *watchdogEngine) SetStatusCallback(cb StatusCallback) {
-    e.watchdog("SetStatusCallback", func() { e.wrap.SetStatusCallback(cb) })
-}
 func (e *watchdogEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {
     e.watchdog("UpdateStatus", func() { e.wrap.UpdateStatus(sb) })
 }
-func (e *watchdogEngine) RequestStatus() {
-    e.watchdog("RequestStatus", func() { e.wrap.RequestStatus() })
+func (e *watchdogEngine) GetStatus() (st *Status) {
+    e.watchdog("GetStatus", func() { st = e.wrap.GetStatus() })
+    return st
+}
+func (e *watchdogEngine) NumConfiguredPeers() (n int) {
+    e.watchdog("NumConfiguredPeers", func() { n = e.wrap.NumConfiguredPeers() })
+    return n
 }
 func (e *watchdogEngine) SetNetworkMap(nm *netmap.NetworkMap) {
     e.watchdog("SetNetworkMap", func() { e.wrap.SetNetworkMap(nm) })
 }
diff --git a/wgengine/wgengine.go b/wgengine/wgengine.go
index be7873147..04de2bed2 100644
--- a/wgengine/wgengine.go
+++ b/wgengine/wgengine.go
@@ -7,7 +7,6 @@ package wgengine
 import (
     "errors"
     "net/netip"
-    "time"

     "tailscale.com/ipn/ipnstate"
     "tailscale.com/net/dns"
@@ -24,19 +23,12 @@ import (
 // Status is the Engine status.
 //
 // TODO(bradfitz): remove this, subset of ipnstate? Need to migrate users.
+// TODO(bradfitz): at least view-ify this?
 type Status struct {
-    AsOf       time.Time // the time at which the status was calculated
-    Peers      []ipnstate.PeerStatusLite
-    LocalAddrs []tailcfg.Endpoint // the set of possible endpoints for the magic conn
-    DERPs      int                // number of active DERP connections
+    Peers []ipnstate.PeerStatusLite
+    DERPs int // number of active DERP connections
 }

-// StatusCallback is the type of status callbacks used by
-// Engine.SetStatusCallback.
-//
-// Exactly one of Status or error is non-nil.
-type StatusCallback func(*Status, error)
-
 // NetworkMapCallback is the type used by callbacks that hook
 // into network map updates.
 type NetworkMapCallback func(*netmap.NetworkMap)
@@ -93,13 +85,12 @@ type Engine interface {
     // SetJailedFilter updates the packet filter for jailed nodes.
     SetJailedFilter(*filter.Filter)

-    // SetStatusCallback sets the function to call when the
-    // WireGuard status changes.
-    SetStatusCallback(StatusCallback)
+    // GetStatus returns the current Engine status.
+    GetStatus() *Status

-    // RequestStatus requests a WireGuard status update right
-    // away, sent to the callback registered via SetStatusCallback.
-    RequestStatus()
+    // NumConfiguredPeers returns the number of currently configured peers,
+    // regardless of activity.
+    NumConfiguredPeers() int

     // PeerByKey returns the WireGuard status of the provided peer.
     // If the peer is not found, ok is false.
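Not part of the patch above: a minimal sketch of how another component might consume the new `types/topics` events over the eventbus, mirroring the `eventbus.SubscribeFunc` wiring that `ipnlocal.LocalBackend` uses in this change. The `derpwatch` package, `Watcher` type, and client name are illustrative assumptions, not code in the tree.

```go
// Package derpwatch is a hypothetical consumer of the topics added above.
package derpwatch

import (
	"log"

	"tailscale.com/types/topics"
	"tailscale.com/util/eventbus"
)

// Watcher logs DERP connectivity and endpoint changes published by
// magicsock over the eventbus.
type Watcher struct {
	ec *eventbus.Client
}

// NewWatcher registers an eventbus client and subscribes to the
// DERPConnChange and EndpointsChanged topics, as LocalBackend does in
// this change. The client name is an assumption for the sketch.
func NewWatcher(bus *eventbus.Bus) *Watcher {
	w := &Watcher{ec: bus.Client("derpwatch.Watcher")}
	eventbus.SubscribeFunc(w.ec, w.onDERPConnChange)
	eventbus.SubscribeFunc(w.ec, w.onEndpointsChange)
	return w
}

func (w *Watcher) onDERPConnChange(c topics.DERPConnChange) {
	// LiveDERPs carries the total after this change, so no local counting needed.
	log.Printf("derp region %d connected=%v; %d live", c.RegionID, c.Connected, c.LiveDERPs)
}

func (w *Watcher) onEndpointsChange(eps topics.EndpointsChanged) {
	log.Printf("magicsock endpoints changed: %d endpoints", len(eps))
}
```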