From 00517c8189569171560c073cd983164ff7735e69 Mon Sep 17 00:00:00 2001 From: Irbe Krumina Date: Tue, 19 Nov 2024 13:07:19 +0000 Subject: [PATCH] kube/{kubeapi,kubeclient},ipn/store/kubestore,cmd/{containerboot,k8s-operator}: emit kube store Events (#14112) Adds functionality to kube client to emit Events. Updates kube store to emit Events when tailscaled state has been loaded, updated or if any errors where encountered during those operations. This should help in cases where an error related to state loading/updating caused the Pod to crash in a loop- unlike logs of the originally failed container instance, Events associated with the Pod will still be accessible even after N restarts. Updates tailscale/tailscale#14080 Signed-off-by: Irbe Krumina --- cmd/containerboot/kube.go | 4 +- cmd/containerboot/services.go | 2 +- .../deploy/chart/templates/proxy-rbac.yaml | 3 + .../deploy/manifests/operator.yaml | 8 + cmd/k8s-operator/deploy/manifests/proxy.yaml | 8 + .../deploy/manifests/userspace-proxy.yaml | 8 + cmd/k8s-operator/proxygroup_specs.go | 24 +- cmd/k8s-operator/testutils_test.go | 4 + ipn/store/kubestore/store_kube.go | 44 ++- kube/kubeapi/api.go | 57 +++- kube/kubeclient/client.go | 289 +++++++++++++----- kube/kubeclient/client_test.go | 151 +++++++++ kube/kubeclient/fake_client.go | 6 +- 13 files changed, 506 insertions(+), 102 deletions(-) create mode 100644 kube/kubeclient/client_test.go diff --git a/cmd/containerboot/kube.go b/cmd/containerboot/kube.go index 908cc01ef..5a726c20b 100644 --- a/cmd/containerboot/kube.go +++ b/cmd/containerboot/kube.go @@ -61,7 +61,7 @@ func deleteAuthKey(ctx context.Context, secretName string) error { Path: "/data/authkey", }, } - if err := kc.JSONPatchSecret(ctx, secretName, m); err != nil { + if err := kc.JSONPatchResource(ctx, secretName, kubeclient.TypeSecrets, m); err != nil { if s, ok := err.(*kubeapi.Status); ok && s.Code == http.StatusUnprocessableEntity { // This is kubernetes-ese for "the field you asked to // delete already doesn't exist", aka no-op. @@ -81,7 +81,7 @@ func initKubeClient(root string) { kubeclient.SetRootPathForTesting(root) } var err error - kc, err = kubeclient.New() + kc, err = kubeclient.New("tailscale-container") if err != nil { log.Fatalf("Error creating kube client: %v", err) } diff --git a/cmd/containerboot/services.go b/cmd/containerboot/services.go index 4da7286b7..aed00250d 100644 --- a/cmd/containerboot/services.go +++ b/cmd/containerboot/services.go @@ -389,7 +389,7 @@ func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Sta Path: fmt.Sprintf("/data/%s", egressservices.KeyEgressServices), Value: bs, } - if err := ep.kc.JSONPatchSecret(ctx, ep.stateSecret, []kubeclient.JSONPatch{patch}); err != nil { + if err := ep.kc.JSONPatchResource(ctx, ep.stateSecret, kubeclient.TypeSecrets, []kubeclient.JSONPatch{patch}); err != nil { return fmt.Errorf("error patching state Secret: %w", err) } ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice() diff --git a/cmd/k8s-operator/deploy/chart/templates/proxy-rbac.yaml b/cmd/k8s-operator/deploy/chart/templates/proxy-rbac.yaml index 1c15c9119..fa552a7c7 100644 --- a/cmd/k8s-operator/deploy/chart/templates/proxy-rbac.yaml +++ b/cmd/k8s-operator/deploy/chart/templates/proxy-rbac.yaml @@ -16,6 +16,9 @@ rules: - apiGroups: [""] resources: ["secrets"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] +- apiGroups: [""] + resources: ["events"] + verbs: ["create", "patch", "get"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/cmd/k8s-operator/deploy/manifests/operator.yaml b/cmd/k8s-operator/deploy/manifests/operator.yaml index 9d8e9faf6..c6d7deef5 100644 --- a/cmd/k8s-operator/deploy/manifests/operator.yaml +++ b/cmd/k8s-operator/deploy/manifests/operator.yaml @@ -4703,6 +4703,14 @@ rules: - patch - update - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create + - patch + - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/cmd/k8s-operator/deploy/manifests/proxy.yaml b/cmd/k8s-operator/deploy/manifests/proxy.yaml index a79d48d73..1ad63c265 100644 --- a/cmd/k8s-operator/deploy/manifests/proxy.yaml +++ b/cmd/k8s-operator/deploy/manifests/proxy.yaml @@ -30,6 +30,14 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid securityContext: capabilities: add: diff --git a/cmd/k8s-operator/deploy/manifests/userspace-proxy.yaml b/cmd/k8s-operator/deploy/manifests/userspace-proxy.yaml index 46b49a57b..6617f6d4b 100644 --- a/cmd/k8s-operator/deploy/manifests/userspace-proxy.yaml +++ b/cmd/k8s-operator/deploy/manifests/userspace-proxy.yaml @@ -24,3 +24,11 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid diff --git a/cmd/k8s-operator/proxygroup_specs.go b/cmd/k8s-operator/proxygroup_specs.go index 27fd9ef71..b47cb39b1 100644 --- a/cmd/k8s-operator/proxygroup_specs.go +++ b/cmd/k8s-operator/proxygroup_specs.go @@ -126,15 +126,6 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode, cfgHa }, }, }, - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - // Secret is named after the pod. - FieldPath: "metadata.name", - }, - }, - }, { Name: "TS_KUBE_SECRET", Value: "$(POD_NAME)", @@ -147,10 +138,6 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode, cfgHa Name: "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR", Value: "/etc/tsconfig/$(POD_NAME)", }, - { - Name: "TS_USERSPACE", - Value: "false", - }, { Name: "TS_INTERNAL_APP", Value: kubetypes.AppProxyGroupEgress, @@ -171,7 +158,7 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode, cfgHa }) } - return envs + return append(c.Env, envs...) }() return ss, nil @@ -215,6 +202,15 @@ func pgRole(pg *tsapi.ProxyGroup, namespace string) *rbacv1.Role { return secrets }(), }, + { + APIGroups: []string{""}, + Resources: []string{"events"}, + Verbs: []string{ + "create", + "patch", + "get", + }, + }, }, } } diff --git a/cmd/k8s-operator/testutils_test.go b/cmd/k8s-operator/testutils_test.go index d42f1b7af..084f573e5 100644 --- a/cmd/k8s-operator/testutils_test.go +++ b/cmd/k8s-operator/testutils_test.go @@ -70,6 +70,8 @@ func expectedSTS(t *testing.T, cl client.Client, opts configOpts) *appsv1.Statef Env: []corev1.EnvVar{ {Name: "TS_USERSPACE", Value: "false"}, {Name: "POD_IP", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "", FieldPath: "status.podIP"}, ResourceFieldRef: nil, ConfigMapKeyRef: nil, SecretKeyRef: nil}}, + {Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "", FieldPath: "metadata.name"}, ResourceFieldRef: nil, ConfigMapKeyRef: nil, SecretKeyRef: nil}}, + {Name: "POD_UID", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "", FieldPath: "metadata.uid"}, ResourceFieldRef: nil, ConfigMapKeyRef: nil, SecretKeyRef: nil}}, {Name: "TS_KUBE_SECRET", Value: opts.secretName}, {Name: "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR", Value: "/etc/tsconfig"}, }, @@ -228,6 +230,8 @@ func expectedSTSUserspace(t *testing.T, cl client.Client, opts configOpts) *apps Env: []corev1.EnvVar{ {Name: "TS_USERSPACE", Value: "true"}, {Name: "POD_IP", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "", FieldPath: "status.podIP"}, ResourceFieldRef: nil, ConfigMapKeyRef: nil, SecretKeyRef: nil}}, + {Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "", FieldPath: "metadata.name"}, ResourceFieldRef: nil, ConfigMapKeyRef: nil, SecretKeyRef: nil}}, + {Name: "POD_UID", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "", FieldPath: "metadata.uid"}, ResourceFieldRef: nil, ConfigMapKeyRef: nil, SecretKeyRef: nil}}, {Name: "TS_KUBE_SECRET", Value: opts.secretName}, {Name: "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR", Value: "/etc/tsconfig"}, {Name: "TS_SERVE_CONFIG", Value: "/etc/tailscaled/serve-config"}, diff --git a/ipn/store/kubestore/store_kube.go b/ipn/store/kubestore/store_kube.go index 2dcc08b6e..462e6d434 100644 --- a/ipn/store/kubestore/store_kube.go +++ b/ipn/store/kubestore/store_kube.go @@ -7,6 +7,7 @@ package kubestore import ( "context" "fmt" + "log" "net" "os" "strings" @@ -19,8 +20,18 @@ import ( "tailscale.com/types/logger" ) -// TODO(irbekrm): should we bump this? should we have retries? See tailscale/tailscale#13024 -const timeout = 5 * time.Second +const ( + // timeout is the timeout for a single state update that includes calls to the API server to write or read a + // state Secret and emit an Event. + timeout = 30 * time.Second + + reasonTailscaleStateUpdated = "TailscaledStateUpdated" + reasonTailscaleStateLoaded = "TailscaleStateLoaded" + reasonTailscaleStateUpdateFailed = "TailscaleStateUpdateFailed" + reasonTailscaleStateLoadFailed = "TailscaleStateLoadFailed" + eventTypeWarning = "Warning" + eventTypeNormal = "Normal" +) // Store is an ipn.StateStore that uses a Kubernetes Secret for persistence. type Store struct { @@ -35,7 +46,7 @@ type Store struct { // New returns a new Store that persists to the named Secret. func New(_ logger.Logf, secretName string) (*Store, error) { - c, err := kubeclient.New() + c, err := kubeclient.New("tailscale-state-store") if err != nil { return nil, err } @@ -72,13 +83,22 @@ func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) { // WriteState implements the StateStore interface. func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer func() { if err == nil { s.memory.WriteState(ipn.StateKey(sanitizeKey(id)), bs) } + if err != nil { + if err := s.client.Event(ctx, eventTypeWarning, reasonTailscaleStateUpdateFailed, err.Error()); err != nil { + log.Printf("kubestore: error creating tailscaled state update Event: %v", err) + } + } else { + if err := s.client.Event(ctx, eventTypeNormal, reasonTailscaleStateUpdated, "Successfully updated tailscaled state Secret"); err != nil { + log.Printf("kubestore: error creating tailscaled state Event: %v", err) + } + } + cancel() }() - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() secret, err := s.client.GetSecret(ctx, s.secretName) if err != nil { @@ -107,7 +127,7 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) { Value: map[string][]byte{sanitizeKey(id): bs}, }, } - if err := s.client.JSONPatchSecret(ctx, s.secretName, m); err != nil { + if err := s.client.JSONPatchResource(ctx, s.secretName, kubeclient.TypeSecrets, m); err != nil { return fmt.Errorf("error patching Secret %s with a /data field: %v", s.secretName, err) } return nil @@ -119,8 +139,8 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) { Value: bs, }, } - if err := s.client.JSONPatchSecret(ctx, s.secretName, m); err != nil { - return fmt.Errorf("error patching Secret %s with /data/%s field", s.secretName, sanitizeKey(id)) + if err := s.client.JSONPatchResource(ctx, s.secretName, kubeclient.TypeSecrets, m); err != nil { + return fmt.Errorf("error patching Secret %s with /data/%s field: %v", s.secretName, sanitizeKey(id), err) } return nil } @@ -131,7 +151,7 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) { return err } -func (s *Store) loadState() error { +func (s *Store) loadState() (err error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -140,8 +160,14 @@ func (s *Store) loadState() error { if st, ok := err.(*kubeapi.Status); ok && st.Code == 404 { return ipn.ErrStateNotExist } + if err := s.client.Event(ctx, eventTypeWarning, reasonTailscaleStateLoadFailed, err.Error()); err != nil { + log.Printf("kubestore: error creating Event: %v", err) + } return err } + if err := s.client.Event(ctx, eventTypeNormal, reasonTailscaleStateLoaded, "Successfully loaded tailscaled state from Secret"); err != nil { + log.Printf("kubestore: error creating Event: %v", err) + } s.memory.LoadFromMap(secret.Data) return nil } diff --git a/kube/kubeapi/api.go b/kube/kubeapi/api.go index 0e42437a6..a2ae8cc79 100644 --- a/kube/kubeapi/api.go +++ b/kube/kubeapi/api.go @@ -7,7 +7,9 @@ // dependency size for those consumers when adding anything new here. package kubeapi -import "time" +import ( + "time" +) // Note: The API types are copied from k8s.io/api{,machinery} to not introduce a // module dependency on the Kubernetes API as it pulls in many more dependencies. @@ -151,6 +153,57 @@ type Secret struct { Data map[string][]byte `json:"data,omitempty"` } +// Event contains a subset of fields from corev1.Event. +// https://github.com/kubernetes/api/blob/6cc44b8953ae704d6d9ec2adf32e7ae19199ea9f/core/v1/types.go#L7034 +// It is copied here to avoid having to import kube libraries. +type Event struct { + TypeMeta `json:",inline"` + ObjectMeta `json:"metadata"` + Message string `json:"message,omitempty"` + Reason string `json:"reason,omitempty"` + Source EventSource `json:"source,omitempty"` // who is emitting this Event + Type string `json:"type,omitempty"` // Normal or Warning + // InvolvedObject is the subject of the Event. `kubectl describe` will, for most object types, display any + // currently present cluster Events matching the object (but you probably want to set UID for this to work). + InvolvedObject ObjectReference `json:"involvedObject"` + Count int32 `json:"count,omitempty"` // how many times Event was observed + FirstTimestamp time.Time `json:"firstTimestamp,omitempty"` + LastTimestamp time.Time `json:"lastTimestamp,omitempty"` +} + +// EventSource includes a subset of fields from corev1.EventSource. +// https://github.com/kubernetes/api/blob/6cc44b8953ae704d6d9ec2adf32e7ae19199ea9f/core/v1/types.go#L7007 +// It is copied here to avoid having to import kube libraries. +type EventSource struct { + // Component is the name of the component that is emitting the Event. + Component string `json:"component,omitempty"` +} + +// ObjectReference contains a subset of fields from corev1.ObjectReference. +// https://github.com/kubernetes/api/blob/6cc44b8953ae704d6d9ec2adf32e7ae19199ea9f/core/v1/types.go#L6902 +// It is copied here to avoid having to import kube libraries. +type ObjectReference struct { + // Kind of the referent. + // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + // +optional + Kind string `json:"kind,omitempty"` + // Namespace of the referent. + // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + // +optional + Namespace string `json:"namespace,omitempty"` + // Name of the referent. + // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + // +optional + Name string `json:"name,omitempty"` + // UID of the referent. + // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids + // +optional + UID string `json:"uid,omitempty"` + // API version of the referent. + // +optional + APIVersion string `json:"apiVersion,omitempty"` +} + // Status is a return value for calls that don't return other objects. type Status struct { TypeMeta `json:",inline"` @@ -186,6 +239,6 @@ type Status struct { Code int `json:"code,omitempty"` } -func (s *Status) Error() string { +func (s Status) Error() string { return s.Message } diff --git a/kube/kubeclient/client.go b/kube/kubeclient/client.go index e8ddec75d..d4309448d 100644 --- a/kube/kubeclient/client.go +++ b/kube/kubeclient/client.go @@ -23,16 +23,21 @@ import ( "net/url" "os" "path/filepath" + "strings" "sync" "time" "tailscale.com/kube/kubeapi" + "tailscale.com/tstime" "tailscale.com/util/multierr" ) const ( saPath = "/var/run/secrets/kubernetes.io/serviceaccount" defaultURL = "https://kubernetes.default.svc" + + TypeSecrets = "secrets" + typeEvents = "events" ) // rootPathForTests is set by tests to override the root path to the @@ -57,8 +62,13 @@ type Client interface { GetSecret(context.Context, string) (*kubeapi.Secret, error) UpdateSecret(context.Context, *kubeapi.Secret) error CreateSecret(context.Context, *kubeapi.Secret) error + // Event attempts to ensure an event with the specified options associated with the Pod in which we are + // currently running. This is best effort - if the client is not able to create events, this operation will be a + // no-op. If there is already an Event with the given reason for the current Pod, it will get updated (only + // count and timestamp are expected to change), else a new event will be created. + Event(_ context.Context, typ, reason, msg string) error StrategicMergePatchSecret(context.Context, string, *kubeapi.Secret, string) error - JSONPatchSecret(context.Context, string, []JSONPatch) error + JSONPatchResource(_ context.Context, resourceName string, resourceType string, patches []JSONPatch) error CheckSecretPermissions(context.Context, string) (bool, bool, error) SetDialer(dialer func(context.Context, string, string) (net.Conn, error)) SetURL(string) @@ -66,15 +76,24 @@ type Client interface { type client struct { mu sync.Mutex + name string url string - ns string + podName string + podUID string + ns string // Pod namespace client *http.Client token string tokenExpiry time.Time + cl tstime.Clock + // hasEventsPerms is true if client can emit Events for the Pod in which it runs. If it is set to false any + // calls to Events() will be a no-op. + hasEventsPerms bool + // kubeAPIRequest sends a request to the kube API server. It can set to a fake in tests. + kubeAPIRequest kubeAPIRequestFunc } // New returns a new client -func New() (Client, error) { +func New(name string) (Client, error) { ns, err := readFile("namespace") if err != nil { return nil, err @@ -87,9 +106,11 @@ func New() (Client, error) { if ok := cp.AppendCertsFromPEM(caCert); !ok { return nil, fmt.Errorf("kube: error in creating root cert pool") } - return &client{ - url: defaultURL, - ns: string(ns), + c := &client{ + url: defaultURL, + ns: string(ns), + name: name, + cl: tstime.DefaultClock{}, client: &http.Client{ Transport: &http.Transport{ TLSClientConfig: &tls.Config{ @@ -97,7 +118,10 @@ func New() (Client, error) { }, }, }, - }, nil + } + c.kubeAPIRequest = newKubeAPIRequest(c) + c.setEventPerms() + return c, nil } // SetURL sets the URL to use for the Kubernetes API. @@ -115,14 +139,14 @@ func (c *client) SetDialer(dialer func(ctx context.Context, network, addr string func (c *client) expireToken() { c.mu.Lock() defer c.mu.Unlock() - c.tokenExpiry = time.Now() + c.tokenExpiry = c.cl.Now() } func (c *client) getOrRenewToken() (string, error) { c.mu.Lock() defer c.mu.Unlock() tk, te := c.token, c.tokenExpiry - if time.Now().Before(te) { + if c.cl.Now().Before(te) { return tk, nil } @@ -131,17 +155,10 @@ func (c *client) getOrRenewToken() (string, error) { return "", err } c.token = string(tkb) - c.tokenExpiry = time.Now().Add(30 * time.Minute) + c.tokenExpiry = c.cl.Now().Add(30 * time.Minute) return c.token, nil } -func (c *client) secretURL(name string) string { - if name == "" { - return fmt.Sprintf("%s/api/v1/namespaces/%s/secrets", c.url, c.ns) - } - return fmt.Sprintf("%s/api/v1/namespaces/%s/secrets/%s", c.url, c.ns, name) -} - func getError(resp *http.Response) error { if resp.StatusCode == 200 || resp.StatusCode == 201 { // These are the only success codes returned by the Kubernetes API. @@ -161,36 +178,41 @@ func setHeader(key, value string) func(*http.Request) { } } -// doRequest performs an HTTP request to the Kubernetes API. -// If in is not nil, it is expected to be a JSON-encodable object and will be -// sent as the request body. -// If out is not nil, it is expected to be a pointer to an object that can be -// decoded from JSON. -// If the request fails with a 401, the token is expired and a new one is -// requested. -func (c *client) doRequest(ctx context.Context, method, url string, in, out any, opts ...func(*http.Request)) error { - req, err := c.newRequest(ctx, method, url, in) - if err != nil { - return err - } - for _, opt := range opts { - opt(req) - } - resp, err := c.client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - if err := getError(resp); err != nil { - if st, ok := err.(*kubeapi.Status); ok && st.Code == 401 { - c.expireToken() +type kubeAPIRequestFunc func(ctx context.Context, method, url string, in, out any, opts ...func(*http.Request)) error + +// newKubeAPIRequest returns a function that can perform an HTTP request to the Kubernetes API. +func newKubeAPIRequest(c *client) kubeAPIRequestFunc { + // If in is not nil, it is expected to be a JSON-encodable object and will be + // sent as the request body. + // If out is not nil, it is expected to be a pointer to an object that can be + // decoded from JSON. + // If the request fails with a 401, the token is expired and a new one is + // requested. + f := func(ctx context.Context, method, url string, in, out any, opts ...func(*http.Request)) error { + req, err := c.newRequest(ctx, method, url, in) + if err != nil { + return err } - return err - } - if out != nil { - return json.NewDecoder(resp.Body).Decode(out) + for _, opt := range opts { + opt(req) + } + resp, err := c.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if err := getError(resp); err != nil { + if st, ok := err.(*kubeapi.Status); ok && st.Code == 401 { + c.expireToken() + } + return err + } + if out != nil { + return json.NewDecoder(resp.Body).Decode(out) + } + return nil } - return nil + return f } func (c *client) newRequest(ctx context.Context, method, url string, in any) (*http.Request, error) { @@ -226,7 +248,7 @@ func (c *client) newRequest(ctx context.Context, method, url string, in any) (*h // GetSecret fetches the secret from the Kubernetes API. func (c *client) GetSecret(ctx context.Context, name string) (*kubeapi.Secret, error) { s := &kubeapi.Secret{Data: make(map[string][]byte)} - if err := c.doRequest(ctx, "GET", c.secretURL(name), nil, s); err != nil { + if err := c.kubeAPIRequest(ctx, "GET", c.resourceURL(name, TypeSecrets), nil, s); err != nil { return nil, err } return s, nil @@ -235,16 +257,16 @@ func (c *client) GetSecret(ctx context.Context, name string) (*kubeapi.Secret, e // CreateSecret creates a secret in the Kubernetes API. func (c *client) CreateSecret(ctx context.Context, s *kubeapi.Secret) error { s.Namespace = c.ns - return c.doRequest(ctx, "POST", c.secretURL(""), s, nil) + return c.kubeAPIRequest(ctx, "POST", c.resourceURL("", TypeSecrets), s, nil) } // UpdateSecret updates a secret in the Kubernetes API. func (c *client) UpdateSecret(ctx context.Context, s *kubeapi.Secret) error { - return c.doRequest(ctx, "PUT", c.secretURL(s.Name), s, nil) + return c.kubeAPIRequest(ctx, "PUT", c.resourceURL(s.Name, TypeSecrets), s, nil) } // JSONPatch is a JSON patch operation. -// It currently (2023-03-02) only supports "add" and "remove" operations. +// It currently (2024-11-15) only supports "add", "remove" and "replace" operations. // // https://tools.ietf.org/html/rfc6902 type JSONPatch struct { @@ -253,22 +275,22 @@ type JSONPatch struct { Value any `json:"value,omitempty"` } -// JSONPatchSecret updates a secret in the Kubernetes API using a JSON patch. -// It currently (2023-03-02) only supports "add" and "remove" operations. -func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error { - for _, p := range patch { +// JSONPatchResource updates a resource in the Kubernetes API using a JSON patch. +// It currently (2024-11-15) only supports "add", "remove" and "replace" operations. +func (c *client) JSONPatchResource(ctx context.Context, name, typ string, patches []JSONPatch) error { + for _, p := range patches { if p.Op != "remove" && p.Op != "add" && p.Op != "replace" { return fmt.Errorf("unsupported JSON patch operation: %q", p.Op) } } - return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json")) + return c.kubeAPIRequest(ctx, "PATCH", c.resourceURL(name, typ), patches, nil, setHeader("Content-Type", "application/json-patch+json")) } // StrategicMergePatchSecret updates a secret in the Kubernetes API using a // strategic merge patch. // If a fieldManager is provided, it will be used to track the patch. func (c *client) StrategicMergePatchSecret(ctx context.Context, name string, s *kubeapi.Secret, fieldManager string) error { - surl := c.secretURL(name) + surl := c.resourceURL(name, TypeSecrets) if fieldManager != "" { uv := url.Values{ "fieldManager": {fieldManager}, @@ -277,7 +299,66 @@ func (c *client) StrategicMergePatchSecret(ctx context.Context, name string, s * } s.Namespace = c.ns s.Name = name - return c.doRequest(ctx, "PATCH", surl, s, nil, setHeader("Content-Type", "application/strategic-merge-patch+json")) + return c.kubeAPIRequest(ctx, "PATCH", surl, s, nil, setHeader("Content-Type", "application/strategic-merge-patch+json")) +} + +// Event tries to ensure an Event associated with the Pod in which we are running. It is best effort - the event will be +// created if the kube client on startup was able to determine the name and UID of this Pod from POD_NAME,POD_UID env +// vars and if permissions check for event creation succeeded. Events are keyed on opts.Reason- if an Event for the +// current Pod with that reason already exists, its count and first timestamp will be updated, else a new Event will be +// created. +func (c *client) Event(ctx context.Context, typ, reason, msg string) error { + if !c.hasEventsPerms { + return nil + } + name := c.nameForEvent(reason) + ev, err := c.getEvent(ctx, name) + now := c.cl.Now() + if err != nil { + if !IsNotFoundErr(err) { + return err + } + // Event not found - create it + ev := kubeapi.Event{ + ObjectMeta: kubeapi.ObjectMeta{ + Name: name, + Namespace: c.ns, + }, + Type: typ, + Reason: reason, + Message: msg, + Source: kubeapi.EventSource{ + Component: c.name, + }, + InvolvedObject: kubeapi.ObjectReference{ + Name: c.podName, + Namespace: c.ns, + UID: c.podUID, + Kind: "Pod", + APIVersion: "v1", + }, + + FirstTimestamp: now, + LastTimestamp: now, + Count: 1, + } + return c.kubeAPIRequest(ctx, "POST", c.resourceURL("", typeEvents), &ev, nil) + } + // If the Event already exists, we patch its count and last timestamp. This ensures that when users run 'kubectl + // describe pod...', they see the event just once (but with a message of how many times it has appeared over + // last timestamp - first timestamp period of time). + count := ev.Count + 1 + countPatch := JSONPatch{ + Op: "replace", + Value: count, + Path: "/count", + } + tsPatch := JSONPatch{ + Op: "replace", + Value: now, + Path: "/lastTimestamp", + } + return c.JSONPatchResource(ctx, name, typeEvents, []JSONPatch{countPatch, tsPatch}) } // CheckSecretPermissions checks the secret access permissions of the current @@ -293,7 +374,7 @@ func (c *client) StrategicMergePatchSecret(ctx context.Context, name string, s * func (c *client) CheckSecretPermissions(ctx context.Context, secretName string) (canPatch, canCreate bool, err error) { var errs []error for _, verb := range []string{"get", "update"} { - ok, err := c.checkPermission(ctx, verb, secretName) + ok, err := c.checkPermission(ctx, verb, TypeSecrets, secretName) if err != nil { log.Printf("error checking %s permission on secret %s: %v", verb, secretName, err) } else if !ok { @@ -303,12 +384,12 @@ func (c *client) CheckSecretPermissions(ctx context.Context, secretName string) if len(errs) > 0 { return false, false, multierr.New(errs...) } - canPatch, err = c.checkPermission(ctx, "patch", secretName) + canPatch, err = c.checkPermission(ctx, "patch", TypeSecrets, secretName) if err != nil { log.Printf("error checking patch permission on secret %s: %v", secretName, err) return false, false, nil } - canCreate, err = c.checkPermission(ctx, "create", secretName) + canCreate, err = c.checkPermission(ctx, "create", TypeSecrets, secretName) if err != nil { log.Printf("error checking create permission on secret %s: %v", secretName, err) return false, false, nil @@ -316,19 +397,64 @@ func (c *client) CheckSecretPermissions(ctx context.Context, secretName string) return canPatch, canCreate, nil } -// checkPermission reports whether the current pod has permission to use the -// given verb (e.g. get, update, patch, create) on secretName. -func (c *client) checkPermission(ctx context.Context, verb, secretName string) (bool, error) { +func IsNotFoundErr(err error) bool { + if st, ok := err.(*kubeapi.Status); ok && st.Code == 404 { + return true + } + return false +} + +// setEventPerms checks whether this client will be able to write tailscaled Events to its Pod and updates the state +// accordingly. If it determines that the client can not write Events, any subsequent calls to client.Event will be a +// no-op. +func (c *client) setEventPerms() { + name := os.Getenv("POD_NAME") + uid := os.Getenv("POD_UID") + hasPerms := false + defer func() { + c.podName = name + c.podUID = uid + c.hasEventsPerms = hasPerms + if !hasPerms { + log.Printf(`kubeclient: this client is not able to write tailscaled Events to the Pod in which it is running. + To help with future debugging you can make it able write Events by giving it get,create,patch permissions for Events in the Pod namespace + and setting POD_NAME, POD_UID env vars for the Pod.`) + } + }() + if name == "" || uid == "" { + return + } + for _, verb := range []string{"get", "create", "patch"} { + can, err := c.checkPermission(context.Background(), verb, typeEvents, "") + if err != nil { + log.Printf("kubeclient: error checking Events permissions: %v", err) + return + } + if !can { + return + } + } + hasPerms = true + return +} + +// checkPermission reports whether the current pod has permission to use the given verb (e.g. get, update, patch, +// create) on the given resource type. If name is not an empty string, will check the check will be for resource with +// the given name only. +func (c *client) checkPermission(ctx context.Context, verb, typ, name string) (bool, error) { + ra := map[string]any{ + "namespace": c.ns, + "verb": verb, + "resource": typ, + } + if name != "" { + ra["name"] = name + } sar := map[string]any{ "apiVersion": "authorization.k8s.io/v1", "kind": "SelfSubjectAccessReview", "spec": map[string]any{ - "resourceAttributes": map[string]any{ - "namespace": c.ns, - "verb": verb, - "resource": "secrets", - "name": secretName, - }, + "resourceAttributes": ra, }, } var res struct { @@ -337,15 +463,32 @@ func (c *client) checkPermission(ctx context.Context, verb, secretName string) ( } `json:"status"` } url := c.url + "/apis/authorization.k8s.io/v1/selfsubjectaccessreviews" - if err := c.doRequest(ctx, "POST", url, sar, &res); err != nil { + if err := c.kubeAPIRequest(ctx, "POST", url, sar, &res); err != nil { return false, err } return res.Status.Allowed, nil } -func IsNotFoundErr(err error) bool { - if st, ok := err.(*kubeapi.Status); ok && st.Code == 404 { - return true +// resourceURL returns a URL that can be used to interact with the given resource type and, if name is not empty string, +// the named resource of that type. +// Note that this only works for core/v1 resource types. +func (c *client) resourceURL(name, typ string) string { + if name == "" { + return fmt.Sprintf("%s/api/v1/namespaces/%s/%s", c.url, c.ns, typ) } - return false + return fmt.Sprintf("%s/api/v1/namespaces/%s/%s/%s", c.url, c.ns, typ, name) +} + +// nameForEvent returns a name for the Event that uniquely identifies Event with that reason for the current Pod. +func (c *client) nameForEvent(reason string) string { + return fmt.Sprintf("%s.%s.%s", c.podName, c.podUID, strings.ToLower(reason)) +} + +// getEvent fetches the event from the Kubernetes API. +func (c *client) getEvent(ctx context.Context, name string) (*kubeapi.Event, error) { + e := &kubeapi.Event{} + if err := c.kubeAPIRequest(ctx, "GET", c.resourceURL(name, typeEvents), nil, e); err != nil { + return nil, err + } + return e, nil } diff --git a/kube/kubeclient/client_test.go b/kube/kubeclient/client_test.go new file mode 100644 index 000000000..6b5e8171c --- /dev/null +++ b/kube/kubeclient/client_test.go @@ -0,0 +1,151 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package kubeclient + +import ( + "context" + "encoding/json" + "net/http" + "testing" + + "github.com/google/go-cmp/cmp" + "tailscale.com/kube/kubeapi" + "tailscale.com/tstest" +) + +func Test_client_Event(t *testing.T) { + cl := &tstest.Clock{} + tests := []struct { + name string + typ string + reason string + msg string + argSets []args + wantErr bool + }{ + { + name: "new_event_gets_created", + typ: "Normal", + reason: "TestReason", + msg: "TestMessage", + argSets: []args{ + { // request to GET event returns not found + wantsMethod: "GET", + wantsURL: "test-apiserver/api/v1/namespaces/test-ns/events/test-pod.test-uid.testreason", + setErr: &kubeapi.Status{Code: 404}, + }, + { // sends POST request to create event + wantsMethod: "POST", + wantsURL: "test-apiserver/api/v1/namespaces/test-ns/events", + wantsIn: &kubeapi.Event{ + ObjectMeta: kubeapi.ObjectMeta{ + Name: "test-pod.test-uid.testreason", + Namespace: "test-ns", + }, + Type: "Normal", + Reason: "TestReason", + Message: "TestMessage", + Source: kubeapi.EventSource{ + Component: "test-client", + }, + InvolvedObject: kubeapi.ObjectReference{ + Name: "test-pod", + UID: "test-uid", + Namespace: "test-ns", + APIVersion: "v1", + Kind: "Pod", + }, + FirstTimestamp: cl.Now(), + LastTimestamp: cl.Now(), + Count: 1, + }, + }, + }, + }, + { + name: "existing_event_gets_patched", + typ: "Warning", + reason: "TestReason", + msg: "TestMsg", + argSets: []args{ + { // request to GET event does not error - this is enough to assume that event exists + wantsMethod: "GET", + wantsURL: "test-apiserver/api/v1/namespaces/test-ns/events/test-pod.test-uid.testreason", + setOut: []byte(`{"count":2}`), + }, + { // sends PATCH request to update the event + wantsMethod: "PATCH", + wantsURL: "test-apiserver/api/v1/namespaces/test-ns/events/test-pod.test-uid.testreason", + wantsIn: []JSONPatch{ + {Op: "replace", Path: "/count", Value: int32(3)}, + {Op: "replace", Path: "/lastTimestamp", Value: cl.Now()}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &client{ + cl: cl, + name: "test-client", + podName: "test-pod", + podUID: "test-uid", + url: "test-apiserver", + ns: "test-ns", + kubeAPIRequest: fakeKubeAPIRequest(t, tt.argSets), + hasEventsPerms: true, + } + if err := c.Event(context.Background(), tt.typ, tt.reason, tt.msg); (err != nil) != tt.wantErr { + t.Errorf("client.Event() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +// args is a set of values for testing a single call to client.kubeAPIRequest. +type args struct { + // wantsMethod is the expected value of 'method' arg. + wantsMethod string + // wantsURL is the expected value of 'url' arg. + wantsURL string + // wantsIn is the expected value of 'in' arg. + wantsIn any + // setOut can be set to a byte slice representing valid JSON. If set 'out' arg will get set to the unmarshalled + // JSON object. + setOut []byte + // setErr is the error that kubeAPIRequest will return. + setErr error +} + +// fakeKubeAPIRequest can be used to test that a series of calls to client.kubeAPIRequest gets called with expected +// values and to set these calls to return preconfigured values. 'argSets' should be set to a slice of expected +// arguments and should-be return values of a series of kubeAPIRequest calls. +func fakeKubeAPIRequest(t *testing.T, argSets []args) kubeAPIRequestFunc { + count := 0 + f := func(ctx context.Context, gotMethod, gotUrl string, gotIn, gotOut any, opts ...func(*http.Request)) error { + t.Helper() + if count >= len(argSets) { + t.Fatalf("unexpected call to client.kubeAPIRequest, expected %d calls, but got a %dth call", len(argSets), count+1) + } + a := argSets[count] + if gotMethod != a.wantsMethod { + t.Errorf("[%d] got method %q, wants method %q", count, gotMethod, a.wantsMethod) + } + if gotUrl != a.wantsURL { + t.Errorf("[%d] got URL %q, wants URL %q", count, gotMethod, a.wantsMethod) + } + if d := cmp.Diff(gotIn, a.wantsIn); d != "" { + t.Errorf("[%d] unexpected payload (-want + got):\n%s", count, d) + } + if len(a.setOut) != 0 { + if err := json.Unmarshal(a.setOut, gotOut); err != nil { + t.Fatalf("[%d] error unmarshalling output: %v", count, err) + } + } + count++ + return a.setErr + } + return f +} diff --git a/kube/kubeclient/fake_client.go b/kube/kubeclient/fake_client.go index 3cef3d27e..5716ca31b 100644 --- a/kube/kubeclient/fake_client.go +++ b/kube/kubeclient/fake_client.go @@ -29,7 +29,11 @@ func (fc *FakeClient) SetDialer(dialer func(ctx context.Context, network, addr s func (fc *FakeClient) StrategicMergePatchSecret(context.Context, string, *kubeapi.Secret, string) error { return nil } -func (fc *FakeClient) JSONPatchSecret(context.Context, string, []JSONPatch) error { +func (fc *FakeClient) Event(context.Context, string, string, string) error { + return nil +} + +func (fc *FakeClient) JSONPatchResource(context.Context, string, string, []JSONPatch) error { return nil } func (fc *FakeClient) UpdateSecret(context.Context, *kubeapi.Secret) error { return nil }