pull/17407/merge
commit d14cee581c, authored by David Bond, committed by GitHub

@@ -87,6 +87,9 @@
 // cluster using the same hostname (in this case, the MagicDNS name of the ingress proxy)
 // as a non-cluster workload on tailnet.
 // This is only meant to be configured by the Kubernetes operator.
+// - EXPERIMENTAL_DISCONNECT_ON_SHUTDOWN: if set to true, the containerboot instance
+// will disconnect from the control-plane on shutdown. This is used by HA subnet
+// routers and exit nodes to more quickly trigger failover to other replicas.
 //
 // When running on Kubernetes, containerboot defaults to storing state in the
 // "tailscale" kube secret. To store state on local disk instead, set
@@ -118,6 +121,7 @@ import (
 	"time"
 
 	"golang.org/x/sys/unix"
+	"tailscale.com/client/local"
 	"tailscale.com/client/tailscale"
 	"tailscale.com/ipn"
 	kubeutils "tailscale.com/k8s-operator"
@@ -205,34 +209,7 @@ func run() error {
 	if err != nil {
 		return fmt.Errorf("failed to bring up tailscale: %w", err)
 	}
-	killTailscaled := func() {
-		// The default termination grace period for a Pod is 30s. We wait 25s at
-		// most so that we still reserve some of that budget for tailscaled
-		// to receive and react to a SIGTERM before the SIGKILL that k8s
-		// will send at the end of the grace period.
-		ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
-		defer cancel()
-		if err := services.EnsureServicesNotAdvertised(ctx, client, log.Printf); err != nil {
-			log.Printf("Error ensuring services are not advertised: %v", err)
-		}
-		if hasKubeStateStore(cfg) {
-			// Check we're not shutting tailscaled down while it's still writing
-			// state. If we authenticate and fail to write all the state, we'll
-			// never recover automatically.
-			log.Printf("Checking for consistent state")
-			err := kc.waitForConsistentState(ctx)
-			if err != nil {
-				log.Printf("Error waiting for consistent state on shutdown: %v", err)
-			}
-		}
-		log.Printf("Sending SIGTERM to tailscaled")
-		if err := daemonProcess.Signal(unix.SIGTERM); err != nil {
-			log.Fatalf("error shutting tailscaled down: %v", err)
-		}
-	}
-	defer killTailscaled()
+	defer killTailscaled(client, cfg, daemonProcess, kc)
 
 	var healthCheck *healthz.Healthz
 	ep := &egressProxy{}
@@ -491,7 +468,7 @@ runLoop:
 			// have started the reaper defined below, we need to
 			// kill tailscaled and let reaper clean up child
 			// processes.
-			killTailscaled()
+			killTailscaled(client, cfg, daemonProcess, kc)
 			break runLoop
 		case err := <-errChan:
 			return fmt.Errorf("failed to read from tailscaled: %w", err)
@@ -499,7 +476,7 @@ runLoop:
 			return fmt.Errorf("failed to watch tailscaled config: %w", err)
 		case n := <-notifyChan:
 			if n.State != nil && *n.State != ipn.Running {
-				// Something's gone wrong and we've left the authenticated state.
+				// Something's gone wrong, and we've left the authenticated state.
 				// Our container image never recovered gracefully from this, and the
 				// control flow required to make it work now is hard. So, just crash
 				// the container and rely on the container runtime to restart us,
@@ -632,11 +609,11 @@ runLoop:
 					// route setup has succeeded. IPs and FQDN are
 					// read from the Secret by the Tailscale
 					// Kubernetes operator and, for some proxy
-					// types, such as Tailscale Ingress, advertized
+					// types, such as Tailscale Ingress, advertised
 					// on the Ingress status. Writing them to the
 					// Secret only after the proxy routing has been
 					// set up ensures that the operator does not
-					// advertize endpoints of broken proxies.
+					// advertise endpoints of broken proxies.
 					// TODO (irbekrm): instead of using the IP and FQDN, have some other mechanism for the proxy signal that it is 'Ready'.
 					deviceEndpoints := []any{n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses()}
 					if hasKubeStateStore(cfg) && deephash.Update(&currentDeviceEndpoints, &deviceEndpoints) {
@@ -892,3 +869,43 @@ func runHTTPServer(mux *http.ServeMux, addr string) (close func() error) {
 		return errors.Join(err, ln.Close())
 	}
 }
+
+func killTailscaled(client *local.Client, cfg *settings, daemonProcess *os.Process, kc *kubeClient) {
+	// The default termination grace period for a Pod is 30s. We wait 25s at
+	// most so that we still reserve some of that budget for tailscaled
+	// to receive and react to a SIGTERM before the SIGKILL that k8s
+	// will send at the end of the grace period.
+	ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
+	defer cancel()
+	if err := services.EnsureServicesNotAdvertised(ctx, client, log.Printf); err != nil {
+		log.Printf("Error ensuring services are not advertised: %v", err)
+	}
+	if hasKubeStateStore(cfg) {
+		// Check we're not shutting tailscaled down while it's still writing
+		// state. If we authenticate and fail to write all the state, we'll
+		// never recover automatically.
+		log.Printf("Checking for consistent state")
+		if err := kc.waitForConsistentState(ctx); err != nil {
+			log.Printf("Error waiting for consistent state on shutdown: %v", err)
+		}
+	}
+	if cfg.DisconnectOnShutdown {
+		// Forcibly disconnect the local Tailscale instance from the control plane. This is useful when running as an HA
+		// app connector or subnet router to speed up switching over to another replica.
+		if err := client.DisconnectControl(ctx); err != nil {
+			log.Printf("Error disconnecting from control: %v", err)
+		}
+	}
+	log.Printf("Sending SIGTERM to tailscaled")
+	if err := daemonProcess.Signal(unix.SIGTERM); err != nil {
+		log.Fatalf("error shutting tailscaled down: %v", err)
+	}
+	// Run out the clock for the grace period, so that any clients still connected have enough time to get a netmap
+	// update and switch over.
+	<-ctx.Done()
+}
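
Note: killTailscaled calls (*local.Client).DisconnectControl, which goes over the tailscaled LocalAPI (the /localapi/v0/disconnect-control endpoint stubbed out in the test changes below). A minimal standalone sketch of the same call, assuming a running tailscaled reachable over the default LocalAPI socket:

    package main

    import (
    	"context"
    	"log"
    	"time"

    	"tailscale.com/client/local"
    )

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    	defer cancel()

    	var lc local.Client // zero value targets the local tailscaled
    	if err := lc.DisconnectControl(ctx); err != nil {
    		log.Fatalf("disconnect-control failed: %v", err)
    	}
    	log.Printf("asked tailscaled to drop its control-plane connection")
    }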

@@ -1104,8 +1104,8 @@ func TestContainerBoot(t *testing.T) {
 				cmd.Process.Signal(*p.Signal)
 			}
 			if p.WantLog != "" {
-				err := tstest.WaitFor(2*time.Second, func() error {
-					waitLogLine(t, time.Second, cbOut, p.WantLog)
+				err := tstest.WaitFor(time.Minute, func() error {
+					waitLogLine(t, time.Minute, cbOut, p.WantLog)
 					return nil
 				})
 				if err != nil {
@@ -1213,6 +1213,8 @@ func (b *lockingBuffer) String() string {
 // waitLogLine fails the entire test if path doesn't contain want
 // before the timeout.
 func waitLogLine(t *testing.T, timeout time.Duration, b *lockingBuffer, want string) {
+	t.Helper()
 	deadline := time.Now().Add(timeout)
 	for time.Now().Before(deadline) {
 		for _, line := range strings.Split(b.String(), "\n") {
@@ -1338,6 +1340,17 @@ func (lc *localAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		}
 		w.Write([]byte("fake metrics"))
 		return
+	case "/localapi/v0/disconnect-control":
+		if r.Method != "POST" {
+			panic(fmt.Sprintf("unsupported method %q", r.Method))
+		}
+	case "/localapi/v0/prefs":
+		if r.Method != "GET" {
+			panic(fmt.Sprintf("unsupported method %q", r.Method))
+		}
+		w.Write([]byte("{}"))
+		return
 	default:
 		panic(fmt.Sprintf("unsupported path %q", r.URL.Path))
 	}

@@ -81,6 +81,12 @@ type settings struct {
 	// certs) and 'rw' for Pods that should manage the TLS certs shared
 	// amongst the replicas.
 	CertShareMode string
+	// DisconnectOnShutdown is set for subnet routers & app connectors that
+	// are running in an HA configuration. When set, it forces the application
+	// to wait for the entirety of the termination grace period before exiting
+	// to give time for clients to receive an updated netmap that points them
+	// to an active subnet router/app connector.
+	DisconnectOnShutdown bool
 }
 
 func configFromEnv() (*settings, error) {
@@ -117,6 +123,7 @@ func configFromEnv() (*settings, error) {
 		EgressProxiesCfgPath: defaultEnv("TS_EGRESS_PROXIES_CONFIG_PATH", ""),
 		IngressProxiesCfgPath: defaultEnv("TS_INGRESS_PROXIES_CONFIG_PATH", ""),
 		PodUID: defaultEnv("POD_UID", ""),
+		DisconnectOnShutdown: defaultBool("EXPERIMENTAL_DISCONNECT_ON_SHUTDOWN", false),
 	}
 	podIPs, ok := os.LookupEnv("POD_IPS")
 	if ok {
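
Note: the env var flows through the existing defaultBool helper into settings.DisconnectOnShutdown. A hedged, test-style sketch of that wiring, written as if it lived next to settings.go in the containerboot package; the test name is illustrative and it assumes no other TS_* env vars are set that would make configFromEnv fail:

    func TestDisconnectOnShutdownEnv(t *testing.T) {
    	t.Setenv("EXPERIMENTAL_DISCONNECT_ON_SHUTDOWN", "true")
    	cfg, err := configFromEnv()
    	if err != nil {
    		t.Fatalf("configFromEnv: %v", err)
    	}
    	if !cfg.DisconnectOnShutdown {
    		t.Fatal("expected DisconnectOnShutdown to be true")
    	}
    }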

@@ -318,9 +318,9 @@ func run(logger *zap.SugaredLogger) error {
 			// Context cancelled, exit.
 			logger.Info("Context cancelled, exiting")
 			shutdownCtx, shutdownCancel := context.WithTimeout(serveCtx, 20*time.Second)
+			defer shutdownCancel()
 			unadvertiseErr := services.EnsureServicesNotAdvertised(shutdownCtx, lc, logger.Infof)
-			shutdownCancel()
-			serveCancel()
+			<-shutdownCtx.Done()
 			return errors.Join(unadvertiseErr, group.Wait())
 		case cfg = <-cfgChan:
 			// Handle config reload.

@@ -8,7 +8,6 @@ package services
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"tailscale.com/client/local"
 	"tailscale.com/ipn"
@@ -24,40 +23,21 @@ func EnsureServicesNotAdvertised(ctx context.Context, lc *local.Client, logf log
 	if err != nil {
 		return fmt.Errorf("error getting prefs: %w", err)
 	}
 	if len(prefs.AdvertiseServices) == 0 {
 		return nil
 	}
 	logf("unadvertising services: %v", prefs.AdvertiseServices)
-	if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
+	_, err = lc.EditPrefs(ctx, &ipn.MaskedPrefs{
 		AdvertiseServicesSet: true,
 		Prefs: ipn.Prefs{
 			AdvertiseServices: nil,
-		},
-	}); err != nil {
-		// EditPrefs only returns an error if it fails _set_ its local prefs.
-		// If it fails to _persist_ the prefs in state, we don't get an error
-		// and we continue waiting below, as control will failover as usual.
+		}})
+	if err != nil {
+		// EditPrefs only returns an error if it fails to _set_ its local prefs.
 		return fmt.Errorf("error setting prefs AdvertiseServices: %w", err)
 	}
-	// Services use the same (failover XOR regional routing) mechanism that
-	// HA subnet routers use. Unfortunately we don't yet get a reliable signal
-	// from control that it's responded to our unadvertisement, so the best we
-	// can do is wait for 20 seconds, where 15s is the approximate maximum time
-	// it should take for control to choose a new primary, and 5s is for buffer.
-	//
-	// Note: There is no guarantee that clients have been _informed_ of the new
-	// primary no matter how long we wait. We would need a mechanism to await
-	// netmap updates for peers to know for sure.
-	//
-	// See https://tailscale.com/kb/1115/high-availability for more details.
-	// TODO(tomhjp): Wait for a netmap update instead of sleeping when control
-	// supports that.
-	select {
-	case <-ctx.Done():
-		return nil
-	case <-time.After(20 * time.Second):
-		return nil
-	}
+	return nil
 }
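
Note: with the 20-second sleep removed from EnsureServicesNotAdvertised, running out the shutdown window is now the caller's job, as in killTailscaled and the k8s-proxy shutdown path above. A hedged fragment of the expected caller pattern, assuming the same imports and variables as those call sites (the 20-second budget and log.Printf logger are illustrative):

    // Unadvertise services, then hold the process alive for the remainder of
    // the shutdown budget so control and peers have time to fail over.
    ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
    defer cancel()
    if err := services.EnsureServicesNotAdvertised(ctx, lc, log.Printf); err != nil {
    	log.Printf("failed to unadvertise services: %v", err)
    }
    <-ctx.Done() // run out the clock before exiting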
