From ec4f849079ce409bf161f8bc4bce20b171102171 Mon Sep 17 00:00:00 2001 From: Tom DNetto Date: Tue, 12 Jul 2022 12:23:43 -0700 Subject: [PATCH] tka: implement filesystem-based tailchonk implementation FS implements Chonk, and given the expected load characteristics (frequent use of AUM() + ChildAUMs(), and infrequent use of Heads() + CommitVerifiedAUMs()), the implementation avoids scanning the filesystem to service AUM() and ChildAUMs(). Signed-off-by: Tom DNetto --- tka/tailchonk.go | 263 ++++++++++++++++++++++++++++++++++++++++++ tka/tailchonk_test.go | 158 +++++++++++++++---------- 2 files changed, 357 insertions(+), 64 deletions(-) diff --git a/tka/tailchonk.go b/tka/tailchonk.go index 4a5abdadb..dfcc2d080 100644 --- a/tka/tailchonk.go +++ b/tka/tailchonk.go @@ -5,8 +5,17 @@ package tka import ( + "bytes" + "encoding/base32" + "encoding/hex" + "fmt" + "io/ioutil" "os" + "path/filepath" "sync" + + "github.com/fxamacker/cbor/v2" + "tailscale.com/atomicfile" ) // Chonk implementations provide durable storage for AUMs and other @@ -156,3 +165,257 @@ updateLoop: return nil } + +// FS implements filesystem storage of TKA state. +// +// FS implements the Chonk interface. +type FS struct { + base string + mu sync.RWMutex +} + +// fsHashInfo describes how information about an AUMHash is represented +// on disk. +// +// The CBOR-serialization of this struct is stored to base/hex(hash[0])/base32(hash[1:]) (unpadded base32). +// +// CBOR was chosen because we are already using it and it serializes +// much smaller than JSON for AUMs. The 'keyasint' thing isn't essential +// but again it saves a bunch of bytes. +type fsHashInfo struct { + Children []AUMHash `cbor:"1,keyasint"` + AUM *AUM `cbor:"2,keyasint"` +} + +func (c *FS) dirPrefix(h AUMHash) string { + return filepath.Join(c.base, hex.EncodeToString(h[:1])) +} + +func (c *FS) filename(h AUMHash) string { + return base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(h[1:]) +} + +// AUM returns the AUM with the specified digest. 
+// +// If the AUM does not exist, then an error satisfying os.IsNotExist is returned. +func (c *FS) AUM(hash AUMHash) (AUM, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + info, err := c.get(hash) + if err != nil { + return AUM{}, err + } + if info.AUM == nil { + return AUM{}, os.ErrNotExist + } + return *info.AUM, nil +} + +// ChildAUMs returns any known AUMs with a specific parent hash. +func (c *FS) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + info, err := c.get(prevAUMHash) + if err != nil { + if os.IsNotExist(err) { + // Not knowing about this hash is not an error. + return nil, nil + } + return nil, err + } + + out := make([]AUM, len(info.Children)) + for i, h := range info.Children { + c, err := c.get(h) + if err != nil { + // We expect any AUM recorded as a child on its parent to exist. + return nil, fmt.Errorf("reading child %d of %x: %v", i, h, err) + } + if c.AUM == nil { + return nil, fmt.Errorf("child %d of %x: AUM not stored", i, h) + } + out[i] = *c.AUM + } + + return out, nil +} + +func (c *FS) get(h AUMHash) (*fsHashInfo, error) { + f, err := os.Open(filepath.Join(c.dirPrefix(h), c.filename(h))) + if err != nil { + return nil, err + } + defer f.Close() + + var out fsHashInfo + if err := cbor.NewDecoder(f).Decode(&out); err != nil { + return nil, err + } + return &out, nil +} + +// Heads returns AUMs for which there are no children. In other +// words, the latest AUM in all possible chains (the 'leaves'). +// +// Heads is expected to be called infrequently compared to AUM() or +// ChildAUMs(), so we haven't put any work into maintaining an index. +// Instead, the full set of AUMs is scanned. +func (c *FS) Heads() ([]AUM, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + out := make([]AUM, 0, 6) // 6 is arbitrary. 
+ err := c.scanHashes(func(info *fsHashInfo) { + if len(info.Children) == 0 && info.AUM != nil { + out = append(out, *info.AUM) + } + }) + return out, err +} + +func (c *FS) scanHashes(eachHashInfo func(*fsHashInfo)) error { + prefixDirs, err := os.ReadDir(c.base) + if err != nil { + return fmt.Errorf("reading prefix dirs: %v", err) + } + for _, prefix := range prefixDirs { + if !prefix.IsDir() { + continue + } + pb, err := hex.DecodeString(prefix.Name()) + if err != nil || len(pb) != 1 { + return fmt.Errorf("invalid prefix directory %q: %v", prefix.Name(), err) + } + + files, err := os.ReadDir(filepath.Join(c.base, prefix.Name())) + if err != nil { + return fmt.Errorf("reading prefix dir: %v", err) + } + for _, file := range files { + remainingHash, err := base32.StdEncoding.WithPadding(base32.NoPadding).DecodeString(file.Name()) + if err != nil { + return fmt.Errorf("invalid aum file %s/%s: %v", prefix.Name(), file.Name(), err) + } + var h AUMHash + h[0] = pb[0] + copy(h[1:], remainingHash) + + info, err := c.get(h) + if err != nil { + return fmt.Errorf("reading %x: %v", h, err) + } + + eachHashInfo(info) + } + } + + return nil +} + +// SetLastActiveAncestor is called to record the oldest-known AUM +// that contributed to the current state. This value is used as +// a hint on next startup to determine which chain to pick when computing +// the current state, if there are multiple distinct chains. +func (c *FS) SetLastActiveAncestor(hash AUMHash) error { + c.mu.Lock() + defer c.mu.Unlock() + return atomicfile.WriteFile(filepath.Join(c.base, "last_active_ancestor"), hash[:], 0644) +} + +// LastActiveAncestor returns the oldest-known AUM that was (in a +// previous run) an ancestor of the current state. This is used +// as a hint to pick the correct chain in the event that the Chonk stores +// multiple distinct chains. +// +// Nil is returned if no last-active ancestor is set. 
+func (c *FS) LastActiveAncestor() (*AUMHash, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + hash, err := ioutil.ReadFile(filepath.Join(c.base, "last_active_ancestor")) + if err != nil { + if os.IsNotExist(err) { + return nil, nil // Not exist == none set. + } + return nil, err + } + + var out AUMHash + if len(hash) != len(out) { + return nil, fmt.Errorf("stored hash is of wrong length: %d != %d", len(hash), len(out)) + } + copy(out[:], hash) + return &out, nil +} + +// CommitVerifiedAUMs durably stores the provided AUMs. +// Callers MUST ONLY provide AUMs which are verified (specifically, +// a call to aumVerify must return a nil error), as the +// implementation assumes that only verified AUMs are stored. +func (c *FS) CommitVerifiedAUMs(updates []AUM) error { + c.mu.Lock() + defer c.mu.Unlock() + + for i, aum := range updates { + h := aum.Hash() + // We keep track of children against their parent so that + // ChildAUMs() does not need to scan all AUMs. + parent, hasParent := aum.Parent() + if hasParent { + err := c.commit(parent, func(info *fsHashInfo) { + // Only add it if it's not already there. + for i := range info.Children { + if info.Children[i] == h { + return + } + } + info.Children = append(info.Children, h) + }) + if err != nil { + return fmt.Errorf("committing update[%d] to parent %x: %v", i, parent, err) + } + } + + err := c.commit(h, func(info *fsHashInfo) { + info.AUM = &aum + }) + if err != nil { + return fmt.Errorf("committing update[%d] (%x): %v", i, h, err) + } + } + + return nil +} + +// commit calls the provided updater function to record changes relevant +// to the given hash. The caller is expected to update the AUM and +// Children fields, as relevant. 
+func (c *FS) commit(h AUMHash, updater func(*fsHashInfo)) error { + toCommit := fsHashInfo{} + + existing, err := c.get(h) + switch { + case os.IsNotExist(err): + case err != nil: + return err + default: + toCommit = *existing + } + + updater(&toCommit) + if toCommit.AUM != nil && toCommit.AUM.Hash() != h { + return fmt.Errorf("cannot commit AUM with hash %x to %x", toCommit.AUM.Hash(), h) + } + + if err := os.MkdirAll(c.dirPrefix(h), 0755); err != nil && !os.IsExist(err) { + return fmt.Errorf("creating directory: %v", err) + } + + var buff bytes.Buffer + if err := cbor.NewEncoder(&buff).Encode(toCommit); err != nil { + return fmt.Errorf("encoding: %v", err) + } + return atomicfile.WriteFile(filepath.Join(c.dirPrefix(h), c.filename(h)), buff.Bytes(), 0644) +} diff --git a/tka/tailchonk_test.go b/tka/tailchonk_test.go index 334323275..f7fec6643 100644 --- a/tka/tailchonk_test.go +++ b/tka/tailchonk_test.go @@ -5,6 +5,9 @@ package tka import ( + "fmt" + "os" + "path/filepath" "testing" "github.com/google/go-cmp/cmp" @@ -20,35 +23,38 @@ func randHash(t *testing.T, seed int64) [blake2s.Size]byte { } func TestImplementsChonk(t *testing.T) { - impls := []Chonk{&Mem{}} + impls := []Chonk{&Mem{}, &FS{}} t.Logf("chonks: %v", impls) } -func TestTailchonkMem_ChildAUMs(t *testing.T) { - chonk := Mem{} - parentHash := randHash(t, 1) - data := []AUM{ - { - MessageKind: AUMRemoveKey, - KeyID: []byte{1, 2}, - PrevAUMHash: parentHash[:], - }, - { - MessageKind: AUMRemoveKey, - KeyID: []byte{3, 4}, - PrevAUMHash: parentHash[:], - }, - } +func TestTailchonk_ChildAUMs(t *testing.T) { + for _, chonk := range []Chonk{&Mem{}, &FS{base: t.TempDir()}} { + t.Run(fmt.Sprintf("%T", chonk), func(t *testing.T) { + parentHash := randHash(t, 1) + data := []AUM{ + { + MessageKind: AUMRemoveKey, + KeyID: []byte{1, 2}, + PrevAUMHash: parentHash[:], + }, + { + MessageKind: AUMRemoveKey, + KeyID: []byte{3, 4}, + PrevAUMHash: parentHash[:], + }, + } - if err := chonk.CommitVerifiedAUMs(data); err != 
nil { - t.Fatalf("CommitVerifiedAUMs failed: %v", err) - } - stored, err := chonk.ChildAUMs(parentHash) - if err != nil { - t.Fatalf("ChildAUMs failed: %v", err) - } - if diff := cmp.Diff(data, stored); diff != "" { - t.Errorf("stored AUM differs (-want, +got):\n%s", diff) + if err := chonk.CommitVerifiedAUMs(data); err != nil { + t.Fatalf("CommitVerifiedAUMs failed: %v", err) + } + stored, err := chonk.ChildAUMs(parentHash) + if err != nil { + t.Fatalf("ChildAUMs failed: %v", err) + } + if diff := cmp.Diff(data, stored); diff != "" { + t.Errorf("stored AUM differs (-want, +got):\n%s", diff) + } + }) } } @@ -79,50 +85,74 @@ func TestTailchonkMem_Orphans(t *testing.T) { } } -func TestTailchonkMem_ReadChainFromHead(t *testing.T) { - chonk := Mem{} - genesis := AUM{MessageKind: AUMRemoveKey, KeyID: []byte{1, 2}} - gHash := genesis.Hash() - intermediate := AUM{PrevAUMHash: gHash[:]} - iHash := intermediate.Hash() - leaf := AUM{PrevAUMHash: iHash[:]} - - commitSet := []AUM{ - genesis, - intermediate, - leaf, - } - if err := chonk.CommitVerifiedAUMs(commitSet); err != nil { - t.Fatalf("CommitVerifiedAUMs failed: %v", err) - } - // t.Logf("genesis hash = %X", genesis.Hash()) - // t.Logf("intermediate hash = %X", intermediate.Hash()) - // t.Logf("leaf hash = %X", leaf.Hash()) +func TestTailchonk_ReadChainFromHead(t *testing.T) { + for _, chonk := range []Chonk{&Mem{}, &FS{base: t.TempDir()}} { - // Read the chain from the leaf backwards. 
- gotLeafs, err := chonk.Heads() - if err != nil { - t.Fatalf("Heads failed: %v", err) - } - if diff := cmp.Diff([]AUM{leaf}, gotLeafs); diff != "" { - t.Fatalf("leaf AUM differs (-want, +got):\n%s", diff) - } + t.Run(fmt.Sprintf("%T", chonk), func(t *testing.T) { + genesis := AUM{MessageKind: AUMRemoveKey, KeyID: []byte{1, 2}} + gHash := genesis.Hash() + intermediate := AUM{PrevAUMHash: gHash[:]} + iHash := intermediate.Hash() + leaf := AUM{PrevAUMHash: iHash[:]} - parent, _ := gotLeafs[0].Parent() - gotIntermediate, err := chonk.AUM(parent) - if err != nil { - t.Fatalf("AUM() failed: %v", err) + commitSet := []AUM{ + genesis, + intermediate, + leaf, + } + if err := chonk.CommitVerifiedAUMs(commitSet); err != nil { + t.Fatalf("CommitVerifiedAUMs failed: %v", err) + } + // t.Logf("genesis hash = %X", genesis.Hash()) + // t.Logf("intermediate hash = %X", intermediate.Hash()) + // t.Logf("leaf hash = %X", leaf.Hash()) + + // Read the chain from the leaf backwards. + gotLeafs, err := chonk.Heads() + if err != nil { + t.Fatalf("Heads failed: %v", err) + } + if diff := cmp.Diff([]AUM{leaf}, gotLeafs); diff != "" { + t.Fatalf("leaf AUM differs (-want, +got):\n%s", diff) + } + + parent, _ := gotLeafs[0].Parent() + gotIntermediate, err := chonk.AUM(parent) + if err != nil { + t.Fatalf("AUM() failed: %v", err) + } + if diff := cmp.Diff(intermediate, gotIntermediate); diff != "" { + t.Errorf("intermediate AUM differs (-want, +got):\n%s", diff) + } + + parent, _ = gotIntermediate.Parent() + gotGenesis, err := chonk.AUM(parent) + if err != nil { + t.Fatalf("AUM() failed: %v", err) + } + if diff := cmp.Diff(genesis, gotGenesis); diff != "" { + t.Errorf("genesis AUM differs (-want, +got):\n%s", diff) + } + }) } - if diff := cmp.Diff(intermediate, gotIntermediate); diff != "" { - t.Errorf("intermediate AUM differs (-want, +got):\n%s", diff) +} + +func TestTailchonkFS_Commit(t *testing.T) { + chonk := &FS{base: t.TempDir()} + parentHash := randHash(t, 1) + aum := AUM{MessageKind: 
AUMNoOp, PrevAUMHash: parentHash[:]} + + if err := chonk.CommitVerifiedAUMs([]AUM{aum}); err != nil { + t.Fatal(err) } - parent, _ = gotIntermediate.Parent() - gotGenesis, err := chonk.AUM(parent) - if err != nil { - t.Fatalf("AUM() failed: %v", err) + if got, want := chonk.filename(aum.Hash()), "HJX3LPJJQVRFSQX4QONESBU4DUO5JPORA66ZUCFS6NHZWDZTP4"; got != want { + t.Errorf("aum filename = %q, want %q", got, want) + } + if _, err := os.Stat(filepath.Join(chonk.base, "ad", "HJX3LPJJQVRFSQX4QONESBU4DUO5JPORA66ZUCFS6NHZWDZTP4")); err != nil { + t.Errorf("stat of AUM file failed: %v", err) } - if diff := cmp.Diff(genesis, gotGenesis); diff != "" { - t.Errorf("genesis AUM differs (-want, +got):\n%s", diff) + if _, err := os.Stat(filepath.Join(chonk.base, "67", "226TIYPDKQWKFD5MXUI3GRVDSDFXRBABNINTFIT5ADMCLZ464U")); err != nil { + t.Errorf("stat of AUM parent failed: %v", err) } }