control/controlclient: convert PeersChanged nodes to patches internally

So even if the server doesn't support sending patches (neither the
Tailscale control server nor Headscale yet do), this makes the client
convert a changed node to its diff so the diffs can be processed
individually in a follow-up change.

This lets us make progress on #1909 without adding a dependency on
finishing the server-side part, and also means other control servers
will get the same upcoming optimizations.

And add some clientmetrics while here.

Updates #1909

Change-Id: I9533bcb8bba5227e17389f0b10dff71f33ee54ec
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/9119/head
Brad Fitzpatrick 1 year ago committed by Brad Fitzpatrick
parent 67e48d9285
commit a79b1d23b8

@ -5,9 +5,15 @@ package controlclient
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"net"
"net/netip" "net/netip"
"reflect"
"slices"
"sort" "sort"
"strconv"
"sync"
"tailscale.com/envknob" "tailscale.com/envknob"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
@ -17,6 +23,7 @@ import (
"tailscale.com/types/netmap" "tailscale.com/types/netmap"
"tailscale.com/types/ptr" "tailscale.com/types/ptr"
"tailscale.com/types/views" "tailscale.com/types/views"
"tailscale.com/util/clientmetric"
"tailscale.com/util/cmpx" "tailscale.com/util/cmpx"
"tailscale.com/wgengine/filter" "tailscale.com/wgengine/filter"
) )
@ -183,6 +190,8 @@ func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *t
// Call Node.InitDisplayNames on any changed nodes. // Call Node.InitDisplayNames on any changed nodes.
initDisplayNames(cmpx.Or(resp.Node.View(), ms.lastNode), resp) initDisplayNames(cmpx.Or(resp.Node.View(), ms.lastNode), resp)
ms.patchifyPeersChanged(resp)
ms.updateStateFromResponse(resp) ms.updateStateFromResponse(resp)
nm := ms.netmap() nm := ms.netmap()
@ -278,6 +287,22 @@ func (ms *mapSession) updateStateFromResponse(resp *tailcfg.MapResponse) {
} }
} }
var (
patchDERPRegion = clientmetric.NewCounter("controlclient_patch_derp")
patchEndpoints = clientmetric.NewCounter("controlclient_patch_endpoints")
patchCap = clientmetric.NewCounter("controlclient_patch_capver")
patchKey = clientmetric.NewCounter("controlclient_patch_key")
patchDiscoKey = clientmetric.NewCounter("controlclient_patch_discokey")
patchOnline = clientmetric.NewCounter("controlclient_patch_online")
patchLastSeen = clientmetric.NewCounter("controlclient_patch_lastseen")
patchKeyExpiry = clientmetric.NewCounter("controlclient_patch_keyexpiry")
patchCapabilities = clientmetric.NewCounter("controlclient_patch_capabilities")
patchKeySignature = clientmetric.NewCounter("controlclient_patch_keysig")
patchifiedPeer = clientmetric.NewCounter("controlclient_patchified_peer")
patchifiedPeerEqual = clientmetric.NewCounter("controlclient_patchified_peer_equal")
)
// updatePeersStateFromResponseres updates ms.peers and ms.sortedPeers from res. It takes ownership of res. // updatePeersStateFromResponseres updates ms.peers and ms.sortedPeers from res. It takes ownership of res.
func (ms *mapSession) updatePeersStateFromResponse(resp *tailcfg.MapResponse) (stats updateStats) { func (ms *mapSession) updatePeersStateFromResponse(resp *tailcfg.MapResponse) (stats updateStats) {
defer func() { defer func() {
@ -362,33 +387,43 @@ func (ms *mapSession) updatePeersStateFromResponse(resp *tailcfg.MapResponse) (s
mut := vp.AsStruct() mut := vp.AsStruct()
if pc.DERPRegion != 0 { if pc.DERPRegion != 0 {
mut.DERP = fmt.Sprintf("%s:%v", tailcfg.DerpMagicIP, pc.DERPRegion) mut.DERP = fmt.Sprintf("%s:%v", tailcfg.DerpMagicIP, pc.DERPRegion)
patchDERPRegion.Add(1)
} }
if pc.Cap != 0 { if pc.Cap != 0 {
mut.Cap = pc.Cap mut.Cap = pc.Cap
patchCap.Add(1)
} }
if pc.Endpoints != nil { if pc.Endpoints != nil {
mut.Endpoints = pc.Endpoints mut.Endpoints = pc.Endpoints
patchEndpoints.Add(1)
} }
if pc.Key != nil { if pc.Key != nil {
mut.Key = *pc.Key mut.Key = *pc.Key
patchKey.Add(1)
} }
if pc.DiscoKey != nil { if pc.DiscoKey != nil {
mut.DiscoKey = *pc.DiscoKey mut.DiscoKey = *pc.DiscoKey
patchDiscoKey.Add(1)
} }
if v := pc.Online; v != nil { if v := pc.Online; v != nil {
mut.Online = ptr.To(*v) mut.Online = ptr.To(*v)
patchOnline.Add(1)
} }
if v := pc.LastSeen; v != nil { if v := pc.LastSeen; v != nil {
mut.LastSeen = ptr.To(*v) mut.LastSeen = ptr.To(*v)
patchLastSeen.Add(1)
} }
if v := pc.KeyExpiry; v != nil { if v := pc.KeyExpiry; v != nil {
mut.KeyExpiry = *v mut.KeyExpiry = *v
patchKeyExpiry.Add(1)
} }
if v := pc.Capabilities; v != nil { if v := pc.Capabilities; v != nil {
mut.Capabilities = *v mut.Capabilities = *v
patchCapabilities.Add(1)
} }
if v := pc.KeySignature; v != nil { if v := pc.KeySignature; v != nil {
mut.KeySignature = v mut.KeySignature = v
patchKeySignature.Add(1)
} }
*vp = mut.View() *vp = mut.View()
} }
@ -428,6 +463,217 @@ func (ms *mapSession) addUserProfile(nm *netmap.NetworkMap, userID tailcfg.UserI
} }
} }
var debugPatchifyPeer = envknob.RegisterBool("TS_DEBUG_PATCHIFY_PEER")
// patchifyPeersChanged mutates resp to promote PeersChanged entries to PeersChangedPatch
// when possible.
func (ms *mapSession) patchifyPeersChanged(resp *tailcfg.MapResponse) {
filtered := resp.PeersChanged[:0]
for _, n := range resp.PeersChanged {
if p, ok := ms.patchifyPeer(n); ok {
patchifiedPeer.Add(1)
if debugPatchifyPeer() {
patchj, _ := json.Marshal(p)
ms.logf("debug: patchifyPeer[ID=%v]: %s", n.ID, patchj)
}
if p != nil {
resp.PeersChangedPatch = append(resp.PeersChangedPatch, p)
} else {
patchifiedPeerEqual.Add(1)
}
} else {
filtered = append(filtered, n)
}
}
resp.PeersChanged = filtered
if len(resp.PeersChanged) == 0 {
resp.PeersChanged = nil
}
}
var nodeFields = sync.OnceValue(getNodeFields)
// getNodeFields returns the fails of tailcfg.Node.
func getNodeFields() []string {
rt := reflect.TypeOf((*tailcfg.Node)(nil)).Elem()
ret := make([]string, rt.NumField())
for i := 0; i < rt.NumField(); i++ {
ret[i] = rt.Field(i).Name
}
return ret
}
// patchifyPeer returns a *tailcfg.PeerChange of the session's existing copy of
// the n.ID Node to n.
//
// It returns ok=false if a patch can't be made, (V, ok) on a delta, or (nil,
// true) if all the fields were identical (a zero change).
func (ms *mapSession) patchifyPeer(n *tailcfg.Node) (_ *tailcfg.PeerChange, ok bool) {
was, ok := ms.peers[n.ID]
if !ok {
return nil, false
}
return peerChangeDiff(*was, n)
}
// peerChangeDiff returns the difference from 'was' to 'n', if possible.
//
// It returns (nil, true) if the fields were identical.
func peerChangeDiff(was tailcfg.NodeView, n *tailcfg.Node) (_ *tailcfg.PeerChange, ok bool) {
var ret *tailcfg.PeerChange
pc := func() *tailcfg.PeerChange {
if ret == nil {
ret = new(tailcfg.PeerChange)
}
return ret
}
for _, field := range nodeFields() {
switch field {
default:
// The whole point of using reflect in this function is to panic
// here in tests if we forget to handle a new field.
panic("unhandled field: " + field)
case "computedHostIfDifferent", "ComputedName", "ComputedNameWithHost":
// Caller's responsibility to have populated these.
continue
case "DataPlaneAuditLogID":
// Not sent for peers.
case "ID":
if was.ID() != n.ID {
return nil, false
}
case "StableID":
if was.StableID() != n.StableID {
return nil, false
}
case "Name":
if was.Name() != n.Name {
return nil, false
}
case "User":
if was.User() != n.User {
return nil, false
}
case "Sharer":
if was.Sharer() != n.Sharer {
return nil, false
}
case "Key":
if was.Key() != n.Key {
pc().Key = ptr.To(n.Key)
}
case "KeyExpiry":
if !was.KeyExpiry().Equal(n.KeyExpiry) {
pc().KeyExpiry = ptr.To(n.KeyExpiry)
}
case "KeySignature":
if !was.KeySignature().Equal(n.KeySignature) {
pc().KeySignature = slices.Clone(n.KeySignature)
}
case "Machine":
if was.Machine() != n.Machine {
return nil, false
}
case "DiscoKey":
if was.DiscoKey() != n.DiscoKey {
pc().DiscoKey = ptr.To(n.DiscoKey)
}
case "Addresses":
if !views.SliceEqual(was.Addresses(), views.SliceOf(n.Addresses)) {
return nil, false
}
case "AllowedIPs":
if !views.SliceEqual(was.AllowedIPs(), views.SliceOf(n.AllowedIPs)) {
return nil, false
}
case "Endpoints":
if !views.SliceEqual(was.Endpoints(), views.SliceOf(n.Endpoints)) {
pc().Endpoints = slices.Clone(n.Endpoints)
}
case "DERP":
if was.DERP() != n.DERP {
ip, portStr, err := net.SplitHostPort(n.DERP)
if err != nil || ip != "127.3.3.40" {
return nil, false
}
port, err := strconv.Atoi(portStr)
if err != nil || port < 1 || port > 65535 {
return nil, false
}
pc().DERPRegion = port
}
case "Hostinfo":
if !was.Hostinfo().Valid() && !n.Hostinfo.Valid() {
continue
}
if !was.Hostinfo().Valid() || !n.Hostinfo.Valid() {
return nil, false
}
if !was.Hostinfo().Equal(n.Hostinfo) {
return nil, false
}
case "Created":
if !was.Created().Equal(n.Created) {
return nil, false
}
case "Cap":
if was.Cap() != n.Cap {
pc().Cap = n.Cap
}
case "Tags":
if !views.SliceEqual(was.Tags(), views.SliceOf(n.Tags)) {
return nil, false
}
case "PrimaryRoutes":
if !views.SliceEqual(was.PrimaryRoutes(), views.SliceOf(n.PrimaryRoutes)) {
return nil, false
}
case "Online":
wasOnline := was.Online()
if n.Online != nil && wasOnline != nil && *n.Online != *wasOnline {
pc().Online = ptr.To(*n.Online)
}
case "LastSeen":
wasSeen := was.LastSeen()
if n.LastSeen != nil && wasSeen != nil && !wasSeen.Equal(*n.LastSeen) {
pc().LastSeen = ptr.To(*n.LastSeen)
}
case "MachineAuthorized":
if was.MachineAuthorized() != n.MachineAuthorized {
return nil, false
}
case "Capabilities":
if !views.SliceEqual(was.Capabilities(), views.SliceOf(n.Capabilities)) {
pc().Capabilities = ptr.To(n.Capabilities)
}
case "UnsignedPeerAPIOnly":
if was.UnsignedPeerAPIOnly() != n.UnsignedPeerAPIOnly {
return nil, false
}
case "IsWireGuardOnly":
if was.IsWireGuardOnly() != n.IsWireGuardOnly {
return nil, false
}
case "Expired":
if was.Expired() != n.Expired {
return nil, false
}
case "SelfNodeV4MasqAddrForThisPeer":
va, vb := was.SelfNodeV4MasqAddrForThisPeer(), n.SelfNodeV4MasqAddrForThisPeer
if va == nil && vb == nil {
continue
}
if va == nil || vb == nil || *va != *vb {
return nil, false
}
}
}
if ret != nil {
ret.NodeID = n.ID
}
return ret, true
}
// netmap returns a fully populated NetworkMap from the last state seen from // netmap returns a fully populated NetworkMap from the last state seen from
// a call to updateStateFromResponse, filling in omitted // a call to updateStateFromResponse, filling in omitted
// information from prior MapResponse values. // information from prior MapResponse values.

