From e1530cdfccee25973cdd03c08917e143fe42282f Mon Sep 17 00:00:00 2001 From: Maisem Ali Date: Thu, 2 Mar 2023 08:10:36 -0800 Subject: [PATCH] cmd/containerboot,kube: consolidate the two kube clients We had two implemenetations of the kube client, merge them. containerboot was also using a raw http.Transport, this also has the side effect of making it use a http.Client Signed-off-by: Maisem Ali --- cmd/containerboot/kube.go | 246 +++++---------------------------- cmd/containerboot/main.go | 2 +- cmd/containerboot/main_test.go | 8 +- kube/client.go | 179 +++++++++++++++++++++--- 4 files changed, 200 insertions(+), 235 deletions(-) diff --git a/cmd/containerboot/kube.go b/cmd/containerboot/kube.go index 909a1e393..ab097a7c5 100644 --- a/cmd/containerboot/kube.go +++ b/cmd/containerboot/kube.go @@ -6,138 +6,28 @@ package main import ( - "bytes" "context" - "crypto/tls" - "crypto/x509" - "encoding/base64" - "encoding/json" "fmt" - "io" "log" "net/http" "os" - "path/filepath" - "strings" - "time" + "tailscale.com/kube" "tailscale.com/tailcfg" - "tailscale.com/util/multierr" ) -// checkSecretPermissions checks the secret access permissions of the current -// pod. It returns an error if the basic permissions tailscale needs are -// missing, and reports whether the patch permission is additionally present. -// -// Errors encountered during the access checking process are logged, but ignored -// so that the pod tries to fail alive if the permissions exist and there's just -// something wrong with SelfSubjectAccessReviews. There shouldn't be, pods -// should always be able to use SSARs to assess their own permissions, but since -// we didn't use to check permissions this way we'll be cautious in case some -// old version of k8s deviates from the current behavior. -func checkSecretPermissions(ctx context.Context, secretName string) (canPatch bool, err error) { - var errs []error - for _, verb := range []string{"get", "update"} { - ok, err := checkPermission(ctx, verb, secretName) - if err != nil { - log.Printf("error checking %s permission on secret %s: %v", verb, secretName, err) - } else if !ok { - errs = append(errs, fmt.Errorf("missing %s permission on secret %q", verb, secretName)) - } - } - if len(errs) > 0 { - return false, multierr.New(errs...) - } - ok, err := checkPermission(ctx, "patch", secretName) - if err != nil { - log.Printf("error checking patch permission on secret %s: %v", secretName, err) - return false, nil - } - return ok, nil -} - -// checkPermission reports whether the current pod has permission to use the -// given verb (e.g. get, update, patch) on secretName. -func checkPermission(ctx context.Context, verb, secretName string) (bool, error) { - sar := map[string]any{ - "apiVersion": "authorization.k8s.io/v1", - "kind": "SelfSubjectAccessReview", - "spec": map[string]any{ - "resourceAttributes": map[string]any{ - "namespace": kubeNamespace, - "verb": verb, - "resource": "secrets", - "name": secretName, - }, - }, - } - bs, err := json.Marshal(sar) - if err != nil { - return false, err - } - req, err := http.NewRequest("POST", "/apis/authorization.k8s.io/v1/selfsubjectaccessreviews", bytes.NewReader(bs)) - if err != nil { - return false, err - } - resp, err := doKubeRequest(ctx, req) - if err != nil { - return false, err - } - defer resp.Body.Close() - bs, err = io.ReadAll(resp.Body) - if err != nil { - return false, err - } - var res struct { - Status struct { - Allowed bool `json:"allowed"` - } `json:"status"` - } - if err := json.Unmarshal(bs, &res); err != nil { - return false, err - } - return res.Status.Allowed, nil -} - // findKeyInKubeSecret inspects the kube secret secretName for a data // field called "authkey", and returns its value if present. func findKeyInKubeSecret(ctx context.Context, secretName string) (string, error) { - req, err := http.NewRequest("GET", fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s", kubeNamespace, secretName), nil) - if err != nil { - return "", err - } - resp, err := doKubeRequest(ctx, req) - if err != nil { - if resp != nil && resp.StatusCode == http.StatusNotFound { - // Kube secret doesn't exist yet, can't have an authkey. - return "", nil - } - return "", err - } - defer resp.Body.Close() - - bs, err := io.ReadAll(resp.Body) + s, err := kc.GetSecret(ctx, secretName) if err != nil { return "", err } - - // We use a map[string]any here rather than import corev1.Secret, - // because we only do very limited things to the secret, and - // importing corev1 adds 12MiB to the compiled binary. - var s map[string]any - if err := json.Unmarshal(bs, &s); err != nil { - return "", err + ak, ok := s.Data["authkey"] + if !ok { + return "", nil } - if d, ok := s["data"].(map[string]any); ok { - if v, ok := d["authkey"].(string); ok { - bs, err := base64.StdEncoding.DecodeString(v) - if err != nil { - return "", err - } - return string(bs), nil - } - } - return "", nil + return string(ak), nil } // storeDeviceInfo writes deviceID into the "device_id" data field of the kube @@ -145,65 +35,38 @@ func findKeyInKubeSecret(ctx context.Context, secretName string) (string, error) func storeDeviceInfo(ctx context.Context, secretName string, deviceID tailcfg.StableNodeID, fqdn string) error { // First check if the secret exists at all. Even if running on // kubernetes, we do not necessarily store state in a k8s secret. - req, err := http.NewRequest("GET", fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s", kubeNamespace, secretName), nil) - if err != nil { - return err - } - resp, err := doKubeRequest(ctx, req) - if err != nil { - if resp != nil && resp.StatusCode >= 400 && resp.StatusCode <= 499 { - // Assume the secret doesn't exist, or we don't have - // permission to access it. - return nil + if _, err := kc.GetSecret(ctx, secretName); err != nil { + if s, ok := err.(*kube.Status); ok { + if s.Code >= 400 && s.Code <= 499 { + // Assume the secret doesn't exist, or we don't have + // permission to access it. + return nil + } } return err } - m := map[string]map[string]string{ - "stringData": { - "device_id": string(deviceID), - "device_fqdn": fqdn, + m := &kube.Secret{ + Data: map[string][]byte{ + "device_id": []byte(deviceID), + "device_fqdn": []byte(fqdn), }, } - var b bytes.Buffer - if err := json.NewEncoder(&b).Encode(m); err != nil { - return err - } - req, err = http.NewRequest("PATCH", fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=tailscale-container", kubeNamespace, secretName), &b) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/strategic-merge-patch+json") - if _, err := doKubeRequest(ctx, req); err != nil { - return err - } - return nil + return kc.StrategicMergePatchSecret(ctx, secretName, m, "tailscale-container") } // deleteAuthKey deletes the 'authkey' field of the given kube // secret. No-op if there is no authkey in the secret. func deleteAuthKey(ctx context.Context, secretName string) error { // m is a JSON Patch data structure, see https://jsonpatch.com/ or RFC 6902. - m := []struct { - Op string `json:"op"` - Path string `json:"path"` - }{ + m := []kube.JSONPatch{ { Op: "remove", Path: "/data/authkey", }, } - var b bytes.Buffer - if err := json.NewEncoder(&b).Encode(m); err != nil { - return err - } - req, err := http.NewRequest("PATCH", fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=tailscale-container", kubeNamespace, secretName), &b) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json-patch+json") - if resp, err := doKubeRequest(ctx, req); err != nil { - if resp != nil && resp.StatusCode == http.StatusUnprocessableEntity { + if err := kc.JSONPatchSecret(ctx, secretName, m); err != nil { + if s, ok := err.(*kube.Status); ok && s.Code == http.StatusUnprocessableEntity { // This is kubernetes-ese for "the field you asked to // delete already doesn't exist", aka no-op. return nil @@ -213,65 +76,22 @@ func deleteAuthKey(ctx context.Context, secretName string) error { return nil } -var ( - kubeHost string - kubeNamespace string - kubeToken string - kubeHTTP *http.Transport -) +var kc *kube.Client func initKube(root string) { - // If running in Kubernetes, set things up so that doKubeRequest - // can talk successfully to the kube apiserver. - if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { - return + if root != "/" { + // If we are running in a test, we need to set the root path to the fake + // service account directory. + kube.SetRootPathForTesting(root) } - - kubeHost = os.Getenv("KUBERNETES_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS") - - bs, err := os.ReadFile(filepath.Join(root, "var/run/secrets/kubernetes.io/serviceaccount/namespace")) - if err != nil { - log.Fatalf("Error reading kube namespace: %v", err) - } - kubeNamespace = strings.TrimSpace(string(bs)) - - bs, err = os.ReadFile(filepath.Join(root, "var/run/secrets/kubernetes.io/serviceaccount/token")) - if err != nil { - log.Fatalf("Error reading kube token: %v", err) - } - kubeToken = strings.TrimSpace(string(bs)) - - bs, err = os.ReadFile(filepath.Join(root, "var/run/secrets/kubernetes.io/serviceaccount/ca.crt")) - if err != nil { - log.Fatalf("Error reading kube CA cert: %v", err) - } - cp := x509.NewCertPool() - cp.AppendCertsFromPEM(bs) - kubeHTTP = &http.Transport{ - TLSClientConfig: &tls.Config{ - RootCAs: cp, - }, - IdleConnTimeout: time.Second, - } -} - -// doKubeRequest sends r to the kube apiserver. -func doKubeRequest(ctx context.Context, r *http.Request) (*http.Response, error) { - if kubeHTTP == nil { - panic("not in kubernetes") - } - - r.URL.Scheme = "https" - r.URL.Host = kubeHost - r.Header.Set("Authorization", "Bearer "+kubeToken) - r.Header.Set("Accept", "application/json") - - resp, err := kubeHTTP.RoundTrip(r) + var err error + kc, err = kube.New() if err != nil { - return nil, err + log.Fatalf("Error creating kube client: %v", err) } - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return resp, fmt.Errorf("got non-200/201 status code %d", resp.StatusCode) + if root != "/" { + // If we are running in a test, we need to set the URL to the + // httptest server. + kc.SetURL(fmt.Sprintf("https://%s:%s", os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS"))) } - return resp, nil } diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index dc382fa25..314251e39 100644 --- a/cmd/containerboot/main.go +++ b/cmd/containerboot/main.go @@ -123,7 +123,7 @@ func main() { defer cancel() if cfg.InKubernetes && cfg.KubeSecret != "" { - canPatch, err := checkSecretPermissions(ctx, cfg.KubeSecret) + canPatch, err := kc.CheckSecretPermissions(ctx, cfg.KubeSecret) if err != nil { log.Fatalf("Some Kubernetes permissions are missing, please check your RBAC configuration: %v", err) } diff --git a/cmd/containerboot/main_test.go b/cmd/containerboot/main_test.go index 8f4c22302..704553a2a 100644 --- a/cmd/containerboot/main_test.go +++ b/cmd/containerboot/main_test.go @@ -607,7 +607,7 @@ func TestContainerBoot(t *testing.T) { }() var wantCmds []string - for _, p := range test.Phases { + for i, p := range test.Phases { lapi.Notify(p.Notify) wantCmds = append(wantCmds, p.WantCmds...) waitArgs(t, 2*time.Second, d, argFile, strings.Join(wantCmds, "\n")) @@ -626,7 +626,7 @@ func TestContainerBoot(t *testing.T) { return nil }) if err != nil { - t.Fatal(err) + t.Fatalf("phase %d: %v", i, err) } err = tstest.WaitFor(2*time.Second, func() error { for path, want := range p.WantFiles { @@ -983,13 +983,13 @@ func (k *kubeServer) serveSecret(w http.ResponseWriter, r *http.Request) { } case "application/strategic-merge-patch+json": req := struct { - Data map[string]string `json:"stringData"` + Data map[string][]byte `json:"data"` }{} if err := json.Unmarshal(bs, &req); err != nil { panic(fmt.Sprintf("json decode failed: %v. Body:\n\n%s", err, string(bs))) } for key, val := range req.Data { - k.secret[key] = val + k.secret[key] = string(val) } default: panic(fmt.Sprintf("unknown content type %q", r.Header.Get("Content-Type"))) diff --git a/kube/client.go b/kube/client.go index 48aa658c5..020a8776e 100644 --- a/kube/client.go +++ b/kube/client.go @@ -14,11 +14,15 @@ import ( "encoding/json" "fmt" "io" + "log" "net/http" + "net/url" "os" "path/filepath" "sync" "time" + + "tailscale.com/util/multierr" ) const ( @@ -26,7 +30,19 @@ const ( defaultURL = "https://kubernetes.default.svc" ) +// rootPathForTests is set by tests to override the root path to the +// service account directory. +var rootPathForTests string + +// SetRootPathForTesting sets the path to the service account directory. +func SetRootPathForTesting(p string) { + rootPathForTests = p +} + func readFile(n string) ([]byte, error) { + if rootPathForTests != "" { + return os.ReadFile(filepath.Join(rootPathForTests, saPath, n)) + } return os.ReadFile(filepath.Join(saPath, n)) } @@ -68,6 +84,12 @@ func New() (*Client, error) { }, nil } +// SetURL sets the URL to use for the Kubernetes API. +// This is used only for testing. +func (c *Client) SetURL(url string) { + c.url = url +} + func (c *Client) expireToken() { c.mu.Lock() defer c.mu.Unlock() @@ -111,28 +133,27 @@ func getError(resp *http.Response) error { return st } -func (c *Client) doRequest(ctx context.Context, method, url string, in, out any) error { - tk, err := c.getOrRenewToken() - if err != nil { - return err - } - var body io.Reader - if in != nil { - var b bytes.Buffer - if err := json.NewEncoder(&b).Encode(in); err != nil { - return err - } - body = &b +func setHeader(key, value string) func(*http.Request) { + return func(req *http.Request) { + req.Header.Set(key, value) } - req, err := http.NewRequestWithContext(ctx, method, url, body) +} + +// doRequest performs an HTTP request to the Kubernetes API. +// If in is not nil, it is expected to be a JSON-encodable object and will be +// sent as the request body. +// If out is not nil, it is expected to be a pointer to an object that can be +// decoded from JSON. +// If the request fails with a 401, the token is expired and a new one is +// requested. +func (c *Client) doRequest(ctx context.Context, method, url string, in, out any, opts ...func(*http.Request)) error { + req, err := c.newRequest(ctx, method, url, in) if err != nil { return err } - if body != nil { - req.Header.Add("Content-Type", "application/json") + for _, opt := range opts { + opt(req) } - req.Header.Add("Accept", "application/json") - req.Header.Add("Authorization", "Bearer "+tk) resp, err := c.client.Do(req) if err != nil { return err @@ -150,6 +171,36 @@ func (c *Client) doRequest(ctx context.Context, method, url string, in, out any) return nil } +func (c *Client) newRequest(ctx context.Context, method, url string, in any) (*http.Request, error) { + tk, err := c.getOrRenewToken() + if err != nil { + return nil, err + } + var body io.Reader + if in != nil { + switch in := in.(type) { + case []byte: + body = bytes.NewReader(in) + default: + var b bytes.Buffer + if err := json.NewEncoder(&b).Encode(in); err != nil { + return nil, err + } + body = &b + } + } + req, err := http.NewRequestWithContext(ctx, method, url, body) + if err != nil { + return nil, err + } + if body != nil { + req.Header.Add("Content-Type", "application/json") + } + req.Header.Add("Accept", "application/json") + req.Header.Add("Authorization", "Bearer "+tk) + return req, nil +} + // GetSecret fetches the secret from the Kubernetes API. func (c *Client) GetSecret(ctx context.Context, name string) (*Secret, error) { s := &Secret{Data: make(map[string][]byte)} @@ -169,3 +220,97 @@ func (c *Client) CreateSecret(ctx context.Context, s *Secret) error { func (c *Client) UpdateSecret(ctx context.Context, s *Secret) error { return c.doRequest(ctx, "PUT", c.secretURL(s.Name), s, nil) } + +// JSONPatch is a JSON patch operation. +// It currently (2023-03-02) only supports the "remove" operation. +// +// https://tools.ietf.org/html/rfc6902 +type JSONPatch struct { + Op string `json:"op"` + Path string `json:"path"` +} + +// JSONPatchSecret updates a secret in the Kubernetes API using a JSON patch. +// It currently (2023-03-02) only supports the "remove" operation. +func (c *Client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error { + for _, p := range patch { + if p.Op != "remove" { + panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op)) + } + } + return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json")) +} + +// StrategicMergePatchSecret updates a secret in the Kubernetes API using a +// strategic merge patch. +// If a fieldManager is provided, it will be used to track the patch. +func (c *Client) StrategicMergePatchSecret(ctx context.Context, name string, s *Secret, fieldManager string) error { + surl := c.secretURL(name) + if fieldManager != "" { + uv := url.Values{ + "fieldManager": {fieldManager}, + } + surl += "?" + uv.Encode() + } + s.Namespace = c.ns + s.Name = name + return c.doRequest(ctx, "PATCH", surl, s, nil, setHeader("Content-Type", "application/strategic-merge-patch+json")) +} + +// CheckSecretPermissions checks the secret access permissions of the current +// pod. It returns an error if the basic permissions tailscale needs are +// missing, and reports whether the patch permission is additionally present. +// +// Errors encountered during the access checking process are logged, but ignored +// so that the pod tries to fail alive if the permissions exist and there's just +// something wrong with SelfSubjectAccessReviews. There shouldn't be, pods +// should always be able to use SSARs to assess their own permissions, but since +// we didn't use to check permissions this way we'll be cautious in case some +// old version of k8s deviates from the current behavior. +func (c *Client) CheckSecretPermissions(ctx context.Context, secretName string) (canPatch bool, err error) { + var errs []error + for _, verb := range []string{"get", "update"} { + ok, err := c.checkPermission(ctx, verb, secretName) + if err != nil { + log.Printf("error checking %s permission on secret %s: %v", verb, secretName, err) + } else if !ok { + errs = append(errs, fmt.Errorf("missing %s permission on secret %q", verb, secretName)) + } + } + if len(errs) > 0 { + return false, multierr.New(errs...) + } + ok, err := c.checkPermission(ctx, "patch", secretName) + if err != nil { + log.Printf("error checking patch permission on secret %s: %v", secretName, err) + return false, nil + } + return ok, nil +} + +// checkPermission reports whether the current pod has permission to use the +// given verb (e.g. get, update, patch) on secretName. +func (c *Client) checkPermission(ctx context.Context, verb, secretName string) (bool, error) { + sar := map[string]any{ + "apiVersion": "authorization.k8s.io/v1", + "kind": "SelfSubjectAccessReview", + "spec": map[string]any{ + "resourceAttributes": map[string]any{ + "namespace": c.ns, + "verb": verb, + "resource": "secrets", + "name": secretName, + }, + }, + } + var res struct { + Status struct { + Allowed bool `json:"allowed"` + } `json:"status"` + } + url := c.url + "/apis/authorization.k8s.io/v1/selfsubjectaccessreviews" + if err := c.doRequest(ctx, "POST", url, sar, &res); err != nil { + return false, err + } + return res.Status.Allowed, nil +}