ipn/store/kubestore: cache state in memory (#13918)

Cache state in memory on writes, read from memory
in reads.
kubestore was previously always reading state from a Secret.
This change should fix bugs caused by temporary loss of access
to kube API server and imporove overall performance

Fixes #7671
Updates tailscale/tailscale#12079,tailscale/tailscale#13900

Signed-off-by: Maisem Ali <maisem@tailscale.com>
Signed-off-by: Irbe Krumina <irbe@tailscale.com>
Co-authored-by: Maisem Ali <maisem@tailscale.com>
pull/13176/merge
Irbe Krumina 4 days ago committed by GitHub
parent 6ab39b7bcd
commit 853fe3b713
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -13,19 +13,27 @@ import (
"time" "time"
"tailscale.com/ipn" "tailscale.com/ipn"
"tailscale.com/ipn/store/mem"
"tailscale.com/kube/kubeapi" "tailscale.com/kube/kubeapi"
"tailscale.com/kube/kubeclient" "tailscale.com/kube/kubeclient"
"tailscale.com/types/logger" "tailscale.com/types/logger"
) )
// TODO(irbekrm): should we bump this? should we have retries? See tailscale/tailscale#13024
const timeout = 5 * time.Second
// Store is an ipn.StateStore that uses a Kubernetes Secret for persistence. // Store is an ipn.StateStore that uses a Kubernetes Secret for persistence.
type Store struct { type Store struct {
client kubeclient.Client client kubeclient.Client
canPatch bool canPatch bool
secretName string secretName string
// memory holds the latest tailscale state. Writes write state to a kube Secret and memory, Reads read from
// memory.
memory mem.Store
} }
// New returns a new Store that persists to the named secret. // New returns a new Store that persists to the named Secret.
func New(_ logger.Logf, secretName string) (*Store, error) { func New(_ logger.Logf, secretName string) (*Store, error) {
c, err := kubeclient.New() c, err := kubeclient.New()
if err != nil { if err != nil {
@ -39,11 +47,16 @@ func New(_ logger.Logf, secretName string) (*Store, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Store{ s := &Store{
client: c, client: c,
canPatch: canPatch, canPatch: canPatch,
secretName: secretName, secretName: secretName,
}, nil }
// Load latest state from kube Secret if it already exists.
if err := s.loadState(); err != nil {
return nil, fmt.Errorf("error loading state from kube Secret: %w", err)
}
return s, nil
} }
func (s *Store) SetDialer(d func(ctx context.Context, network, address string) (net.Conn, error)) { func (s *Store) SetDialer(d func(ctx context.Context, network, address string) (net.Conn, error)) {
@ -54,37 +67,17 @@ func (s *Store) String() string { return "kube.Store" }
// ReadState implements the StateStore interface. // ReadState implements the StateStore interface.
func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) { func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) return s.memory.ReadState(ipn.StateKey(sanitizeKey(id)))
defer cancel()
secret, err := s.client.GetSecret(ctx, s.secretName)
if err != nil {
if st, ok := err.(*kubeapi.Status); ok && st.Code == 404 {
return nil, ipn.ErrStateNotExist
}
return nil, err
}
b, ok := secret.Data[sanitizeKey(id)]
if !ok {
return nil, ipn.ErrStateNotExist
}
return b, nil
}
func sanitizeKey(k ipn.StateKey) string {
// The only valid characters in a Kubernetes secret key are alphanumeric, -,
// _, and .
return strings.Map(func(r rune) rune {
if r >= 'a' && r <= 'z' || r >= 'A' && r <= 'Z' || r >= '0' && r <= '9' || r == '-' || r == '_' || r == '.' {
return r
}
return '_'
}, string(k))
} }
// WriteState implements the StateStore interface. // WriteState implements the StateStore interface.
func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer func() {
if err == nil {
s.memory.WriteState(ipn.StateKey(sanitizeKey(id)), bs)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
secret, err := s.client.GetSecret(ctx, s.secretName) secret, err := s.client.GetSecret(ctx, s.secretName)
@ -137,3 +130,29 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
} }
return err return err
} }
func (s *Store) loadState() error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
secret, err := s.client.GetSecret(ctx, s.secretName)
if err != nil {
if st, ok := err.(*kubeapi.Status); ok && st.Code == 404 {
return ipn.ErrStateNotExist
}
return err
}
s.memory.LoadFromMap(secret.Data)
return nil
}
func sanitizeKey(k ipn.StateKey) string {
// The only valid characters in a Kubernetes secret key are alphanumeric, -,
// _, and .
return strings.Map(func(r rune) rune {
if r >= 'a' && r <= 'z' || r >= 'A' && r <= 'Z' || r >= '0' && r <= '9' || r == '-' || r == '_' || r == '.' {
return r
}
return '_'
}, string(k))
}

@ -9,8 +9,10 @@ import (
"encoding/json" "encoding/json"
"sync" "sync"
xmaps "golang.org/x/exp/maps"
"tailscale.com/ipn" "tailscale.com/ipn"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/mak"
) )
// New returns a new Store. // New returns a new Store.
@ -28,6 +30,7 @@ type Store struct {
func (s *Store) String() string { return "mem.Store" } func (s *Store) String() string { return "mem.Store" }
// ReadState implements the StateStore interface. // ReadState implements the StateStore interface.
// It returns ipn.ErrStateNotExist if the state does not exist.
func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) { func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -39,6 +42,7 @@ func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
} }
// WriteState implements the StateStore interface. // WriteState implements the StateStore interface.
// It never returns an error.
func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -49,6 +53,19 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
return nil return nil
} }
// LoadFromMap loads the in-memory cache from the provided map.
// Any existing content is cleared, and the provided map is
// copied into the cache.
func (s *Store) LoadFromMap(m map[string][]byte) {
s.mu.Lock()
defer s.mu.Unlock()
xmaps.Clear(s.cache)
for k, v := range m {
mak.Set(&s.cache, ipn.StateKey(k), v)
}
return
}
// LoadFromJSON attempts to unmarshal json content into the // LoadFromJSON attempts to unmarshal json content into the
// in-memory cache. // in-memory cache.
func (s *Store) LoadFromJSON(data []byte) error { func (s *Store) LoadFromJSON(data []byte) error {

Loading…
Cancel
Save