diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go
index f056d26f3..f091197bb 100644
--- a/cmd/containerboot/main.go
+++ b/cmd/containerboot/main.go
@@ -87,6 +87,9 @@
 // cluster using the same hostname (in this case, the MagicDNS name of the ingress proxy)
 // as a non-cluster workload on tailnet.
 // This is only meant to be configured by the Kubernetes operator.
+// - EXPERIMENTAL_DISCONNECT_ON_SHUTDOWN: if set to true, the containerboot instance
+// will disconnect from the control-plane on shutdown. This is used by HA subnet
+// routers and exit nodes to more quickly trigger failover to other replicas.
 //
 // When running on Kubernetes, containerboot defaults to storing state in the
 // "tailscale" kube secret. To store state on local disk instead, set
@@ -118,6 +121,7 @@ import (
 	"time"
 
 	"golang.org/x/sys/unix"
+	"tailscale.com/client/local"
 	"tailscale.com/client/tailscale"
 	"tailscale.com/ipn"
 	kubeutils "tailscale.com/k8s-operator"
@@ -205,34 +209,7 @@ func run() error {
 	if err != nil {
 		return fmt.Errorf("failed to bring up tailscale: %w", err)
 	}
-	killTailscaled := func() {
-		// The default termination grace period for a Pod is 30s. We wait 25s at
-		// most so that we still reserve some of that budget for tailscaled
-		// to receive and react to a SIGTERM before the SIGKILL that k8s
-		// will send at the end of the grace period.
-		ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
-		defer cancel()
-
-		if err := services.EnsureServicesNotAdvertised(ctx, client, log.Printf); err != nil {
-			log.Printf("Error ensuring services are not advertised: %v", err)
-		}
-
-		if hasKubeStateStore(cfg) {
-			// Check we're not shutting tailscaled down while it's still writing
-			// state. If we authenticate and fail to write all the state, we'll
-			// never recover automatically.
-			log.Printf("Checking for consistent state")
-			err := kc.waitForConsistentState(ctx)
-			if err != nil {
-				log.Printf("Error waiting for consistent state on shutdown: %v", err)
-			}
-		}
-		log.Printf("Sending SIGTERM to tailscaled")
-		if err := daemonProcess.Signal(unix.SIGTERM); err != nil {
-			log.Fatalf("error shutting tailscaled down: %v", err)
-		}
-	}
-	defer killTailscaled()
+	defer killTailscaled(client, cfg, daemonProcess, kc)
 
 	var healthCheck *healthz.Healthz
 	ep := &egressProxy{}
@@ -491,7 +468,7 @@ runLoop:
 			// have started the reaper defined below, we need to
 			// kill tailscaled and let reaper clean up child
 			// processes.
-			killTailscaled()
+			killTailscaled(client, cfg, daemonProcess, kc)
 			break runLoop
 		case err := <-errChan:
 			return fmt.Errorf("failed to read from tailscaled: %w", err)
@@ -499,7 +476,7 @@ runLoop:
 			return fmt.Errorf("failed to watch tailscaled config: %w", err)
 		case n := <-notifyChan:
 			if n.State != nil && *n.State != ipn.Running {
-				// Something's gone wrong and we've left the authenticated state.
+				// Something's gone wrong, and we've left the authenticated state.
 				// Our container image never recovered gracefully from this, and the
 				// control flow required to make it work now is hard. So, just crash
 				// the container and rely on the container runtime to restart us,
@@ -632,11 +609,11 @@ runLoop:
 				// route setup has succeeded. IPs and FQDN are
 				// read from the Secret by the Tailscale
 				// Kubernetes operator and, for some proxy
-				// types, such as Tailscale Ingress, advertized
+				// types, such as Tailscale Ingress, advertised
 				// on the Ingress status. Writing them to the
 				// Secret only after the proxy routing has been
 				// set up ensures that the operator does not
-				// advertize endpoints of broken proxies.
+				// advertise endpoints of broken proxies.
 				// TODO (irbekrm): instead of using the IP and FQDN, have some other mechanism for the proxy signal that it is 'Ready'.
 				deviceEndpoints := []any{n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses()}
 				if hasKubeStateStore(cfg) && deephash.Update(&currentDeviceEndpoints, &deviceEndpoints) {
@@ -892,3 +869,43 @@ func runHTTPServer(mux *http.ServeMux, addr string) (close func() error) {
 		return errors.Join(err, ln.Close())
 	}
 }
+
+func killTailscaled(client *local.Client, cfg *settings, daemonProcess *os.Process, kc *kubeClient) {
+	// The default termination grace period for a Pod is 30s. We wait 25s at
+	// most so that we still reserve some of that budget for tailscaled
+	// to receive and react to a SIGTERM before the SIGKILL that k8s
+	// will send at the end of the grace period.
+	ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
+	defer cancel()
+
+	if err := services.EnsureServicesNotAdvertised(ctx, client, log.Printf); err != nil {
+		log.Printf("Error ensuring services are not advertised: %v", err)
+	}
+
+	if hasKubeStateStore(cfg) {
+		// Check we're not shutting tailscaled down while it's still writing
+		// state. If we authenticate and fail to write all the state, we'll
+		// never recover automatically.
+		log.Printf("Checking for consistent state")
+		if err := kc.waitForConsistentState(ctx); err != nil {
+			log.Printf("Error waiting for consistent state on shutdown: %v", err)
+		}
+	}
+
+	if cfg.DisconnectOnShutdown {
+		// Forcibly disconnect the local Tailscale instance from the control plane. This is useful when running as a HA
+		// app connector or subnet router to speed up switching over to another replica.
+		if err := client.DisconnectControl(ctx); err != nil {
+			log.Printf("Error disconnecting from control: %v", err)
+		}
+	}
+
+	log.Printf("Sending SIGTERM to tailscaled")
+	if err := daemonProcess.Signal(unix.SIGTERM); err != nil {
+		log.Fatalf("error shutting tailscaled down: %v", err)
+	}
+
+	// Run out the clock for the grace period, so that any clients still connected have enough time to get a netmap
+	// update and switch over.
+	<-ctx.Done()
+}
diff --git a/cmd/containerboot/main_test.go b/cmd/containerboot/main_test.go
index f92f35333..159a5476a 100644
--- a/cmd/containerboot/main_test.go
+++ b/cmd/containerboot/main_test.go
@@ -1104,8 +1104,8 @@ func TestContainerBoot(t *testing.T) {
 				cmd.Process.Signal(*p.Signal)
 			}
 			if p.WantLog != "" {
-				err := tstest.WaitFor(2*time.Second, func() error {
-					waitLogLine(t, time.Second, cbOut, p.WantLog)
+				err := tstest.WaitFor(time.Minute, func() error {
+					waitLogLine(t, time.Minute, cbOut, p.WantLog)
 					return nil
 				})
 				if err != nil {
@@ -1213,6 +1213,8 @@ func (b *lockingBuffer) String() string {
 // waitLogLine fails the entire test if path doesn't contain want
 // before the timeout.
 func waitLogLine(t *testing.T, timeout time.Duration, b *lockingBuffer, want string) {
+	t.Helper()
+
 	deadline := time.Now().Add(timeout)
 	for time.Now().Before(deadline) {
 		for _, line := range strings.Split(b.String(), "\n") {
@@ -1338,6 +1340,17 @@ func (lc *localAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		}
 		w.Write([]byte("fake metrics"))
 		return
+	case "/localapi/v0/disconnect-control":
+		if r.Method != "POST" {
+			panic(fmt.Sprintf("unsupported method %q", r.Method))
+		}
+	case "/localapi/v0/prefs":
+		if r.Method != "GET" {
+			panic(fmt.Sprintf("unsupported method %q", r.Method))
+		}
+
+		w.Write([]byte("{}"))
+		return
 	default:
 		panic(fmt.Sprintf("unsupported path %q", r.URL.Path))
 	}
diff --git a/cmd/containerboot/settings.go b/cmd/containerboot/settings.go
index 5a8be9036..2516a6b63 100644
--- a/cmd/containerboot/settings.go
+++ b/cmd/containerboot/settings.go
@@ -81,6 +81,12 @@ type settings struct {
 	// certs) and 'rw' for Pods that should manage the TLS certs shared
 	// amongst the replicas.
 	CertShareMode string
+	// DisconnectOnShutdown is set for subnet routers & app connectors that
+	// are running in an HA configuration. When set, it forces the application
+	// to wait for the entirety of the termination grace period before exiting
+	// to give time for clients to receive an updated netmap that points them
+	// to an active subnet router/app connector.
+	DisconnectOnShutdown bool
 }
 
 func configFromEnv() (*settings, error) {
@@ -117,6 +123,7 @@ func configFromEnv() (*settings, error) {
 		EgressProxiesCfgPath:       defaultEnv("TS_EGRESS_PROXIES_CONFIG_PATH", ""),
 		IngressProxiesCfgPath:      defaultEnv("TS_INGRESS_PROXIES_CONFIG_PATH", ""),
 		PodUID:                     defaultEnv("POD_UID", ""),
+		DisconnectOnShutdown:       defaultBool("EXPERIMENTAL_DISCONNECT_ON_SHUTDOWN", false),
 	}
 	podIPs, ok := os.LookupEnv("POD_IPS")
 	if ok {
diff --git a/cmd/k8s-proxy/k8s-proxy.go b/cmd/k8s-proxy/k8s-proxy.go
index 9b2bb6749..f4cd46c76 100644
--- a/cmd/k8s-proxy/k8s-proxy.go
+++ b/cmd/k8s-proxy/k8s-proxy.go
@@ -318,9 +318,9 @@ func run(logger *zap.SugaredLogger) error {
 			// Context cancelled, exit.
 			logger.Info("Context cancelled, exiting")
 			shutdownCtx, shutdownCancel := context.WithTimeout(serveCtx, 20*time.Second)
+			defer shutdownCancel()
 			unadvertiseErr := services.EnsureServicesNotAdvertised(shutdownCtx, lc, logger.Infof)
-			shutdownCancel()
-			serveCancel()
+			<-shutdownCtx.Done()
 			return errors.Join(unadvertiseErr, group.Wait())
 		case cfg = <-cfgChan:
 			// Handle config reload.
diff --git a/kube/services/services.go b/kube/services/services.go
index a9e50975c..edefdebd8 100644
--- a/kube/services/services.go
+++ b/kube/services/services.go
@@ -8,7 +8,6 @@ package services
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"tailscale.com/client/local"
 	"tailscale.com/ipn"
@@ -24,40 +23,21 @@ func EnsureServicesNotAdvertised(ctx context.Context, lc *local.Client, logf log
 	if err != nil {
 		return fmt.Errorf("error getting prefs: %w", err)
 	}
+
 	if len(prefs.AdvertiseServices) == 0 {
 		return nil
 	}
 
 	logf("unadvertising services: %v", prefs.AdvertiseServices)
-	if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
+	_, err = lc.EditPrefs(ctx, &ipn.MaskedPrefs{
 		AdvertiseServicesSet: true,
 		Prefs: ipn.Prefs{
 			AdvertiseServices: nil,
-		},
-	}); err != nil {
-		// EditPrefs only returns an error if it fails _set_ its local prefs.
-		// If it fails to _persist_ the prefs in state, we don't get an error
-		// and we continue waiting below, as control will failover as usual.
+		}})
+	if err != nil {
+		// EditPrefs only returns an error if it fails to _set_ its local prefs.
return fmt.Errorf("error setting prefs AdvertiseServices: %w", err) } - // Services use the same (failover XOR regional routing) mechanism that - // HA subnet routers use. Unfortunately we don't yet get a reliable signal - // from control that it's responded to our unadvertisement, so the best we - // can do is wait for 20 seconds, where 15s is the approximate maximum time - // it should take for control to choose a new primary, and 5s is for buffer. - // - // Note: There is no guarantee that clients have been _informed_ of the new - // primary no matter how long we wait. We would need a mechanism to await - // netmap updates for peers to know for sure. - // - // See https://tailscale.com/kb/1115/high-availability for more details. - // TODO(tomhjp): Wait for a netmap update instead of sleeping when control - // supports that. - select { - case <-ctx.Done(): - return nil - case <-time.After(20 * time.Second): - return nil - } + return nil }