From 58ffe928af19c06664d7d91befae474e20a6a73b Mon Sep 17 00:00:00 2001
From: Tom DNetto
Date: Tue, 27 Sep 2022 12:30:04 -0700
Subject: [PATCH] ipn/ipnlocal, tka: Implement TKA synchronization with the control plane

Signed-off-by: Tom DNetto
---
 ipn/ipnlocal/network-lock.go      | 200 ++++++++++++++++++++++++-
 ipn/ipnlocal/network-lock_test.go | 234 +++++++++++++++++++++++++++++-
 tka/tailchonk.go                  |   3 +
 tka/tailchonk_test.go             |  12 ++
 4 files changed, 442 insertions(+), 7 deletions(-)

diff --git a/ipn/ipnlocal/network-lock.go b/ipn/ipnlocal/network-lock.go
index 56ee9b1ff..32007543b 100644
--- a/ipn/ipnlocal/network-lock.go
+++ b/ipn/ipnlocal/network-lock.go
@@ -55,6 +55,7 @@ func (b *LocalBackend) tkaSyncIfNeededLocked(nm *netmap.NetworkMap) error {
 		// If the feature flag is not enabled, pretend we don't exist.
 		return nil
 	}
+	ourNodeKey := b.prefs.Persist.PrivateNodeKey.Public()
 
 	isEnabled := b.tka != nil
 	wantEnabled := nm.TKAEnabled
@@ -66,17 +67,16 @@ func (b *LocalBackend) tkaSyncIfNeededLocked(nm *netmap.NetworkMap) error {
 
 	// Regardless of whether we are moving to disabled or enabled, we
 	// need information from the tka bootstrap endpoint.
-	ourNodeKey := b.prefs.Persist.PrivateNodeKey.Public()
 	b.mu.Unlock()
 	bs, err := b.tkaFetchBootstrap(ourNodeKey, ourHead)
 	b.mu.Lock()
 	if err != nil {
-		return fmt.Errorf("fetching bootstrap: %v", err)
+		return fmt.Errorf("fetching bootstrap: %w", err)
 	}
 
 	if wantEnabled && !isEnabled {
 		if err := b.tkaBootstrapFromGenesisLocked(bs.GenesisAUM); err != nil {
-			return fmt.Errorf("bootstrap: %v", err)
+			return fmt.Errorf("bootstrap: %w", err)
 		}
 		isEnabled = true
 	} else if !wantEnabled && isEnabled {
@@ -96,7 +96,96 @@ func (b *LocalBackend) tkaSyncIfNeededLocked(nm *netmap.NetworkMap) error {
 	}
 
 	if isEnabled && b.tka.authority.Head() != nm.TKAHead {
-		// TODO(tom): Implement sync
+		if err := b.tkaSyncLocked(ourNodeKey); err != nil {
+			return fmt.Errorf("tka sync: %w", err)
+		}
+	}
+
+	return nil
+}
+
+func toSyncOffer(head string, ancestors []string) (tka.SyncOffer, error) {
+	var out tka.SyncOffer
+	if err := out.Head.UnmarshalText([]byte(head)); err != nil {
+		return tka.SyncOffer{}, fmt.Errorf("head.UnmarshalText: %v", err)
+	}
+	out.Ancestors = make([]tka.AUMHash, len(ancestors))
+	for i, a := range ancestors {
+		if err := out.Ancestors[i].UnmarshalText([]byte(a)); err != nil {
+			return tka.SyncOffer{}, fmt.Errorf("ancestor[%d].UnmarshalText: %v", i, err)
+		}
+	}
+	return out, nil
+}
+
+// tkaSyncLocked synchronizes TKA state with control. b.mu must be held
+// and tka must be initialized. b.mu will be stepped out of (and back into)
+// during network RPCs.
+func (b *LocalBackend) tkaSyncLocked(ourNodeKey key.NodePublic) error {
+	offer, err := b.tka.authority.SyncOffer(b.tka.storage)
+	if err != nil {
+		return fmt.Errorf("offer: %w", err)
+	}
+
+	b.mu.Unlock()
+	offerResp, err := b.tkaDoSyncOffer(ourNodeKey, offer)
+	b.mu.Lock()
+	if err != nil {
+		return fmt.Errorf("offer RPC: %w", err)
+	}
+	controlOffer, err := toSyncOffer(offerResp.Head, offerResp.Ancestors)
+	if err != nil {
+		return fmt.Errorf("control offer: %v", err)
+	}
+
+	if controlOffer.Head == offer.Head {
+		// We are up to date.
+		return nil
+	}
+
+	// Compute missing AUMs before we apply any AUMs from the control-plane,
+	// so we still submit AUMs to control even if they are not part of the
+	// active chain.
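+	// (These AUMs are uploaded to control via the send RPC below.)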
+	toSendAUMs, err := b.tka.authority.MissingAUMs(b.tka.storage, controlOffer)
+	if err != nil {
+		return fmt.Errorf("computing missing AUMs: %w", err)
+	}
+
+	// If we got this far, then we are not up to date. Either the control-plane
+	// has updates for us, or we have updates for the control plane.
+	//
+	// TODO(tom): Do we want to keep processing even if the Inform fails? Need
+	// to think through if there are holdback concerns here or not.
+	if len(offerResp.MissingAUMs) > 0 {
+		aums := make([]tka.AUM, len(offerResp.MissingAUMs))
+		for i, a := range offerResp.MissingAUMs {
+			if err := aums[i].Unserialize(a); err != nil {
+				return fmt.Errorf("MissingAUMs[%d]: %v", i, err)
+			}
+		}
+
+		if err := b.tka.authority.Inform(b.tka.storage, aums); err != nil {
+			return fmt.Errorf("inform failed: %v", err)
+		}
+	}
+
+	// NOTE(tom): We could short-circuit here if our HEAD equals the
+	// control-plane's head, but we don't just so control always has a
+	// copy of all forks that clients had.
+
+	b.mu.Unlock()
+	sendResp, err := b.tkaDoSyncSend(ourNodeKey, toSendAUMs)
+	b.mu.Lock()
+	if err != nil {
+		return fmt.Errorf("send RPC: %v", err)
+	}
+
+	var remoteHead tka.AUMHash
+	if err := remoteHead.UnmarshalText([]byte(sendResp.Head)); err != nil {
+		return fmt.Errorf("head unmarshal: %v", err)
+	}
+	if remoteHead != b.tka.authority.Head() {
+		b.logf("TKA desync: expected consensus after sync but our head is %v and the control plane's is %v", b.tka.authority.Head(), remoteHead)
 	}
 
 	return nil
 }
@@ -405,3 +494,106 @@ func (b *LocalBackend) tkaFetchBootstrap(ourNodeKey key.NodePublic, head tka.AUM
 
 	return a, nil
 }
+
+func fromSyncOffer(offer tka.SyncOffer) (head string, ancestors []string, err error) {
+	headBytes, err := offer.Head.MarshalText()
+	if err != nil {
+		return "", nil, fmt.Errorf("head.MarshalText: %v", err)
+	}
+
+	ancestors = make([]string, len(offer.Ancestors))
+	for i, ancestor := range offer.Ancestors {
+		hash, err := ancestor.MarshalText()
+		if err != nil {
+			return "", nil, fmt.Errorf("ancestor[%d].MarshalText: %v", i, err)
+		}
+		ancestors[i] = string(hash)
+	}
+	return string(headBytes), ancestors, nil
+}
+
+// tkaDoSyncOffer sends a /machine/tka/sync/offer RPC to the control plane
+// over noise. This is the first of two RPCs implementing tka synchronization.
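+// The response carries the control plane's own sync offer, along with any
+// AUMs control already knows the node is missing.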
+func (b *LocalBackend) tkaDoSyncOffer(ourNodeKey key.NodePublic, offer tka.SyncOffer) (*tailcfg.TKASyncOfferResponse, error) {
+	head, ancestors, err := fromSyncOffer(offer)
+	if err != nil {
+		return nil, fmt.Errorf("encoding offer: %v", err)
+	}
+	syncReq := tailcfg.TKASyncOfferRequest{
+		Version:   tailcfg.CurrentCapabilityVersion,
+		NodeKey:   ourNodeKey,
+		Head:      head,
+		Ancestors: ancestors,
+	}
+
+	var req bytes.Buffer
+	if err := json.NewEncoder(&req).Encode(syncReq); err != nil {
+		return nil, fmt.Errorf("encoding request: %v", err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+	req2, err := http.NewRequestWithContext(ctx, "GET", "https://unused/machine/tka/sync/offer", &req)
+	if err != nil {
+		return nil, fmt.Errorf("req: %w", err)
+	}
+	res, err := b.DoNoiseRequest(req2)
+	if err != nil {
+		return nil, fmt.Errorf("resp: %w", err)
+	}
+	if res.StatusCode != 200 {
+		body, _ := io.ReadAll(res.Body)
+		res.Body.Close()
+		return nil, fmt.Errorf("request returned (%d): %s", res.StatusCode, string(body))
+	}
+	a := new(tailcfg.TKASyncOfferResponse)
+	err = json.NewDecoder(res.Body).Decode(a)
+	res.Body.Close()
+	if err != nil {
+		return nil, fmt.Errorf("decoding JSON: %w", err)
+	}
+
+	return a, nil
+}
+
+// tkaDoSyncSend sends a /machine/tka/sync/send RPC to the control plane
+// over noise. This is the second of two RPCs implementing tka synchronization.
+func (b *LocalBackend) tkaDoSyncSend(ourNodeKey key.NodePublic, aums []tka.AUM) (*tailcfg.TKASyncSendResponse, error) {
+	sendReq := tailcfg.TKASyncSendRequest{
+		Version:     tailcfg.CurrentCapabilityVersion,
+		NodeKey:     ourNodeKey,
+		MissingAUMs: make([]tkatype.MarshaledAUM, len(aums)),
+	}
+	for i, a := range aums {
+		sendReq.MissingAUMs[i] = a.Serialize()
+	}
+
+	var req bytes.Buffer
+	if err := json.NewEncoder(&req).Encode(sendReq); err != nil {
+		return nil, fmt.Errorf("encoding request: %v", err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+	req2, err := http.NewRequestWithContext(ctx, "GET", "https://unused/machine/tka/sync/send", &req)
+	if err != nil {
+		return nil, fmt.Errorf("req: %w", err)
+	}
+	res, err := b.DoNoiseRequest(req2)
+	if err != nil {
+		return nil, fmt.Errorf("resp: %w", err)
+	}
+	if res.StatusCode != 200 {
+		body, _ := io.ReadAll(res.Body)
+		res.Body.Close()
+		return nil, fmt.Errorf("request returned (%d): %s", res.StatusCode, string(body))
+	}
+	a := new(tailcfg.TKASyncSendResponse)
+	err = json.NewDecoder(res.Body).Decode(a)
+	res.Body.Close()
+	if err != nil {
+		return nil, fmt.Errorf("decoding JSON: %w", err)
+	}
+
+	return a, nil
+}
diff --git a/ipn/ipnlocal/network-lock_test.go b/ipn/ipnlocal/network-lock_test.go
index 3b1d43287..8f151f654 100644
--- a/ipn/ipnlocal/network-lock_test.go
+++ b/ipn/ipnlocal/network-lock_test.go
@@ -23,6 +23,7 @@ import (
 	"tailscale.com/types/key"
 	"tailscale.com/types/netmap"
 	"tailscale.com/types/persist"
+	"tailscale.com/types/tkatype"
 )
 
 func fakeControlClient(t *testing.T, c *http.Client) *controlclient.Auto {
@@ -49,8 +50,6 @@ func fakeControlClient(t *testing.T, c *http.Client) *controlclient.Auto {
 	return cc
 }
 
-// NOTE: URLs must have a https scheme and example.com domain to work with the underlying
-// httptest plumbing, despite the domain being unused in the actual noise request transport.
 func fakeNoiseServer(t *testing.T, handler http.HandlerFunc) (*httptest.Server, *http.Client) {
 	ts := httptest.NewUnstartedServer(handler)
 	ts.StartTLS()
@@ -104,6 +103,9 @@ func TestTKAEnablementFlow(t *testing.T) {
 				t.Fatal(err)
 			}
 
+		case "/machine/tka/sync/offer", "/machine/tka/sync/send":
+			t.Error("node attempted to sync, but should have been up to date")
+
 		default:
 			t.Errorf("unhandled endpoint path: %v", r.URL.Path)
 			w.WriteHeader(404)
@@ -126,7 +128,7 @@ func TestTKAEnablementFlow(t *testing.T) {
 	b.mu.Lock()
 	err = b.tkaSyncIfNeededLocked(&netmap.NetworkMap{
 		TKAEnabled: true,
-		TKAHead:    tka.AUMHash{},
+		TKAHead:    a1.Head(),
 	})
 	b.mu.Unlock()
 	if err != nil {
@@ -256,3 +258,229 @@ func TestTKADisablementFlow(t *testing.T) {
 		t.Errorf("os.Stat(chonkDir) = %v, want ErrNotExist", err)
 	}
 }
+
+func TestTKASync(t *testing.T) {
+	networkLockAvailable = func() bool { return true } // Enable the feature flag
+
+	someKeyPriv := key.NewNLPrivate()
+	someKey := tka.Key{Kind: tka.Key25519, Public: someKeyPriv.Public().Verifier(), Votes: 1}
+
+	type tkaSyncScenario struct {
+		name string
+		// controlAUMs is called (if non-nil) to get any AUMs which the tka state
+		// on control should be seeded with.
+		controlAUMs func(*testing.T, *tka.Authority, tka.Chonk, tka.Signer) []tka.AUM
+		// nodeAUMs is called (if non-nil) to get any AUMs which the tka state
+		// on the node should be seeded with.
+		nodeAUMs func(*testing.T, *tka.Authority, tka.Chonk, tka.Signer) []tka.AUM
+	}
+
+	tcs := []tkaSyncScenario{
+		{name: "up to date"},
+		{
+			name: "control has an update",
+			controlAUMs: func(t *testing.T, a *tka.Authority, storage tka.Chonk, signer tka.Signer) []tka.AUM {
+				b := a.NewUpdater(signer)
+				if err := b.RemoveKey(someKey.ID()); err != nil {
+					t.Fatal(err)
+				}
+				aums, err := b.Finalize(storage)
+				if err != nil {
+					t.Fatal(err)
+				}
+				return aums
+			},
+		},
+		{
+			// AKA 'control data loss' scenario
+			name: "node has an update",
+			nodeAUMs: func(t *testing.T, a *tka.Authority, storage tka.Chonk, signer tka.Signer) []tka.AUM {
+				b := a.NewUpdater(signer)
+				if err := b.RemoveKey(someKey.ID()); err != nil {
+					t.Fatal(err)
+				}
+				aums, err := b.Finalize(storage)
+				if err != nil {
+					t.Fatal(err)
+				}
+				return aums
+			},
+		},
+		{
+			// AKA 'control data loss + update in the meantime' scenario
+			name: "node and control diverge",
+			controlAUMs: func(t *testing.T, a *tka.Authority, storage tka.Chonk, signer tka.Signer) []tka.AUM {
+				b := a.NewUpdater(signer)
+				if err := b.SetKeyMeta(someKey.ID(), map[string]string{"ye": "swiggity"}); err != nil {
+					t.Fatal(err)
+				}
+				aums, err := b.Finalize(storage)
+				if err != nil {
+					t.Fatal(err)
+				}
+				return aums
+			},
+			nodeAUMs: func(t *testing.T, a *tka.Authority, storage tka.Chonk, signer tka.Signer) []tka.AUM {
+				b := a.NewUpdater(signer)
+				if err := b.SetKeyMeta(someKey.ID(), map[string]string{"ye": "swooty"}); err != nil {
+					t.Fatal(err)
+				}
+				aums, err := b.Finalize(storage)
+				if err != nil {
+					t.Fatal(err)
+				}
+				return aums
+			},
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			temp := t.TempDir()
+			os.Mkdir(filepath.Join(temp, "tka"), 0755)
+			nodePriv := key.NewNode()
+			nlPriv := key.NewNLPrivate()
+
+			// Set up the TKA authority on the control plane.
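+			// Its genesis AUM is used below to bootstrap the node's authority,
+			// so both sides start from a common ancestor.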
+			key := tka.Key{Kind: tka.Key25519, Public: nlPriv.Public().Verifier(), Votes: 2}
+			controlStorage := &tka.Mem{}
+			controlAuthority, bootstrap, err := tka.Create(controlStorage, tka.State{
+				Keys:               []tka.Key{key, someKey},
+				DisablementSecrets: [][]byte{bytes.Repeat([]byte{0xa5}, 32)},
+			}, nlPriv)
+			if err != nil {
+				t.Fatalf("tka.Create() failed: %v", err)
+			}
+			if tc.controlAUMs != nil {
+				if err := controlAuthority.Inform(controlStorage, tc.controlAUMs(t, controlAuthority, controlStorage, nlPriv)); err != nil {
+					t.Fatalf("controlAuthority.Inform() failed: %v", err)
+				}
+			}
+
+			// Set up the TKA authority on the node.
+			nodeStorage, err := tka.ChonkDir(filepath.Join(temp, "tka"))
+			if err != nil {
+				t.Fatal(err)
+			}
+			nodeAuthority, err := tka.Bootstrap(nodeStorage, bootstrap)
+			if err != nil {
+				t.Fatalf("tka.Bootstrap() failed: %v", err)
+			}
+			if tc.nodeAUMs != nil {
+				if err := nodeAuthority.Inform(nodeStorage, tc.nodeAUMs(t, nodeAuthority, nodeStorage, nlPriv)); err != nil {
+					t.Fatalf("nodeAuthority.Inform() failed: %v", err)
+				}
+			}
+
+			// Make a mock control server.
+			ts, client := fakeNoiseServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				defer r.Body.Close()
+				switch r.URL.Path {
+				case "/machine/tka/sync/offer":
+					body := new(tailcfg.TKASyncOfferRequest)
+					if err := json.NewDecoder(r.Body).Decode(body); err != nil {
+						t.Fatal(err)
+					}
+					t.Logf("got sync offer:\n%+v", body)
+					nodeOffer, err := toSyncOffer(body.Head, body.Ancestors)
+					if err != nil {
+						t.Fatal(err)
+					}
+					controlOffer, err := controlAuthority.SyncOffer(controlStorage)
+					if err != nil {
+						t.Fatal(err)
+					}
+					sendAUMs, err := controlAuthority.MissingAUMs(controlStorage, nodeOffer)
+					if err != nil {
+						t.Fatal(err)
+					}
+
+					head, ancestors, err := fromSyncOffer(controlOffer)
+					if err != nil {
+						t.Fatal(err)
+					}
+					resp := tailcfg.TKASyncOfferResponse{
+						Head:        head,
+						Ancestors:   ancestors,
+						MissingAUMs: make([]tkatype.MarshaledAUM, len(sendAUMs)),
+					}
+					for i, a := range sendAUMs {
+						resp.MissingAUMs[i] = a.Serialize()
+					}
+
+					t.Logf("responding to sync offer with:\n%+v", resp)
+					w.WriteHeader(200)
+					if err := json.NewEncoder(w).Encode(resp); err != nil {
+						t.Fatal(err)
+					}
+
+				case "/machine/tka/sync/send":
+					body := new(tailcfg.TKASyncSendRequest)
+					if err := json.NewDecoder(r.Body).Decode(body); err != nil {
+						t.Fatal(err)
+					}
+					t.Logf("got sync send:\n%+v", body)
+					toApply := make([]tka.AUM, len(body.MissingAUMs))
+					for i, a := range body.MissingAUMs {
+						if err := toApply[i].Unserialize(a); err != nil {
+							t.Fatalf("decoding missingAUM[%d]: %v", i, err)
+						}
+					}
+
+					if len(toApply) > 0 {
+						if err := controlAuthority.Inform(controlStorage, toApply); err != nil {
+							t.Fatalf("control.Inform(%+v) failed: %v", toApply, err)
+						}
+					}
+					head, err := controlAuthority.Head().MarshalText()
+					if err != nil {
+						t.Fatal(err)
+					}
+
+					w.WriteHeader(200)
+					if err := json.NewEncoder(w).Encode(tailcfg.TKASyncSendResponse{Head: string(head)}); err != nil {
+						t.Fatal(err)
+					}
+
+				default:
+					t.Errorf("unhandled endpoint path: %v", r.URL.Path)
+					w.WriteHeader(404)
+				}
+			}))
+			defer ts.Close()
+
+			// Set up the client.
+			cc := fakeControlClient(t, client)
+			b := LocalBackend{
+				varRoot: temp,
+				cc:      cc,
+				ccAuto:  cc,
+				logf:    t.Logf,
+				tka: &tkaState{
+					authority: nodeAuthority,
+					storage:   nodeStorage,
+				},
+				prefs: &ipn.Prefs{
+					Persist: &persist.Persist{PrivateNodeKey: nodePriv},
+				},
+			}
+
+			// Finally, let's trigger a sync.
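+			// tkaSyncIfNeededLocked expects b.mu to be held; it is released and
+			// re-acquired internally around the sync RPCs.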
+ b.mu.Lock() + err = b.tkaSyncIfNeededLocked(&netmap.NetworkMap{ + TKAEnabled: true, + TKAHead: controlAuthority.Head(), + }) + b.mu.Unlock() + if err != nil { + t.Errorf("tkaSyncIfNeededLocked() failed: %v", err) + } + + // Check that at the end of this ordeal, the node and the control + // plane are in sync. + if nodeHead, controlHead := b.tka.authority.Head(), controlAuthority.Head(); nodeHead != controlHead { + t.Errorf("node head = %v, want %v", nodeHead, controlHead) + } + }) + } +} diff --git a/tka/tailchonk.go b/tka/tailchonk.go index 880e09ac1..46510bead 100644 --- a/tka/tailchonk.go +++ b/tka/tailchonk.go @@ -214,6 +214,9 @@ func (c *FS) AUM(hash AUMHash) (AUM, error) { info, err := c.get(hash) if err != nil { + if os.IsNotExist(err) { + return AUM{}, os.ErrNotExist + } return AUM{}, err } if info.AUM == nil { diff --git a/tka/tailchonk_test.go b/tka/tailchonk_test.go index bb8b61fc6..af8aade3f 100644 --- a/tka/tailchonk_test.go +++ b/tka/tailchonk_test.go @@ -58,6 +58,18 @@ func TestTailchonk_ChildAUMs(t *testing.T) { } } +func TestTailchonk_AUMMissing(t *testing.T) { + for _, chonk := range []Chonk{&Mem{}, &FS{base: t.TempDir()}} { + t.Run(fmt.Sprintf("%T", chonk), func(t *testing.T) { + var notExists AUMHash + notExists[:][0] = 42 + if _, err := chonk.AUM(notExists); err != os.ErrNotExist { + t.Errorf("chonk.AUM(notExists).err = %v, want %v", err, os.ErrNotExist) + } + }) + } +} + func TestTailchonkMem_Orphans(t *testing.T) { chonk := Mem{}