tka: implement filesystem-based tailchonk implementation

FS implements Chonk, and given the expected load characteristics (frequent use
of AUM() + ChildAUMs(), and infrequent use of Heads() + CommitVerifiedAUMs()), the
implementation avoids scanning the filesystem to service AUM() and ChildAUMs().

Signed-off-by: Tom DNetto <tom@tailscale.com>
bradfitz/shared_split_dns
Tom DNetto 2 years ago committed by Tom
parent 505ca2750d
commit ec4f849079

@ -5,8 +5,17 @@
package tka package tka
import ( import (
"bytes"
"encoding/base32"
"encoding/hex"
"fmt"
"io/ioutil"
"os" "os"
"path/filepath"
"sync" "sync"
"github.com/fxamacker/cbor/v2"
"tailscale.com/atomicfile"
) )
// Chonk implementations provide durable storage for AUMs and other // Chonk implementations provide durable storage for AUMs and other
@ -156,3 +165,257 @@ updateLoop:
return nil return nil
} }
// FS implements filesystem storage of TKA state.
//
// FS implements the Chonk interface.
//
// Each known AUM hash gets its own file beneath base (see fsHashInfo for
// the layout), and the last-active-ancestor hint is kept in
// base/last_active_ancestor.
type FS struct {
// base is the root directory under which all state is stored.
base string
// mu guards the files under base: methods that only read take it for
// reading, methods that write take it exclusively.
mu sync.RWMutex
}
// fsHashInfo describes how information about an AUMHash is represented
// on disk.
//
// The CBOR-serialization of this struct is stored to base/hex(hash[0])/base32(hash[1:])
// (the base32 component is written without padding).
//
// CBOR was chosen because we are already using it and it serializes
// much smaller than JSON for AUMs. The 'keyasint' thing isn't essential
// but again it saves a bunch of bytes.
type fsHashInfo struct {
// Children holds the hashes of the AUMs that were committed with this
// hash as their parent.
Children []AUMHash `cbor:"1,keyasint"`
// AUM is the AUM whose hash this record is stored under. It is nil when
// the hash has so far only been recorded as the parent of another AUM.
AUM *AUM `cbor:"2,keyasint"`
}
// dirPrefix returns the directory that holds the record for h: a
// subdirectory of c.base named after the hex encoding of the first
// byte of the hash.
func (c *FS) dirPrefix(h AUMHash) string {
	leading := hex.EncodeToString(h[:1])
	return filepath.Join(c.base, leading)
}
// filename returns the name of the file, within dirPrefix(h), that
// holds the record for h: the unpadded base32 encoding of every byte
// of the hash after the first.
func (c *FS) filename(h AUMHash) string {
	unpadded := base32.StdEncoding.WithPadding(base32.NoPadding)
	return unpadded.EncodeToString(h[1:])
}
// AUM returns the AUM with the specified digest.
//
// os.ErrNotExist is returned when a record for the hash exists but
// carries no AUM. When there is no record at all, the error from
// opening the file is returned as-is.
func (c *FS) AUM(hash AUMHash) (AUM, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	switch info, err := c.get(hash); {
	case err != nil:
		return AUM{}, err
	case info.AUM == nil:
		// The hash is only known as somebody's parent.
		return AUM{}, os.ErrNotExist
	default:
		return *info.AUM, nil
	}
}
// ChildAUMs returns any known AUMs with a specific parent hash.
//
// A parent hash for which no record exists is not an error: nil, nil
// is returned. An error is returned if the parent's record cannot be
// read, or if any hash listed as a child cannot be loaded or has no
// AUM stored against it.
func (c *FS) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	info, err := c.get(prevAUMHash)
	if err != nil {
		if os.IsNotExist(err) {
			// not knowing about this hash is not an error
			return nil, nil
		}
		return nil, err
	}

	out := make([]AUM, len(info.Children))
	for i, h := range info.Children {
		// Named child rather than c so the receiver is not shadowed.
		child, err := c.get(h)
		if err != nil {
			// We expect any AUM recorded as a child on its parent to exist.
			// The %x in the message is the child's own hash.
			return nil, fmt.Errorf("reading child %d of %x: %w", i, h, err)
		}
		if child.AUM == nil {
			return nil, fmt.Errorf("child %d of %x: AUM not stored", i, h)
		}
		out[i] = *child.AUM
	}
	return out, nil
}
// get reads and CBOR-decodes the on-disk record for h.
//
// The error from opening the file is returned unmodified, so callers
// can use os.IsNotExist to detect a hash with no record. Callers are
// expected to hold c.mu.
func (c *FS) get(h AUMHash) (*fsHashInfo, error) {
	path := filepath.Join(c.dirPrefix(h), c.filename(h))
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	info := new(fsHashInfo)
	if err := cbor.NewDecoder(f).Decode(info); err != nil {
		return nil, err
	}
	return info, nil
}
// Heads returns AUMs for which there are no children. In other
// words, the latest AUM in all possible chains (the 'leaves').
//
// No index of heads is maintained, because Heads is expected to be
// called far less often than AUM() or ChildAUMs(). Every stored
// record is scanned on each call instead.
//
// If the scan fails part-way, the heads collected so far are returned
// together with the error.
func (c *FS) Heads() ([]AUM, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	heads := make([]AUM, 0, 6) // 6 is arbitrary.
	collect := func(info *fsHashInfo) {
		// Skip records that have children, and records that only exist
		// because the hash was named as a parent.
		if info.AUM == nil || len(info.Children) != 0 {
			return
		}
		heads = append(heads, *info.AUM)
	}
	err := c.scanHashes(collect)
	return heads, err
}
// scanHashes calls eachHashInfo once for every hash record stored
// beneath c.base.
//
// Records live at base/hex(hash[0])/base32(hash[1:]), so the hash of
// each record is rebuilt from its directory and file name and then
// loaded through get. Non-directory entries directly under c.base
// (such as the last_active_ancestor file) are skipped. Any directory
// or file whose name does not decode to the expected number of bytes
// aborts the scan with an error, as does any record that fails to load.
//
// Callers are expected to hold c.mu.
func (c *FS) scanHashes(eachHashInfo func(*fsHashInfo)) error {
	prefixDirs, err := os.ReadDir(c.base)
	if err != nil {
		return fmt.Errorf("reading prefix dirs: %w", err)
	}

	// Must match the encoding used by filename().
	enc := base32.StdEncoding.WithPadding(base32.NoPadding)

	for _, prefix := range prefixDirs {
		if !prefix.IsDir() {
			continue
		}

		pb, err := hex.DecodeString(prefix.Name())
		if err != nil {
			return fmt.Errorf("invalid prefix directory %q: %w", prefix.Name(), err)
		}
		// Checked separately from the decode error so the message never
		// ends in a meaningless "<nil>".
		if len(pb) != 1 {
			return fmt.Errorf("invalid prefix directory %q: decodes to %d bytes, want 1", prefix.Name(), len(pb))
		}

		files, err := os.ReadDir(filepath.Join(c.base, prefix.Name()))
		if err != nil {
			return fmt.Errorf("reading prefix dir: %w", err)
		}

		for _, file := range files {
			remainingHash, err := enc.DecodeString(file.Name())
			if err != nil {
				return fmt.Errorf("invalid aum file %s/%s: %w", prefix.Name(), file.Name(), err)
			}

			var h AUMHash
			// Without this check copy would silently truncate or zero-pad
			// the hash and the failure would surface as a confusing read
			// error for a hash that was never stored.
			if len(remainingHash) != len(h)-1 {
				return fmt.Errorf("invalid aum file %s/%s: decodes to %d bytes, want %d", prefix.Name(), file.Name(), len(remainingHash), len(h)-1)
			}
			h[0] = pb[0]
			copy(h[1:], remainingHash)

			info, err := c.get(h)
			if err != nil {
				return fmt.Errorf("reading %x: %w", h, err)
			}
			eachHashInfo(info)
		}
	}
	return nil
}
// SetLastActiveAncestor is called to record the oldest-known AUM
// that contributed to the current state. This value is used as
// a hint on next startup to determine which chain to pick when computing
// the current state, if there are multiple distinct chains.
//
// The raw bytes of the hash are written to base/last_active_ancestor
// via atomicfile.WriteFile.
func (c *FS) SetLastActiveAncestor(hash AUMHash) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	dst := filepath.Join(c.base, "last_active_ancestor")
	return atomicfile.WriteFile(dst, hash[:], 0644)
}
// LastActiveAncestor returns the oldest-known AUM that was (in a
// previous run) an ancestor of the current state. This is used
// as a hint to pick the correct chain in the event that the Chonk stores
// multiple distinct chains.
//
// Nil is returned if no last-active ancestor is set. An error is
// returned if the stored value cannot be read or does not have the
// length of an AUMHash.
func (c *FS) LastActiveAncestor() (*AUMHash, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	raw, err := ioutil.ReadFile(filepath.Join(c.base, "last_active_ancestor"))
	switch {
	case os.IsNotExist(err):
		return nil, nil // Not exist == none set.
	case err != nil:
		return nil, err
	}

	var ancestor AUMHash
	if got, want := len(raw), len(ancestor); got != want {
		return nil, fmt.Errorf("stored hash is of wrong length: %d != %d", got, want)
	}
	copy(ancestor[:], raw)
	return &ancestor, nil
}
// CommitVerifiedAUMs durably stores the provided AUMs.
// Callers MUST ONLY provide AUMs which are verified (specifically,
// a call to aumVerify must return a nil error), as the
// implementation assumes that only verified AUMs are stored.
//
// Updates are written one at a time, in order. On the first failure
// the error is returned and the remaining updates are not attempted;
// anything already written stays written.
func (c *FS) CommitVerifiedAUMs(updates []AUM) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	for idx, aum := range updates {
		hash := aum.Hash()

		// Record the child on its parent's entry first, so that
		// ChildAUMs() never has to scan all AUMs.
		if parent, ok := aum.Parent(); ok {
			addChild := func(info *fsHashInfo) {
				// Nothing to do if the child is already listed.
				for _, known := range info.Children {
					if known == hash {
						return
					}
				}
				info.Children = append(info.Children, hash)
			}
			if err := c.commit(parent, addChild); err != nil {
				return fmt.Errorf("committing update[%d] to parent %x: %v", idx, parent, err)
			}
		}

		// Then store the AUM itself under its own hash.
		storeAUM := func(info *fsHashInfo) {
			info.AUM = &aum
		}
		if err := c.commit(hash, storeAUM); err != nil {
			return fmt.Errorf("committing update[%d] (%x): %v", idx, hash, err)
		}
	}
	return nil
}
// commit applies updater to the record stored for h and writes the
// result back to disk.
//
// The updater receives the existing record, or an empty one if none
// has been stored yet, and is expected to set the AUM and/or Children
// fields as relevant. A record whose AUM does not hash to h is
// rejected. The record is CBOR-encoded and written with
// atomicfile.WriteFile, creating the prefix directory if needed.
//
// Callers are expected to hold c.mu for writing.
func (c *FS) commit(h AUMHash, updater func(*fsHashInfo)) error {
	var record fsHashInfo
	if existing, err := c.get(h); err == nil {
		record = *existing
	} else if !os.IsNotExist(err) {
		// A missing record just means we start from empty; anything
		// else is a real failure.
		return err
	}

	updater(&record)
	if stored := record.AUM; stored != nil && stored.Hash() != h {
		return fmt.Errorf("cannot commit AUM with hash %x to %x", stored.Hash(), h)
	}

	dir := c.dirPrefix(h)
	if err := os.MkdirAll(dir, 0755); err != nil && !os.IsExist(err) {
		return fmt.Errorf("creating directory: %v", err)
	}

	var encoded bytes.Buffer
	if err := cbor.NewEncoder(&encoded).Encode(record); err != nil {
		return fmt.Errorf("encoding: %v", err)
	}
	return atomicfile.WriteFile(filepath.Join(dir, c.filename(h)), encoded.Bytes(), 0644)
}

