You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
tailscale/tka/tailchonk.go

759 lines
22 KiB
Go

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package tka
import (
"bytes"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/fxamacker/cbor/v2"
"tailscale.com/atomicfile"
)
// Chonk implementations provide durable storage for AUMs and other
// TKA state.
//
// All methods must be thread-safe.
//
// The name 'tailchonk' was coined by @catzkorn.
type Chonk interface {
// AUM returns the AUM with the specified digest.
//
// If the AUM does not exist, then os.ErrNotExist is returned.
AUM(hash AUMHash) (AUM, error)
// ChildAUMs returns all AUMs with a specified previous
// AUM hash.
ChildAUMs(prevAUMHash AUMHash) ([]AUM, error)
// CommitVerifiedAUMs durably stores the provided AUMs.
// Callers MUST ONLY provide AUMs which are verified (specifically,
// a call to aumVerify() must return a nil error).
// as the implementation assumes that only verified AUMs are stored.
CommitVerifiedAUMs(updates []AUM) error
// Heads returns AUMs for which there are no children. In other
// words, the latest AUM in all possible chains (the 'leaves').
Heads() ([]AUM, error)
// SetLastActiveAncestor is called to record the oldest-known AUM
// that contributed to the current state. This value is used as
// a hint on next startup to determine which chain to pick when computing
// the current state, if there are multiple distinct chains.
SetLastActiveAncestor(hash AUMHash) error
// LastActiveAncestor returns the oldest-known AUM that was (in a
// previous run) an ancestor of the current state. This is used
// as a hint to pick the correct chain in the event that the Chonk stores
// multiple distinct chains.
LastActiveAncestor() (*AUMHash, error)
}
// CompactableChonk implementation are extensions of Chonk, which are
// able to be operated by compaction logic to deleted old AUMs.
type CompactableChonk interface {
Chonk
// AllAUMs returns all AUMs stored in the chonk.
AllAUMs() ([]AUMHash, error)
// CommitTime returns the time at which the AUM was committed.
//
// If the AUM does not exist, then os.ErrNotExist is returned.
CommitTime(hash AUMHash) (time.Time, error)
// PurgeAUMs permanently and irrevocably deletes the specified
// AUMs from storage.
PurgeAUMs(hashes []AUMHash) error
}
// Mem implements in-memory storage of TKA state, suitable for
// tests.
//
// Mem implements the Chonk interface.
type Mem struct {
l sync.RWMutex
aums map[AUMHash]AUM
parentIndex map[AUMHash][]AUMHash
lastActiveAncestor *AUMHash
}
func (c *Mem) SetLastActiveAncestor(hash AUMHash) error {
c.l.Lock()
defer c.l.Unlock()
c.lastActiveAncestor = &hash
return nil
}
func (c *Mem) LastActiveAncestor() (*AUMHash, error) {
c.l.RLock()
defer c.l.RUnlock()
return c.lastActiveAncestor, nil
}
// Heads returns AUMs for which there are no children. In other
// words, the latest AUM in all chains (the 'leaf').
func (c *Mem) Heads() ([]AUM, error) {
c.l.RLock()
defer c.l.RUnlock()
out := make([]AUM, 0, 6)
// An AUM is a 'head' if there are no nodes for which it is the parent.
for _, a := range c.aums {
if len(c.parentIndex[a.Hash()]) == 0 {
out = append(out, a)
}
}
return out, nil
}
// AUM returns the AUM with the specified digest.
func (c *Mem) AUM(hash AUMHash) (AUM, error) {
c.l.RLock()
defer c.l.RUnlock()
aum, ok := c.aums[hash]
if !ok {
return AUM{}, os.ErrNotExist
}
return aum, nil
}
// Orphans returns all AUMs which do not have a parent.
func (c *Mem) Orphans() ([]AUM, error) {
c.l.RLock()
defer c.l.RUnlock()
out := make([]AUM, 0, 6)
for _, a := range c.aums {
if _, ok := a.Parent(); !ok {
out = append(out, a)
}
}
return out, nil
}
// ChildAUMs returns all AUMs with a specified previous
// AUM hash.
func (c *Mem) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
c.l.RLock()
defer c.l.RUnlock()
out := make([]AUM, 0, 6)
for _, entry := range c.parentIndex[prevAUMHash] {
out = append(out, c.aums[entry])
}
return out, nil
}
// CommitVerifiedAUMs durably stores the provided AUMs.
// Callers MUST ONLY provide well-formed and verified AUMs,
// as the rest of the TKA implementation assumes that only
// verified AUMs are stored.
func (c *Mem) CommitVerifiedAUMs(updates []AUM) error {
c.l.Lock()
defer c.l.Unlock()
if c.aums == nil {
c.parentIndex = make(map[AUMHash][]AUMHash, 64)
c.aums = make(map[AUMHash]AUM, 64)
}
updateLoop:
for _, aum := range updates {
aumHash := aum.Hash()
c.aums[aumHash] = aum
parent, ok := aum.Parent()
if ok {
for _, exists := range c.parentIndex[parent] {
if exists == aumHash {
continue updateLoop
}
}
c.parentIndex[parent] = append(c.parentIndex[parent], aumHash)
}
}
return nil
}
// FS implements filesystem storage of TKA state.
//
// FS implements the Chonk interface.
type FS struct {
base string
mu sync.RWMutex
}
// ChonkDir returns an implementation of Chonk which uses the
// given directory to store TKA state.
func ChonkDir(dir string) (*FS, error) {
stat, err := os.Stat(dir)
if err != nil {
return nil, err
}
if !stat.IsDir() {
return nil, fmt.Errorf("chonk directory %q is a file", dir)
}
return &FS{base: dir}, nil
}
// fsHashInfo describes how information about an AUMHash is represented
// on disk.
//
// The CBOR-serialization of this struct is stored to base/__/base32(hash)
// where __ are the first two characters of base32(hash).
//
// CBOR was chosen because we are already using it and it serializes
// much smaller than JSON for AUMs. The 'keyasint' thing isn't essential
// but again it saves a bunch of bytes.
type fsHashInfo struct {
Children []AUMHash `cbor:"1,keyasint"`
AUM *AUM `cbor:"2,keyasint"`
}
// aumDir returns the directory an AUM is stored in, and its filename
// within the directory.
func (c *FS) aumDir(h AUMHash) (dir, base string) {
s := h.String()
return filepath.Join(c.base, s[:2]), s
}
// AUM returns the AUM with the specified digest.
//
// If the AUM does not exist, then os.ErrNotExist is returned.
func (c *FS) AUM(hash AUMHash) (AUM, error) {
c.mu.RLock()
defer c.mu.RUnlock()
info, err := c.get(hash)
if err != nil {
if os.IsNotExist(err) {
return AUM{}, os.ErrNotExist
}
return AUM{}, err
}
if info.AUM == nil {
return AUM{}, os.ErrNotExist
}
return *info.AUM, nil
}
// AUM returns any known AUMs with a specific parent hash.
func (c *FS) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
c.mu.RLock()
defer c.mu.RUnlock()
info, err := c.get(prevAUMHash)
if err != nil {
if os.IsNotExist(err) {
// not knowing about this hash is not an error
return nil, nil
}
return nil, err
}
out := make([]AUM, len(info.Children))
for i, h := range info.Children {
c, err := c.get(h)
if err != nil {
// We expect any AUM recorded as a child on its parent to exist.
return nil, fmt.Errorf("reading child %d of %x: %v", i, h, err)
}
if c.AUM == nil {
return nil, fmt.Errorf("child %d of %x: AUM not stored", i, h)
}
out[i] = *c.AUM
}
return out, nil
}
func (c *FS) get(h AUMHash) (*fsHashInfo, error) {
dir, base := c.aumDir(h)
f, err := os.Open(filepath.Join(dir, base))
if err != nil {
return nil, err
}
defer f.Close()
m, err := cborDecOpts.DecMode()
if err != nil {
return nil, err
}
var out fsHashInfo
if err := m.NewDecoder(f).Decode(&out); err != nil {
return nil, err
}
if out.AUM != nil && out.AUM.Hash() != h {
return nil, fmt.Errorf("%s: AUM does not match file name hash %s", f.Name(), out.AUM.Hash())
}
return &out, nil
}
// Heads returns AUMs for which there are no children. In other
// words, the latest AUM in all possible chains (the 'leaves').
//
// Heads is expected to be called infrequently compared to AUM() or
// ChildAUMs(), so we haven't put any work into maintaining an index.
// Instead, the full set of AUMs is scanned.
func (c *FS) Heads() ([]AUM, error) {
c.mu.RLock()
defer c.mu.RUnlock()
out := make([]AUM, 0, 6) // 6 is arbitrary.
err := c.scanHashes(func(info *fsHashInfo) {
if len(info.Children) == 0 && info.AUM != nil {
out = append(out, *info.AUM)
}
})
return out, err
}
func (c *FS) scanHashes(eachHashInfo func(*fsHashInfo)) error {
prefixDirs, err := os.ReadDir(c.base)
if err != nil {
return fmt.Errorf("reading prefix dirs: %v", err)
}
for _, prefix := range prefixDirs {
if !prefix.IsDir() {
continue
}
files, err := os.ReadDir(filepath.Join(c.base, prefix.Name()))
if err != nil {
return fmt.Errorf("reading prefix dir: %v", err)
}
for _, file := range files {
var h AUMHash
if err := h.UnmarshalText([]byte(file.Name())); err != nil {
return fmt.Errorf("invalid aum file: %s: %w", file.Name(), err)
}
info, err := c.get(h)
if err != nil {
return fmt.Errorf("reading %x: %v", h, err)
}
eachHashInfo(info)
}
}
return nil
}
// SetLastActiveAncestor is called to record the oldest-known AUM
// that contributed to the current state. This value is used as
// a hint on next startup to determine which chain to pick when computing
// the current state, if there are multiple distinct chains.
func (c *FS) SetLastActiveAncestor(hash AUMHash) error {
c.mu.Lock()
defer c.mu.Unlock()
return atomicfile.WriteFile(filepath.Join(c.base, "last_active_ancestor"), hash[:], 0644)
}
// LastActiveAncestor returns the oldest-known AUM that was (in a
// previous run) an ancestor of the current state. This is used
// as a hint to pick the correct chain in the event that the Chonk stores
// multiple distinct chains.
//
// Nil is returned if no last-active ancestor is set.
func (c *FS) LastActiveAncestor() (*AUMHash, error) {
c.mu.RLock()
defer c.mu.RUnlock()
hash, err := os.ReadFile(filepath.Join(c.base, "last_active_ancestor"))
if err != nil {
if os.IsNotExist(err) {
return nil, nil // Not exist == none set.
}
return nil, err
}
var out AUMHash
if len(hash) != len(out) {
return nil, fmt.Errorf("stored hash is of wrong length: %d != %d", len(hash), len(out))
}
copy(out[:], hash)
return &out, nil
}
// CommitVerifiedAUMs durably stores the provided AUMs.
// Callers MUST ONLY provide AUMs which are verified (specifically,
// a call to aumVerify must return a nil error), as the
// implementation assumes that only verified AUMs are stored.
func (c *FS) CommitVerifiedAUMs(updates []AUM) error {
c.mu.Lock()
defer c.mu.Unlock()
for i, aum := range updates {
h := aum.Hash()
// We keep track of children against their parent so that
// ChildAUMs() do not need to scan all AUMs.
parent, hasParent := aum.Parent()
if hasParent {
err := c.commit(parent, func(info *fsHashInfo) {
// Only add it if its not already there.
for i := range info.Children {
if info.Children[i] == h {
return
}
}
info.Children = append(info.Children, h)
})
if err != nil {
return fmt.Errorf("committing update[%d] to parent %x: %v", i, parent, err)
}
}
err := c.commit(h, func(info *fsHashInfo) {
info.AUM = &aum
})
if err != nil {
return fmt.Errorf("committing update[%d] (%x): %v", i, h, err)
}
}
return nil
}
// commit calls the provided updater function to record changes relevant
// to the given hash. The caller is expected to update the AUM and
// Children fields, as relevant.
func (c *FS) commit(h AUMHash, updater func(*fsHashInfo)) error {
toCommit := fsHashInfo{}
existing, err := c.get(h)
switch {
case os.IsNotExist(err):
case err != nil:
return err
default:
toCommit = *existing
}
updater(&toCommit)
if toCommit.AUM != nil && toCommit.AUM.Hash() != h {
return fmt.Errorf("cannot commit AUM with hash %x to %x", toCommit.AUM.Hash(), h)
}
dir, base := c.aumDir(h)
if err := os.MkdirAll(dir, 0755); err != nil && !os.IsExist(err) {
return fmt.Errorf("creating directory: %v", err)
}
m, err := cbor.CTAP2EncOptions().EncMode()
if err != nil {
return fmt.Errorf("cbor EncMode: %v", err)
}
var buff bytes.Buffer
if err := m.NewEncoder(&buff).Encode(toCommit); err != nil {
return fmt.Errorf("encoding: %v", err)
}
return atomicfile.WriteFile(filepath.Join(dir, base), buff.Bytes(), 0644)
}
// CompactionOptions describes tuneables to use when compacting a Chonk.
type CompactionOptions struct {
// The minimum number of ancestor AUMs to remember. The actual length
// of the chain post-compaction may be longer to reach a Checkpoint AUM.
MinChain int
// The minimum duration to store an AUM before it is a candidate for deletion.
MinAge time.Duration
}
// retainState tracks the state of an AUM hash as it is being considered for
// deletion.
type retainState uint8
// Valid retainState flags.
const (
retainStateActive retainState = 1 << iota // The AUM is part of the active chain and less than MinChain hops from HEAD.
retainStateYoung // The AUM is younger than MinAge.
retainStateLeaf // The AUM is a descendant of an AUM to be retained.
retainStateAncestor // The AUM is part of a chain between a retained AUM and the new lastActiveAncestor.
retainStateCandidate // The AUM is part of the active chain.
// retainAUMMask is a bit mask of any bit which should prevent
// the deletion of an AUM.
retainAUMMask retainState = retainStateActive | retainStateYoung | retainStateLeaf | retainStateAncestor
)
// markActiveChain marks AUMs in the active chain.
// All AUMs that are within minChain ancestors of head are
// marked retainStateActive, and all remaining ancestors are
// marked retainStateCandidate.
//
// markActiveChain returns the next ancestor AUM which is a checkpoint AUM.
func markActiveChain(storage Chonk, verdict map[AUMHash]retainState, minChain int, head AUMHash) (lastActiveAncestor AUMHash, err error) {
next, err := storage.AUM(head)
if err != nil {
return AUMHash{}, err
}
for i := 0; i < minChain; i++ {
h := next.Hash()
verdict[h] |= retainStateActive
parent, hasParent := next.Parent()
if !hasParent {
// Genesis AUM (beginning of time). The chain isnt long enough to need truncating.
return h, nil
}
if next, err = storage.AUM(parent); err != nil {
if err == os.ErrNotExist {
// We've reached the end of the chain we have stored.
return h, nil
}
return AUMHash{}, fmt.Errorf("reading active chain (retainStateActive) (%d): %w", i, err)
}
}
// If we got this far, we have at least minChain AUMs stored, and minChain number
// of ancestors have been marked for retention. We now continue to iterate backwards
// till we find an AUM which we can compact to (a Checkpoint AUM).
for {
h := next.Hash()
verdict[h] |= retainStateActive
if next.MessageKind == AUMCheckpoint {
lastActiveAncestor = h
break
}
parent, hasParent := next.Parent()
if !hasParent {
return AUMHash{}, errors.New("reached genesis AUM without finding an appropriate lastActiveAncestor")
}
if next, err = storage.AUM(parent); err != nil {
return AUMHash{}, fmt.Errorf("searching for compaction target: %w", err)
}
}
// Mark remaining known ancestors as retainStateCandidate.
for {
parent, hasParent := next.Parent()
if !hasParent {
break
}
verdict[parent] |= retainStateCandidate
if next, err = storage.AUM(parent); err != nil {
if err == os.ErrNotExist {
// We've reached the end of the chain we have stored.
break
}
return AUMHash{}, fmt.Errorf("reading active chain (retainStateCandidate): %w", err)
}
}
return lastActiveAncestor, nil
}
// markYoungAUMs marks all AUMs younger than minAge for retention. All
// candidate AUMs must exist in verdict.
func markYoungAUMs(storage CompactableChonk, verdict map[AUMHash]retainState, minAge time.Duration) error {
minTime := time.Now().Add(-minAge)
for h, _ := range verdict {
commitTime, err := storage.CommitTime(h)
if err != nil {
return err
}
if commitTime.After(minTime) {
verdict[h] |= retainStateYoung
}
}
return nil
}
// markAncestorIntersectionAUMs walks backwards from all AUMs to be retained,
// ensuring they intersect with candidateAncestor. All AUMs between a retained
// AUM and candidateAncestor are marked for retention.
//
// If there is no intersection between candidateAncestor and the ancestors of
// a retained AUM (this can happen if a retained AUM intersects the main chain
// before candidateAncestor) then candidate ancestor is recomputed based on
// the new oldest intersection.
//
// The final value for lastActiveAncestor is returned.
func markAncestorIntersectionAUMs(storage Chonk, verdict map[AUMHash]retainState, candidateAncestor AUMHash) (lastActiveAncestor AUMHash, err error) {
toScan := make([]AUMHash, 0, len(verdict))
for h, v := range verdict {
if (v & retainAUMMask) == 0 {
continue // not marked for retention, so dont need to consider it
}
if h == candidateAncestor {
continue
}
toScan = append(toScan, h)
}
var didAdjustCandidateAncestor bool
for len(toScan) > 0 {
nextIterScan := make([]AUMHash, 0, len(verdict))
for _, h := range toScan {
if verdict[h]&retainStateAncestor != 0 {
// This AUM and its ancestors have already been iterated.
continue
}
verdict[h] |= retainStateAncestor
a, err := storage.AUM(h)
if err != nil {
return AUMHash{}, fmt.Errorf("reading %v: %w", h, err)
}
parent, hasParent := a.Parent()
if !hasParent {
return AUMHash{}, errors.New("reached genesis AUM without intersecting with candidate ancestor")
}
if verdict[parent]&retainAUMMask != 0 {
// Includes candidateAncestor (has retainStateActive set)
continue
}
if verdict[parent]&retainStateCandidate != 0 {
// We've intersected with the active chain but haven't done so through
// candidateAncestor. That means that we intersect the active chain
// before candidateAncestor, hence candidateAncestor actually needs
// to be earlier than it is now.
candidateAncestor = parent
didAdjustCandidateAncestor = true
verdict[parent] |= retainStateAncestor
// There could be AUMs on the active chain between our new candidateAncestor
// and the old one, make sure they are marked as retained.
next := parent
childLoop:
for {
children, err := storage.ChildAUMs(next)
if err != nil {
return AUMHash{}, fmt.Errorf("reading children %v: %w", next, err)
}
// While there can be many children of an AUM, there can only be
// one child on the active chain (it will have retainStateCandidate set).
for _, a := range children {
h := a.Hash()
if v := verdict[h]; v&retainStateCandidate != 0 && v&retainStateActive == 0 {
verdict[h] |= retainStateAncestor
next = h
continue childLoop
}
}
break
}
}
nextIterScan = append(nextIterScan, parent)
}
toScan = nextIterScan
}
// If candidateAncestor was adjusted backwards, then it may not be a checkpoint
// (and hence a valid compaction candidate). If so, iterate backwards and adjust
// the candidateAncestor till we find a checkpoint.
if didAdjustCandidateAncestor {
var next AUM
if next, err = storage.AUM(candidateAncestor); err != nil {
return AUMHash{}, fmt.Errorf("searching for compaction target: %w", err)
}
for {
h := next.Hash()
verdict[h] |= retainStateActive
if next.MessageKind == AUMCheckpoint {
candidateAncestor = h
break
}
parent, hasParent := next.Parent()
if !hasParent {
return AUMHash{}, errors.New("reached genesis AUM without finding an appropriate candidateAncestor")
}
if next, err = storage.AUM(parent); err != nil {
return AUMHash{}, fmt.Errorf("searching for compaction target: %w", err)
}
}
}
return candidateAncestor, nil
}
// markDescendantAUMs marks all children of a retained AUM as retained.
func markDescendantAUMs(storage Chonk, verdict map[AUMHash]retainState) error {
toScan := make([]AUMHash, 0, len(verdict))
for h, v := range verdict {
if v&retainAUMMask == 0 {
continue // not marked, so dont need to mark descendants
}
toScan = append(toScan, h)
}
for len(toScan) > 0 {
nextIterScan := make([]AUMHash, 0, len(verdict))
for _, h := range toScan {
if verdict[h]&retainStateLeaf != 0 {
// This AUM and its decendants have already been marked.
continue
}
verdict[h] |= retainStateLeaf
children, err := storage.ChildAUMs(h)
if err != nil {
return err
}
for _, a := range children {
nextIterScan = append(nextIterScan, a.Hash())
}
}
toScan = nextIterScan
}
return nil
}
// Compact deletes old AUMs from storage, based on the parameters given in opts.
func Compact(storage CompactableChonk, head AUMHash, opts CompactionOptions) (lastActiveAncestor AUMHash, err error) {
if opts.MinChain == 0 {
return AUMHash{}, errors.New("opts.MinChain must be set")
}
if opts.MinAge == 0 {
return AUMHash{}, errors.New("opts.MinAge must be set")
}
all, err := storage.AllAUMs()
if err != nil {
return AUMHash{}, fmt.Errorf("AllAUMs: %w", err)
}
verdict := make(map[AUMHash]retainState, len(all))
for _, h := range all {
verdict[h] = 0
}
if lastActiveAncestor, err = markActiveChain(storage, verdict, opts.MinChain, head); err != nil {
return AUMHash{}, fmt.Errorf("marking active chain: %w", err)
}
if err := markYoungAUMs(storage, verdict, opts.MinAge); err != nil {
return AUMHash{}, fmt.Errorf("marking young AUMs: %w", err)
}
if err := markDescendantAUMs(storage, verdict); err != nil {
return AUMHash{}, fmt.Errorf("marking decendant AUMs: %w", err)
}
if lastActiveAncestor, err = markAncestorIntersectionAUMs(storage, verdict, lastActiveAncestor); err != nil {
return AUMHash{}, fmt.Errorf("marking ancestor intersection: %w", err)
}
toDelete := make([]AUMHash, 0, len(verdict))
for h, v := range verdict {
if v&retainAUMMask == 0 { // no retention set
toDelete = append(toDelete, h)
}
}
return lastActiveAncestor, storage.PurgeAUMs(toDelete)
}