@ -4,6 +4,7 @@
package controlclient package controlclient
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -14,6 +15,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
"go4.org/mem" "go4.org/mem"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/tstest" "tailscale.com/tstest"
@ -642,6 +644,132 @@ func TestDeltaDERPMap(t *testing.T) {
} }
} }
func TestPeerChangeDiff(t *testing.T) {
tests := []struct {
name string
a, b *tailcfg.Node
want *tailcfg.PeerChange // nil means want ok=false, unless wantEqual is set
wantEqual bool // means test wants (nil, true)
}{
{
name: "eq",
a: &tailcfg.Node{ID: 1},
b: &tailcfg.Node{ID: 1},
wantEqual: true,
},
{
name: "patch-derp",
a: &tailcfg.Node{ID: 1, DERP: "127.3.3.40:1"},
b: &tailcfg.Node{ID: 1, DERP: "127.3.3.40:2"},
want: &tailcfg.PeerChange{NodeID: 1, DERPRegion: 2},
},
{
name: "patch-endpoints",
a: &tailcfg.Node{ID: 1, Endpoints: []string{"10.0.0.1:1"}},
b: &tailcfg.Node{ID: 1, Endpoints: []string{"10.0.0.2:2"}},
want: &tailcfg.PeerChange{NodeID: 1, Endpoints: []string{"10.0.0.2:2"}},
},
{
name: "patch-cap",
a: &tailcfg.Node{ID: 1, Cap: 1},
b: &tailcfg.Node{ID: 1, Cap: 2},
want: &tailcfg.PeerChange{NodeID: 1, Cap: 2},
},
{
name: "patch-lastseen",
a: &tailcfg.Node{ID: 1, LastSeen: ptr.To(time.Unix(1, 0))},
b: &tailcfg.Node{ID: 1, LastSeen: ptr.To(time.Unix(2, 0))},
want: &tailcfg.PeerChange{NodeID: 1, LastSeen: ptr.To(time.Unix(2, 0))},
},
{
name: "patch-capabilities-to-nonempty",
a: &tailcfg.Node{ID: 1, Capabilities: []string{"foo"}},
b: &tailcfg.Node{ID: 1, Capabilities: []string{"bar"}},
want: &tailcfg.PeerChange{NodeID: 1, Capabilities: ptr.To([]string{"bar"})},
},
{
name: "patch-capabilities-to-empty",
a: &tailcfg.Node{ID: 1, Capabilities: []string{"foo"}},
b: &tailcfg.Node{ID: 1},
want: &tailcfg.PeerChange{NodeID: 1, Capabilities: ptr.To([]string(nil))},
},
{
name: "patch-online-to-true",
a: &tailcfg.Node{ID: 1, Online: ptr.To(false)},
b: &tailcfg.Node{ID: 1, Online: ptr.To(true)},
want: &tailcfg.PeerChange{NodeID: 1, Online: ptr.To(true)},
},
{
name: "patch-online-to-false",
a: &tailcfg.Node{ID: 1, Online: ptr.To(true)},
b: &tailcfg.Node{ID: 1, Online: ptr.To(false)},
want: &tailcfg.PeerChange{NodeID: 1, Online: ptr.To(false)},
},
{
name: "mix-patchable-and-not",
a: &tailcfg.Node{ID: 1, Cap: 1},
b: &tailcfg.Node{ID: 1, Cap: 2, StableID: "foo"},
want: nil,
},
{
name: "miss-change-stableid",
a: &tailcfg.Node{ID: 1},
b: &tailcfg.Node{ID: 1, StableID: "diff"},
want: nil,
},
{
name: "miss-change-id",
a: &tailcfg.Node{ID: 1},
b: &tailcfg.Node{ID: 2},
want: nil,
},
{
name: "miss-change-name",
a: &tailcfg.Node{ID: 1, Name: "foo"},
b: &tailcfg.Node{ID: 1, Name: "bar"},
want: nil,
},
{
name: "miss-change-user",
a: &tailcfg.Node{ID: 1, User: 1},
b: &tailcfg.Node{ID: 1, User: 2},
want: nil,
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pc, ok := peerChangeDiff(tt.a.View(), tt.b)
if tt.wantEqual {
if !ok || pc != nil {
t.Errorf("got (%p, %v); want (nil, true); pc=%v", pc, ok, must.Get(json.Marshal(pc)))
}
return
}
if (pc != nil) != ok {
t.Fatalf("inconsistent ok=%v, pc=%p", ok, pc)
}
gotj := must.Get(json.Marshal(pc))
wantj := must.Get(json.Marshal(tt.want))
if !bytes.Equal(gotj, wantj) {
t.Errorf("mismatch\n got: %s\nwant: %s\n", gotj, wantj)
}
})
}
}
func TestPeerChangeDiffAllocs(t *testing.T) {
a := &tailcfg.Node{ID: 1}
b := &tailcfg.Node{ID: 1}
n := testing.AllocsPerRun(10000, func() {
diff, ok := peerChangeDiff(a.View(), b)
if !ok || diff != nil {
t.Fatalf("unexpected result: (%s, %v)", must.Get(json.Marshal(diff)), ok)
}
})
if n != 0 {
t.Errorf("allocs = %v; want 0", int(n))
}
}
type countingNetmapUpdater struct { type countingNetmapUpdater struct {
full atomic.Int64 full atomic.Int64
} }
@ -650,6 +778,80 @@ func (nu *countingNetmapUpdater) UpdateFullNetmap(nm *netmap.NetworkMap) {
nu.full.Add(1) nu.full.Add(1)
} }
// tests (*mapSession).patchifyPeersChanged; smaller tests are in TestPeerChangeDiff
func TestPatchifyPeersChanged(t *testing.T) {
hi := (&tailcfg.Hostinfo{}).View()
tests := []struct {
name string
mr0 *tailcfg.MapResponse // initial
mr1 *tailcfg.MapResponse // incremental
want *tailcfg.MapResponse // what the incremental one should've been mutated to
}{
{
name: "change_one_endpoint",
mr0: &tailcfg.MapResponse{
Node: &tailcfg.Node{Name: "foo.bar.ts.net."},
Peers: []*tailcfg.Node{
{ID: 1, Hostinfo: hi},
},
},
mr1: &tailcfg.MapResponse{
PeersChanged: []*tailcfg.Node{
{ID: 1, Endpoints: []string{"10.0.0.1:1111"}, Hostinfo: hi},
},
},
want: &tailcfg.MapResponse{
PeersChanged: nil,
PeersChangedPatch: []*tailcfg.PeerChange{
{NodeID: 1, Endpoints: []string{"10.0.0.1:1111"}},
},
},
},
{
name: "change_some",
mr0: &tailcfg.MapResponse{
Node: &tailcfg.Node{Name: "foo.bar.ts.net."},
Peers: []*tailcfg.Node{
{ID: 1, DERP: "127.3.3.40:1", Hostinfo: hi},
{ID: 2, DERP: "127.3.3.40:2", Hostinfo: hi},
{ID: 3, DERP: "127.3.3.40:3", Hostinfo: hi},
},
},
mr1: &tailcfg.MapResponse{
PeersChanged: []*tailcfg.Node{
{ID: 1, DERP: "127.3.3.40:11", Hostinfo: hi},
{ID: 2, StableID: "other-change", Hostinfo: hi},
{ID: 3, DERP: "127.3.3.40:33", Hostinfo: hi},
{ID: 4, DERP: "127.3.3.40:4", Hostinfo: hi},
},
},
want: &tailcfg.MapResponse{
PeersChanged: []*tailcfg.Node{
{ID: 2, StableID: "other-change", Hostinfo: hi},
{ID: 4, DERP: "127.3.3.40:4", Hostinfo: hi},
},
PeersChangedPatch: []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 11},
{NodeID: 3, DERPRegion: 33},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nu := &countingNetmapUpdater{}
ms := newTestMapSession(t, nu)
ms.updateStateFromResponse(tt.mr0)
mr1 := new(tailcfg.MapResponse)
must.Do(json.Unmarshal(must.Get(json.Marshal(tt.mr1)), mr1))
ms.patchifyPeersChanged(mr1)
if diff := cmp.Diff(tt.want, mr1); diff != "" {
t.Errorf("wrong result (-want +got):\n%s", diff)
}
})
}
}
func BenchmarkMapSessionDelta(b *testing.B) { func BenchmarkMapSessionDelta(b *testing.B) {
for _, size := range []int{10, 100, 1_000, 10_000} { for _, size := range []int{10, 100, 1_000, 10_000} {
b.Run(fmt.Sprintf("size_%d", size), func(b *testing.B) { b.Run(fmt.Sprintf("size_%d", size), func(b *testing.B) {
@ -700,5 +902,4 @@ func BenchmarkMapSessionDelta(b *testing.B) {
} }
}) })
} }
} }

Loading…
Cancel
Save