@ -5,6 +5,9 @@
package tka package tka
import ( import (
"fmt"
"os"
"path/filepath"
"testing" "testing"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
@ -20,35 +23,38 @@ func randHash(t *testing.T, seed int64) [blake2s.Size]byte {
} }
func TestImplementsChonk(t *testing.T) { func TestImplementsChonk(t *testing.T) {
impls := []Chonk{&Mem{}} impls := []Chonk{&Mem{}, &FS{}}
t.Logf("chonks: %v", impls) t.Logf("chonks: %v", impls)
} }
func TestTailchonkMem_ChildAUMs(t *testing.T) { func TestTailchonk_ChildAUMs(t *testing.T) {
chonk := Mem{} for _, chonk := range []Chonk{&Mem{}, &FS{base: t.TempDir()}} {
parentHash := randHash(t, 1) t.Run(fmt.Sprintf("%T", chonk), func(t *testing.T) {
data := []AUM{ parentHash := randHash(t, 1)
{ data := []AUM{
MessageKind: AUMRemoveKey, {
KeyID: []byte{1, 2}, MessageKind: AUMRemoveKey,
PrevAUMHash: parentHash[:], KeyID: []byte{1, 2},
}, PrevAUMHash: parentHash[:],
{ },
MessageKind: AUMRemoveKey, {
KeyID: []byte{3, 4}, MessageKind: AUMRemoveKey,
PrevAUMHash: parentHash[:], KeyID: []byte{3, 4},
}, PrevAUMHash: parentHash[:],
} },
}
if err := chonk.CommitVerifiedAUMs(data); err != nil { if err := chonk.CommitVerifiedAUMs(data); err != nil {
t.Fatalf("CommitVerifiedAUMs failed: %v", err) t.Fatalf("CommitVerifiedAUMs failed: %v", err)
} }
stored, err := chonk.ChildAUMs(parentHash) stored, err := chonk.ChildAUMs(parentHash)
if err != nil { if err != nil {
t.Fatalf("ChildAUMs failed: %v", err) t.Fatalf("ChildAUMs failed: %v", err)
} }
if diff := cmp.Diff(data, stored); diff != "" { if diff := cmp.Diff(data, stored); diff != "" {
t.Errorf("stored AUM differs (-want, +got):\n%s", diff) t.Errorf("stored AUM differs (-want, +got):\n%s", diff)
}
})
} }
} }
@ -79,50 +85,74 @@ func TestTailchonkMem_Orphans(t *testing.T) {
} }
} }
func TestTailchonkMem_ReadChainFromHead(t *testing.T) { func TestTailchonk_ReadChainFromHead(t *testing.T) {
chonk := Mem{} for _, chonk := range []Chonk{&Mem{}, &FS{base: t.TempDir()}} {
genesis := AUM{MessageKind: AUMRemoveKey, KeyID: []byte{1, 2}}
gHash := genesis.Hash()
intermediate := AUM{PrevAUMHash: gHash[:]}
iHash := intermediate.Hash()
leaf := AUM{PrevAUMHash: iHash[:]}
commitSet := []AUM{
genesis,
intermediate,
leaf,
}
if err := chonk.CommitVerifiedAUMs(commitSet); err != nil {
t.Fatalf("CommitVerifiedAUMs failed: %v", err)
}
// t.Logf("genesis hash = %X", genesis.Hash())
// t.Logf("intermediate hash = %X", intermediate.Hash())
// t.Logf("leaf hash = %X", leaf.Hash())
// Read the chain from the leaf backwards. t.Run(fmt.Sprintf("%T", chonk), func(t *testing.T) {
gotLeafs, err := chonk.Heads() genesis := AUM{MessageKind: AUMRemoveKey, KeyID: []byte{1, 2}}
if err != nil { gHash := genesis.Hash()
t.Fatalf("Heads failed: %v", err) intermediate := AUM{PrevAUMHash: gHash[:]}
} iHash := intermediate.Hash()
if diff := cmp.Diff([]AUM{leaf}, gotLeafs); diff != "" { leaf := AUM{PrevAUMHash: iHash[:]}
t.Fatalf("leaf AUM differs (-want, +got):\n%s", diff)
}
parent, _ := gotLeafs[0].Parent() commitSet := []AUM{
gotIntermediate, err := chonk.AUM(parent) genesis,
if err != nil { intermediate,
t.Fatalf("AUM(<intermediate>) failed: %v", err) leaf,
}
if err := chonk.CommitVerifiedAUMs(commitSet); err != nil {
t.Fatalf("CommitVerifiedAUMs failed: %v", err)
}
// t.Logf("genesis hash = %X", genesis.Hash())
// t.Logf("intermediate hash = %X", intermediate.Hash())
// t.Logf("leaf hash = %X", leaf.Hash())
// Read the chain from the leaf backwards.
gotLeafs, err := chonk.Heads()
if err != nil {
t.Fatalf("Heads failed: %v", err)
}
if diff := cmp.Diff([]AUM{leaf}, gotLeafs); diff != "" {
t.Fatalf("leaf AUM differs (-want, +got):\n%s", diff)
}
parent, _ := gotLeafs[0].Parent()
gotIntermediate, err := chonk.AUM(parent)
if err != nil {
t.Fatalf("AUM(<intermediate>) failed: %v", err)
}
if diff := cmp.Diff(intermediate, gotIntermediate); diff != "" {
t.Errorf("intermediate AUM differs (-want, +got):\n%s", diff)
}
parent, _ = gotIntermediate.Parent()
gotGenesis, err := chonk.AUM(parent)
if err != nil {
t.Fatalf("AUM(<genesis>) failed: %v", err)
}
if diff := cmp.Diff(genesis, gotGenesis); diff != "" {
t.Errorf("genesis AUM differs (-want, +got):\n%s", diff)
}
})
} }
if diff := cmp.Diff(intermediate, gotIntermediate); diff != "" { }
t.Errorf("intermediate AUM differs (-want, +got):\n%s", diff)
func TestTailchonkFS_Commit(t *testing.T) {
chonk := &FS{base: t.TempDir()}
parentHash := randHash(t, 1)
aum := AUM{MessageKind: AUMNoOp, PrevAUMHash: parentHash[:]}
if err := chonk.CommitVerifiedAUMs([]AUM{aum}); err != nil {
t.Fatal(err)
} }
parent, _ = gotIntermediate.Parent() if got, want := chonk.filename(aum.Hash()), "HJX3LPJJQVRFSQX4QONESBU4DUO5JPORA66ZUCFS6NHZWDZTP4"; got != want {
gotGenesis, err := chonk.AUM(parent) t.Errorf("aum filename = %q, want %q", got, want)
if err != nil { }
t.Fatalf("AUM(<genesis>) failed: %v", err) if _, err := os.Stat(filepath.Join(chonk.base, "ad", "HJX3LPJJQVRFSQX4QONESBU4DUO5JPORA66ZUCFS6NHZWDZTP4")); err != nil {
t.Errorf("stat of AUM file failed: %v", err)
} }
if diff := cmp.Diff(genesis, gotGenesis); diff != "" { if _, err := os.Stat(filepath.Join(chonk.base, "67", "226TIYPDKQWKFD5MXUI3GRVDSDFXRBABNINTFIT5ADMCLZ464U")); err != nil {
t.Errorf("genesis AUM differs (-want, +got):\n%s", diff) t.Errorf("stat of AUM parent failed: %v", err)
} }
} }

Loading…
Cancel
Save