mirror of https://github.com/tailscale/tailscale/
cmd/{k8s-operator,k8s-proxy}: add kube-apiserver ProxyGroup type (#16266)
Adds a new k8s-proxy command to convert operator's in-process proxy to
a separately deployable type of ProxyGroup: kube-apiserver. k8s-proxy
reads in a new config file written by the operator, modelled on tailscaled's
conffile but with some modifications to ensure multiple versions of the
config can co-exist within a file. This should make it much easier to
support reading that config file from a Kube Secret with a stable file name.
To avoid needing to give the operator ClusterRole{,Binding} permissions,
the helm chart now optionally deploys a new static ServiceAccount for
the API Server proxy to use if in auth mode.
Proxies deployed by kube-apiserver ProxyGroups currently work the same as
the operator's in-process proxy. They do not yet leverage Tailscale Services
for presenting a single HA DNS name.
Updates #13358
Change-Id: Ib6ead69b2173c5e1929f3c13fb48a9a5362195d8
Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
pull/16497/head^2
parent
90bf0a97b3
commit
4dfed6b146
@ -0,0 +1,61 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
type apiServerProxyMode int
|
||||
|
||||
func (a apiServerProxyMode) String() string {
|
||||
switch a {
|
||||
case apiServerProxyModeDisabled:
|
||||
return "disabled"
|
||||
case apiServerProxyModeEnabled:
|
||||
return "auth"
|
||||
case apiServerProxyModeNoAuth:
|
||||
return "noauth"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
apiServerProxyModeDisabled apiServerProxyMode = iota
|
||||
apiServerProxyModeEnabled
|
||||
apiServerProxyModeNoAuth
|
||||
)
|
||||
|
||||
func parseAPIProxyMode() apiServerProxyMode {
|
||||
haveAuthProxyEnv := os.Getenv("AUTH_PROXY") != ""
|
||||
haveAPIProxyEnv := os.Getenv("APISERVER_PROXY") != ""
|
||||
switch {
|
||||
case haveAPIProxyEnv && haveAuthProxyEnv:
|
||||
log.Fatal("AUTH_PROXY (deprecated) and APISERVER_PROXY are mutually exclusive, please unset AUTH_PROXY")
|
||||
case haveAuthProxyEnv:
|
||||
var authProxyEnv = defaultBool("AUTH_PROXY", false) // deprecated
|
||||
if authProxyEnv {
|
||||
return apiServerProxyModeEnabled
|
||||
}
|
||||
return apiServerProxyModeDisabled
|
||||
case haveAPIProxyEnv:
|
||||
var apiProxyEnv = defaultEnv("APISERVER_PROXY", "") // true, false or "noauth"
|
||||
switch apiProxyEnv {
|
||||
case "true":
|
||||
return apiServerProxyModeEnabled
|
||||
case "false", "":
|
||||
return apiServerProxyModeDisabled
|
||||
case "noauth":
|
||||
return apiServerProxyModeNoAuth
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown APISERVER_PROXY value %q", apiProxyEnv))
|
||||
}
|
||||
}
|
||||
return apiServerProxyModeDisabled
|
||||
}
|
||||
@ -0,0 +1,197 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
// k8s-proxy proxies between tailnet and Kubernetes cluster traffic.
|
||||
// Currently, it only supports proxying tailnet clients to the Kubernetes API
|
||||
// server.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"tailscale.com/hostinfo"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/store"
|
||||
apiproxy "tailscale.com/k8s-operator/api-proxy"
|
||||
"tailscale.com/kube/k8s-proxy/conf"
|
||||
"tailscale.com/kube/state"
|
||||
"tailscale.com/tsnet"
|
||||
)
|
||||
|
||||
func main() {
|
||||
logger := zap.Must(zap.NewProduction()).Sugar()
|
||||
defer logger.Sync()
|
||||
if err := run(logger); err != nil {
|
||||
logger.Fatal(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func run(logger *zap.SugaredLogger) error {
|
||||
var (
|
||||
configFile = os.Getenv("TS_K8S_PROXY_CONFIG")
|
||||
podUID = os.Getenv("POD_UID")
|
||||
)
|
||||
if configFile == "" {
|
||||
return errors.New("TS_K8S_PROXY_CONFIG unset")
|
||||
}
|
||||
|
||||
// TODO(tomhjp): Support reloading config.
|
||||
// TODO(tomhjp): Support reading config from a Secret.
|
||||
cfg, err := conf.Load(configFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error loading config file %q: %w", configFile, err)
|
||||
}
|
||||
|
||||
if cfg.Parsed.LogLevel != nil {
|
||||
level, err := zapcore.ParseLevel(*cfg.Parsed.LogLevel)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing log level %q: %w", *cfg.Parsed.LogLevel, err)
|
||||
}
|
||||
logger = logger.WithOptions(zap.IncreaseLevel(level))
|
||||
}
|
||||
|
||||
if cfg.Parsed.App != nil {
|
||||
hostinfo.SetApp(*cfg.Parsed.App)
|
||||
}
|
||||
|
||||
st, err := getStateStore(cfg.Parsed.State, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If Pod UID unset, assume we're running outside of a cluster/not managed
|
||||
// by the operator, so no need to set additional state keys.
|
||||
if podUID != "" {
|
||||
if err := state.SetInitialKeys(st, podUID); err != nil {
|
||||
return fmt.Errorf("error setting initial state: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var authKey string
|
||||
if cfg.Parsed.AuthKey != nil {
|
||||
authKey = *cfg.Parsed.AuthKey
|
||||
}
|
||||
|
||||
ts := &tsnet.Server{
|
||||
Logf: logger.Named("tsnet").Debugf,
|
||||
UserLogf: logger.Named("tsnet").Infof,
|
||||
Store: st,
|
||||
AuthKey: authKey,
|
||||
}
|
||||
if cfg.Parsed.Hostname != nil {
|
||||
ts.Hostname = *cfg.Parsed.Hostname
|
||||
}
|
||||
|
||||
// ctx to live for the lifetime of the process.
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
|
||||
// Make sure we crash loop if Up doesn't complete in reasonable time.
|
||||
upCtx, upCancel := context.WithTimeout(ctx, time.Minute)
|
||||
defer upCancel()
|
||||
if _, err := ts.Up(upCtx); err != nil {
|
||||
return fmt.Errorf("error starting tailscale server: %w", err)
|
||||
}
|
||||
defer ts.Close()
|
||||
|
||||
group, groupCtx := errgroup.WithContext(ctx)
|
||||
|
||||
// Setup for updating state keys.
|
||||
if podUID != "" {
|
||||
lc, err := ts.LocalClient()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting local client: %w", err)
|
||||
}
|
||||
w, err := lc.WatchIPNBus(groupCtx, ipn.NotifyInitialNetMap)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error watching IPN bus: %w", err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
group.Go(func() error {
|
||||
if err := state.KeepKeysUpdated(st, w.Next); err != nil && err != groupCtx.Err() {
|
||||
return fmt.Errorf("error keeping state keys updated: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Setup for the API server proxy.
|
||||
restConfig, err := getRestConfig(logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting rest config: %w", err)
|
||||
}
|
||||
authMode := true
|
||||
if cfg.Parsed.KubeAPIServer != nil {
|
||||
v, ok := cfg.Parsed.KubeAPIServer.AuthMode.Get()
|
||||
if ok {
|
||||
authMode = v
|
||||
}
|
||||
}
|
||||
ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, authMode)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating api server proxy: %w", err)
|
||||
}
|
||||
|
||||
// TODO(tomhjp): Work out whether we should use TS_CERT_SHARE_MODE or not,
|
||||
// and possibly issue certs upfront here before serving.
|
||||
group.Go(func() error {
|
||||
if err := ap.Run(groupCtx); err != nil {
|
||||
return fmt.Errorf("error running API server proxy: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return group.Wait()
|
||||
}
|
||||
|
||||
func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, error) {
|
||||
p := "mem:"
|
||||
if path != nil {
|
||||
p = *path
|
||||
} else {
|
||||
logger.Warn("No state Secret provided; using in-memory store, which will lose state on restart")
|
||||
}
|
||||
st, err := store.New(logger.Errorf, p)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating state store: %w", err)
|
||||
}
|
||||
|
||||
return st, nil
|
||||
}
|
||||
|
||||
func getRestConfig(logger *zap.SugaredLogger) (*rest.Config, error) {
|
||||
restConfig, err := rest.InClusterConfig()
|
||||
switch err {
|
||||
case nil:
|
||||
return restConfig, nil
|
||||
case rest.ErrNotInCluster:
|
||||
logger.Info("Not running in-cluster, falling back to kubeconfig")
|
||||
default:
|
||||
return nil, fmt.Errorf("error getting in-cluster config: %w", err)
|
||||
}
|
||||
|
||||
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
|
||||
clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil)
|
||||
restConfig, err = clientConfig.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error loading kubeconfig: %w", err)
|
||||
}
|
||||
|
||||
return restConfig, nil
|
||||
}
|
||||
@ -1,29 +0,0 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package apiproxy
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"tailscale.com/types/opt"
|
||||
)
|
||||
|
||||
func defaultBool(envName string, defVal bool) bool {
|
||||
vs := os.Getenv(envName)
|
||||
if vs == "" {
|
||||
return defVal
|
||||
}
|
||||
v, _ := opt.Bool(vs).Get()
|
||||
return v
|
||||
}
|
||||
|
||||
func defaultEnv(envName, defVal string) string {
|
||||
v := os.Getenv(envName)
|
||||
if v == "" {
|
||||
return defVal
|
||||
}
|
||||
return v
|
||||
}
|
||||
@ -0,0 +1,101 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
// Package conf contains code to load, manipulate, and access config file
|
||||
// settings for k8s-proxy.
|
||||
package conf
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/tailscale/hujson"
|
||||
"tailscale.com/types/opt"
|
||||
)
|
||||
|
||||
const v1Alpha1 = "v1alpha1"
|
||||
|
||||
// Config describes a config file.
|
||||
type Config struct {
|
||||
Path string // disk path of HuJSON
|
||||
Raw []byte // raw bytes from disk, in HuJSON form
|
||||
Std []byte // standardized JSON form
|
||||
Version string // "v1alpha1"
|
||||
|
||||
// Parsed is the parsed config, converted from its on-disk version to the
|
||||
// latest known format.
|
||||
Parsed ConfigV1Alpha1
|
||||
}
|
||||
|
||||
// VersionedConfig allows specifying config at the root of the object, or in
|
||||
// a versioned sub-object.
|
||||
// e.g. {"version": "v1alpha1", "authKey": "abc123"}
|
||||
// or {"version": "v1beta1", "a-beta-config": "a-beta-value", "v1alpha1": {"authKey": "abc123"}}
|
||||
type VersionedConfig struct {
|
||||
Version string `json:",omitempty"` // "v1alpha1"
|
||||
|
||||
// Latest version of the config.
|
||||
*ConfigV1Alpha1
|
||||
|
||||
// Backwards compatibility version(s) of the config. Fields and sub-fields
|
||||
// from here should only be added to, never changed in place.
|
||||
V1Alpha1 *ConfigV1Alpha1 `json:",omitempty"`
|
||||
// V1Beta1 *ConfigV1Beta1 `json:",omitempty"` // Not yet used.
|
||||
}
|
||||
|
||||
type ConfigV1Alpha1 struct {
|
||||
AuthKey *string `json:",omitempty"` // Tailscale auth key to use.
|
||||
Hostname *string `json:",omitempty"` // Tailscale device hostname.
|
||||
State *string `json:",omitempty"` // Path to the Tailscale state.
|
||||
LogLevel *string `json:",omitempty"` // "debug", "info". Defaults to "info".
|
||||
App *string `json:",omitempty"` // e.g. kubetypes.AppProxyGroupKubeAPIServer
|
||||
KubeAPIServer *KubeAPIServer `json:",omitempty"` // Config specific to the API Server proxy.
|
||||
}
|
||||
|
||||
type KubeAPIServer struct {
|
||||
AuthMode opt.Bool `json:",omitempty"`
|
||||
}
|
||||
|
||||
// Load reads and parses the config file at the provided path on disk.
|
||||
func Load(path string) (c Config, err error) {
|
||||
c.Path = path
|
||||
|
||||
c.Raw, err = os.ReadFile(path)
|
||||
if err != nil {
|
||||
return c, fmt.Errorf("error reading config file %q: %w", path, err)
|
||||
}
|
||||
c.Std, err = hujson.Standardize(c.Raw)
|
||||
if err != nil {
|
||||
return c, fmt.Errorf("error parsing config file %q HuJSON/JSON: %w", path, err)
|
||||
}
|
||||
var ver VersionedConfig
|
||||
if err := json.Unmarshal(c.Std, &ver); err != nil {
|
||||
return c, fmt.Errorf("error parsing config file %q: %w", path, err)
|
||||
}
|
||||
rootV1Alpha1 := (ver.Version == v1Alpha1)
|
||||
backCompatV1Alpha1 := (ver.V1Alpha1 != nil)
|
||||
switch {
|
||||
case ver.Version == "":
|
||||
return c, fmt.Errorf("error parsing config file %q: no \"version\" field provided", path)
|
||||
case rootV1Alpha1 && backCompatV1Alpha1:
|
||||
// Exactly one of these should be set.
|
||||
return c, fmt.Errorf("error parsing config file %q: both root and v1alpha1 config provided", path)
|
||||
case rootV1Alpha1 != backCompatV1Alpha1:
|
||||
c.Version = v1Alpha1
|
||||
switch {
|
||||
case rootV1Alpha1 && ver.ConfigV1Alpha1 != nil:
|
||||
c.Parsed = *ver.ConfigV1Alpha1
|
||||
case backCompatV1Alpha1:
|
||||
c.Parsed = *ver.V1Alpha1
|
||||
default:
|
||||
c.Parsed = ConfigV1Alpha1{}
|
||||
}
|
||||
default:
|
||||
return c, fmt.Errorf("error parsing config file %q: unsupported \"version\" value %q; want \"%s\"", path, ver.Version, v1Alpha1)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
@ -0,0 +1,86 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package conf
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"tailscale.com/types/ptr"
|
||||
)
|
||||
|
||||
// Test that the config file can be at the root of the object, or in a versioned sub-object.
|
||||
// or {"version": "v1beta1", "a-beta-config": "a-beta-value", "v1alpha1": {"authKey": "abc123"}}
|
||||
func TestVersionedConfig(t *testing.T) {
|
||||
testCases := map[string]struct {
|
||||
inputConfig string
|
||||
expectedConfig ConfigV1Alpha1
|
||||
expectedError string
|
||||
}{
|
||||
"root_config_v1alpha1": {
|
||||
inputConfig: `{"version": "v1alpha1", "authKey": "abc123"}`,
|
||||
expectedConfig: ConfigV1Alpha1{AuthKey: ptr.To("abc123")},
|
||||
},
|
||||
"backwards_compat_v1alpha1_config": {
|
||||
// Client doesn't know about v1beta1, so it should read in v1alpha1.
|
||||
inputConfig: `{"version": "v1beta1", "beta-key": "beta-value", "authKey": "def456", "v1alpha1": {"authKey": "abc123"}}`,
|
||||
expectedConfig: ConfigV1Alpha1{AuthKey: ptr.To("abc123")},
|
||||
},
|
||||
"unknown_key_allowed": {
|
||||
// Adding new keys to the config doesn't require a version bump.
|
||||
inputConfig: `{"version": "v1alpha1", "unknown-key": "unknown-value", "authKey": "abc123"}`,
|
||||
expectedConfig: ConfigV1Alpha1{AuthKey: ptr.To("abc123")},
|
||||
},
|
||||
"version_only_no_authkey": {
|
||||
inputConfig: `{"version": "v1alpha1"}`,
|
||||
expectedConfig: ConfigV1Alpha1{},
|
||||
},
|
||||
"both_config_v1alpha1": {
|
||||
inputConfig: `{"version": "v1alpha1", "authKey": "abc123", "v1alpha1": {"authKey": "def456"}}`,
|
||||
expectedError: "both root and v1alpha1 config provided",
|
||||
},
|
||||
"empty_config": {
|
||||
inputConfig: `{}`,
|
||||
expectedError: `no "version" field provided`,
|
||||
},
|
||||
"v1beta1_without_backwards_compat": {
|
||||
inputConfig: `{"version": "v1beta1", "beta-key": "beta-value", "authKey": "def456"}`,
|
||||
expectedError: `unsupported "version" value "v1beta1"; want "v1alpha1"`,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "config.json")
|
||||
if err := os.WriteFile(path, []byte(tc.inputConfig), 0644); err != nil {
|
||||
t.Fatalf("failed to write config file: %v", err)
|
||||
}
|
||||
cfg, err := Load(path)
|
||||
switch {
|
||||
case tc.expectedError == "" && err != nil:
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
case tc.expectedError != "":
|
||||
if err == nil {
|
||||
t.Fatalf("expected error %q, got nil", tc.expectedError)
|
||||
} else if !strings.Contains(err.Error(), tc.expectedError) {
|
||||
t.Fatalf("expected error %q, got %q", tc.expectedError, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
if cfg.Version != "v1alpha1" {
|
||||
t.Fatalf("expected version %q, got %q", "v1alpha1", cfg.Version)
|
||||
}
|
||||
// Diff actual vs expected config.
|
||||
if diff := cmp.Diff(cfg.Parsed, tc.expectedConfig); diff != "" {
|
||||
t.Fatalf("Unexpected parsed config (-got +want):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,97 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
// Package state updates state keys for tailnet client devices managed by the
|
||||
// operator. These keys are used to signal readiness, metadata, and current
|
||||
// configuration state to the operator. Client packages deployed by the operator
|
||||
// include containerboot, tsrecorder, and k8s-proxy, but currently containerboot
|
||||
// has its own implementation to manage the same keys.
|
||||
package state
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/util/deephash"
|
||||
)
|
||||
|
||||
const (
|
||||
keyPodUID = ipn.StateKey(kubetypes.KeyPodUID)
|
||||
keyCapVer = ipn.StateKey(kubetypes.KeyCapVer)
|
||||
keyDeviceID = ipn.StateKey(kubetypes.KeyDeviceID)
|
||||
keyDeviceIPs = ipn.StateKey(kubetypes.KeyDeviceIPs)
|
||||
keyDeviceFQDN = ipn.StateKey(kubetypes.KeyDeviceFQDN)
|
||||
)
|
||||
|
||||
// SetInitialKeys sets Pod UID and cap ver and clears tailnet device state
|
||||
// keys to help stop the operator using stale tailnet device state.
|
||||
func SetInitialKeys(store ipn.StateStore, podUID string) error {
|
||||
// Clear device state keys first so the operator knows if the pod UID
|
||||
// matches, the other values are definitely not stale.
|
||||
for _, key := range []ipn.StateKey{keyDeviceID, keyDeviceFQDN, keyDeviceIPs} {
|
||||
if _, err := store.ReadState(key); err == nil {
|
||||
if err := store.WriteState(key, nil); err != nil {
|
||||
return fmt.Errorf("error writing %q to state store: %w", key, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := store.WriteState(keyPodUID, []byte(podUID)); err != nil {
|
||||
return fmt.Errorf("error writing pod UID to state store: %w", err)
|
||||
}
|
||||
if err := store.WriteState(keyCapVer, fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion)); err != nil {
|
||||
return fmt.Errorf("error writing capability version to state store: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// KeepKeysUpdated sets state store keys consistent with containerboot to
|
||||
// signal proxy readiness to the operator. It runs until its context is
|
||||
// cancelled or it hits an error. The passed in next function is expected to be
|
||||
// from a local.IPNBusWatcher that is at least subscribed to
|
||||
// ipn.NotifyInitialNetMap.
|
||||
func KeepKeysUpdated(store ipn.StateStore, next func() (ipn.Notify, error)) error {
|
||||
var currentDeviceID, currentDeviceIPs, currentDeviceFQDN deephash.Sum
|
||||
|
||||
for {
|
||||
n, err := next() // Blocks on a streaming LocalAPI HTTP call.
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n.NetMap == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if deviceID := n.NetMap.SelfNode.StableID(); deephash.Update(¤tDeviceID, &deviceID) {
|
||||
if err := store.WriteState(keyDeviceID, []byte(deviceID)); err != nil {
|
||||
return fmt.Errorf("failed to store device ID in state: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if fqdn := n.NetMap.SelfNode.Name(); deephash.Update(¤tDeviceFQDN, &fqdn) {
|
||||
if err := store.WriteState(keyDeviceFQDN, []byte(fqdn)); err != nil {
|
||||
return fmt.Errorf("failed to store device FQDN in state: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if addrs := n.NetMap.SelfNode.Addresses(); deephash.Update(¤tDeviceIPs, &addrs) {
|
||||
var deviceIPs []string
|
||||
for _, addr := range addrs.AsSlice() {
|
||||
deviceIPs = append(deviceIPs, addr.Addr().String())
|
||||
}
|
||||
deviceIPsValue, err := json.Marshal(deviceIPs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := store.WriteState(keyDeviceIPs, deviceIPsValue); err != nil {
|
||||
return fmt.Errorf("failed to store device IPs in state: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,203 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
//go:build !plan9
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/store"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/types/netmap"
|
||||
)
|
||||
|
||||
func TestSetInitialStateKeys(t *testing.T) {
|
||||
var (
|
||||
podUID = []byte("test-pod-uid")
|
||||
expectedCapVer = fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion)
|
||||
)
|
||||
for name, tc := range map[string]struct {
|
||||
initial map[ipn.StateKey][]byte
|
||||
expected map[ipn.StateKey][]byte
|
||||
}{
|
||||
"empty_initial": {
|
||||
initial: map[ipn.StateKey][]byte{},
|
||||
expected: map[ipn.StateKey][]byte{
|
||||
keyPodUID: podUID,
|
||||
keyCapVer: expectedCapVer,
|
||||
},
|
||||
},
|
||||
"existing_pod_uid_and_capver": {
|
||||
initial: map[ipn.StateKey][]byte{
|
||||
keyPodUID: podUID,
|
||||
keyCapVer: expectedCapVer,
|
||||
},
|
||||
expected: map[ipn.StateKey][]byte{
|
||||
keyPodUID: podUID,
|
||||
keyCapVer: expectedCapVer,
|
||||
},
|
||||
},
|
||||
"all_keys_preexisting": {
|
||||
initial: map[ipn.StateKey][]byte{
|
||||
keyPodUID: podUID,
|
||||
keyCapVer: expectedCapVer,
|
||||
keyDeviceID: []byte("existing-device-id"),
|
||||
keyDeviceFQDN: []byte("existing-device-fqdn"),
|
||||
keyDeviceIPs: []byte(`["1.2.3.4"]`),
|
||||
},
|
||||
expected: map[ipn.StateKey][]byte{
|
||||
keyPodUID: podUID,
|
||||
keyCapVer: expectedCapVer,
|
||||
keyDeviceID: nil,
|
||||
keyDeviceFQDN: nil,
|
||||
keyDeviceIPs: nil,
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
store, err := store.New(logger.Discard, "mem:")
|
||||
if err != nil {
|
||||
t.Fatalf("error creating in-memory store: %v", err)
|
||||
}
|
||||
|
||||
for key, value := range tc.initial {
|
||||
if err := store.WriteState(key, value); err != nil {
|
||||
t.Fatalf("error writing initial state key %q: %v", key, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := SetInitialKeys(store, string(podUID)); err != nil {
|
||||
t.Fatalf("setInitialStateKeys failed: %v", err)
|
||||
}
|
||||
|
||||
actual := make(map[ipn.StateKey][]byte)
|
||||
for expectedKey, expectedValue := range tc.expected {
|
||||
actualValue, err := store.ReadState(expectedKey)
|
||||
if err != nil {
|
||||
t.Errorf("error reading state key %q: %v", expectedKey, err)
|
||||
continue
|
||||
}
|
||||
|
||||
actual[expectedKey] = actualValue
|
||||
if !bytes.Equal(actualValue, expectedValue) {
|
||||
t.Errorf("state key %q mismatch: expected %q, got %q", expectedKey, expectedValue, actualValue)
|
||||
}
|
||||
}
|
||||
if diff := cmp.Diff(actual, tc.expected); diff != "" {
|
||||
t.Errorf("state keys mismatch (-got +want):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestKeepStateKeysUpdated(t *testing.T) {
|
||||
store, err := store.New(logger.Discard, "mem:")
|
||||
if err != nil {
|
||||
t.Fatalf("error creating in-memory store: %v", err)
|
||||
}
|
||||
|
||||
nextWaiting := make(chan struct{})
|
||||
go func() {
|
||||
<-nextWaiting // Acknowledge the initial signal.
|
||||
}()
|
||||
notifyCh := make(chan ipn.Notify)
|
||||
next := func() (ipn.Notify, error) {
|
||||
nextWaiting <- struct{}{} // Send signal to test that state is consistent.
|
||||
return <-notifyCh, nil // Wait for test input.
|
||||
}
|
||||
|
||||
errs := make(chan error, 1)
|
||||
go func() {
|
||||
err := KeepKeysUpdated(store, next)
|
||||
if err != nil {
|
||||
errs <- fmt.Errorf("keepStateKeysUpdated returned with error: %w", err)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
notify ipn.Notify
|
||||
expected map[ipn.StateKey][]byte
|
||||
}{
|
||||
{
|
||||
name: "initial_not_authed",
|
||||
notify: ipn.Notify{},
|
||||
expected: map[ipn.StateKey][]byte{
|
||||
keyDeviceID: nil,
|
||||
keyDeviceFQDN: nil,
|
||||
keyDeviceIPs: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "authed",
|
||||
notify: ipn.Notify{
|
||||
NetMap: &netmap.NetworkMap{
|
||||
SelfNode: (&tailcfg.Node{
|
||||
StableID: "TESTCTRL00000001",
|
||||
Name: "test-node.test.ts.net",
|
||||
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.1/32"), netip.MustParsePrefix("fd7a:115c:a1e0:ab12:4843:cd96:0:1/128")},
|
||||
}).View(),
|
||||
},
|
||||
},
|
||||
expected: map[ipn.StateKey][]byte{
|
||||
keyDeviceID: []byte("TESTCTRL00000001"),
|
||||
keyDeviceFQDN: []byte("test-node.test.ts.net"),
|
||||
keyDeviceIPs: []byte(`["100.64.0.1","fd7a:115c:a1e0:ab12:4843:cd96:0:1"]`),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "updated_fields",
|
||||
notify: ipn.Notify{
|
||||
NetMap: &netmap.NetworkMap{
|
||||
SelfNode: (&tailcfg.Node{
|
||||
StableID: "TESTCTRL00000001",
|
||||
Name: "updated.test.ts.net",
|
||||
Addresses: []netip.Prefix{netip.MustParsePrefix("100.64.0.250/32")},
|
||||
}).View(),
|
||||
},
|
||||
},
|
||||
expected: map[ipn.StateKey][]byte{
|
||||
keyDeviceID: []byte("TESTCTRL00000001"),
|
||||
keyDeviceFQDN: []byte("updated.test.ts.net"),
|
||||
keyDeviceIPs: []byte(`["100.64.0.250"]`),
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Send test input.
|
||||
select {
|
||||
case notifyCh <- tc.notify:
|
||||
case <-errs:
|
||||
t.Fatal("keepStateKeysUpdated returned before test input")
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timed out waiting for next() to be called again")
|
||||
}
|
||||
|
||||
// Wait for next() to be called again so we know the goroutine has
|
||||
// processed the event.
|
||||
select {
|
||||
case <-nextWaiting:
|
||||
case <-errs:
|
||||
t.Fatal("keepStateKeysUpdated returned before test input")
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timed out waiting for next() to be called again")
|
||||
}
|
||||
|
||||
for key, value := range tc.expected {
|
||||
got, _ := store.ReadState(key)
|
||||
if !bytes.Equal(got, value) {
|
||||
t.Errorf("state key %q mismatch: expected %q, got %q", key, value, got)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue