cmd/containerboot,kube,util/linuxfw: configure kube egress proxies to route to 1+ tailnet targets (#13531)

* cmd/containerboot,kube,util/linuxfw: configure kube egress proxies to route to 1+ tailnet targets

This commit is first part of the work to allow running multiple
replicas of the Kubernetes operator egress proxies per tailnet service +
to allow exposing multiple tailnet services via each proxy replica.

This expands the existing iptables/nftables-based proxy configuration
mechanism.

A proxy can now be configured to route to one or more tailnet targets
via a (mounted) config file that, for each tailnet target, specifies:
- the target's tailnet IP or FQDN
- mappings of container ports to which cluster workloads will send traffic to
tailnet target ports where the traffic should be forwarded.

Example configfile contents:
{
  "some-svc": {"tailnetTarget":{"fqdn":"foo.tailnetxyz.ts.net","ports"{"tcp:4006:80":{"protocol":"tcp","matchPort":4006,"targetPort":80},"tcp:4007:443":{"protocol":"tcp","matchPort":4007,"targetPort":443}}}}
}

A proxy that is configured with this config file will configure firewall rules
to route cluster traffic to the tailnet targets. It will then watch the config file
for updates as well as monitor relevant netmap updates and reconfigure firewall
as needed.

This adds a bunch of new iptables/nftables functionality to make it easier to dynamically update
the firewall rules without needing to restart the proxy Pod as well as to make
it easier to debug/understand the rules:

- for iptables, each portmapping is a DNAT rule with a comment pointing
at the 'service',i.e:

-A PREROUTING ! -i tailscale0 -p tcp -m tcp --dport 4006 -m comment --comment "some-svc:tcp:4006 -> tcp:80" -j DNAT --to-destination 100.64.1.18:80
Additionally there is a SNAT rule for each tailnet target, to mask the source address.

- for nftables, a separate prerouting chain is created for each tailnet target
and all the portmapping rules are placed in that chain. This makes it easier
to look up rules and delete services when no longer needed.
(nftables allows hooking a custom chain to a prerouting hook, so no extra work
is needed to ensure that the rules in the service chains are evaluated).

The next steps will be to get the Kubernetes Operator to generate
the configfile and ensure it is mounted to the relevant proxy nodes.

Updates tailscale/tailscale#13406

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
pull/13624/head
Irbe Krumina 4 weeks ago committed by GitHub
parent c62b0732d2
commit 096b090caf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -158,6 +158,7 @@ func main() {
PodIP: defaultEnv("POD_IP", ""), PodIP: defaultEnv("POD_IP", ""),
EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false), EnableForwardingOptimizations: defaultBool("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS", false),
HealthCheckAddrPort: defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""), HealthCheckAddrPort: defaultEnv("TS_HEALTHCHECK_ADDR_PORT", ""),
EgressSvcsCfgPath: defaultEnv("TS_EGRESS_SERVICES_CONFIG_PATH", ""),
} }
if err := cfg.validate(); err != nil { if err := cfg.validate(); err != nil {
@ -275,10 +276,8 @@ authLoop:
switch *n.State { switch *n.State {
case ipn.NeedsLogin: case ipn.NeedsLogin:
if isOneStepConfig(cfg) { if isOneStepConfig(cfg) {
// This could happen if this is the // This could happen if this is the first time tailscaled was run for this
// first time tailscaled was run for // device and the auth key was not passed via the configfile.
// this device and the auth key was not
// passed via the configfile.
log.Fatalf("invalid state: tailscaled daemon started with a config file, but tailscale is not logged in: ensure you pass a valid auth key in the config file.") log.Fatalf("invalid state: tailscaled daemon started with a config file, but tailscale is not logged in: ensure you pass a valid auth key in the config file.")
} }
if err := authTailscale(); err != nil { if err := authTailscale(); err != nil {
@ -376,6 +375,9 @@ authLoop:
} }
}) })
) )
// egressSvcsErrorChan will get an error sent to it if this containerboot instance is configured to expose 1+
// egress services in HA mode and errored.
var egressSvcsErrorChan = make(chan error)
defer t.Stop() defer t.Stop()
// resetTimer resets timer for when to next attempt to resolve the DNS // resetTimer resets timer for when to next attempt to resolve the DNS
// name for the proxy configured with TS_EXPERIMENTAL_DEST_DNS_NAME. The // name for the proxy configured with TS_EXPERIMENTAL_DEST_DNS_NAME. The
@ -401,6 +403,7 @@ authLoop:
failedResolveAttempts++ failedResolveAttempts++
} }
var egressSvcsNotify chan ipn.Notify
notifyChan := make(chan ipn.Notify) notifyChan := make(chan ipn.Notify)
errChan := make(chan error) errChan := make(chan error)
go func() { go func() {
@ -575,31 +578,50 @@ runLoop:
h.Unlock() h.Unlock()
healthzRunner() healthzRunner()
} }
if egressSvcsNotify != nil {
egressSvcsNotify <- n
}
} }
if !startupTasksDone { if !startupTasksDone {
// For containerboot instances that act as TCP // For containerboot instances that act as TCP proxies (proxying traffic to an endpoint
// proxies (proxying traffic to an endpoint // passed via one of the env vars that containerboot reads) and store state in a
// passed via one of the env vars that // Kubernetes Secret, we consider startup tasks done at the point when device info has
// containerbot reads) and store state in a // been successfully stored to state Secret. For all other containerboot instances, if
// Kubernetes Secret, we consider startup tasks // we just get to this point the startup tasks can be considered done.
// done at the point when device info has been
// successfully stored to state Secret.
// For all other containerboot instances, if we
// just get to this point the startup tasks can
// be considered done.
if !isL3Proxy(cfg) || !hasKubeStateStore(cfg) || (currentDeviceEndpoints != deephash.Sum{} && currentDeviceID != deephash.Sum{}) { if !isL3Proxy(cfg) || !hasKubeStateStore(cfg) || (currentDeviceEndpoints != deephash.Sum{} && currentDeviceID != deephash.Sum{}) {
// This log message is used in tests to detect when all // This log message is used in tests to detect when all
// post-auth configuration is done. // post-auth configuration is done.
log.Println("Startup complete, waiting for shutdown signal") log.Println("Startup complete, waiting for shutdown signal")
startupTasksDone = true startupTasksDone = true
// Wait on tailscaled process. It won't // Configure egress proxy. Egress proxy will set up firewall rules to proxy
// be cleaned up by default when the // traffic to tailnet targets configured in the provided configuration file. It
// container exits as it is not PID1. // will then continuously monitor the config file and netmap updates and
// TODO (irbekrm): perhaps we can // reconfigure the firewall rules as needed. If any of its operations fail, it
// replace the reaper by a running // will crash this node.
// cmd.Wait in a goroutine immediately if cfg.EgressSvcsCfgPath != "" {
// after starting tailscaled? log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressSvcsCfgPath)
egressSvcsNotify = make(chan ipn.Notify)
ep := egressProxy{
cfgPath: cfg.EgressSvcsCfgPath,
nfr: nfr,
kc: kc,
stateSecret: cfg.KubeSecret,
netmapChan: egressSvcsNotify,
podIP: cfg.PodIP,
tailnetAddrs: addrs,
}
go func() {
if err := ep.run(ctx, n); err != nil {
egressSvcsErrorChan <- err
}
}()
}
// Wait on tailscaled process. It won't be cleaned up by default when the
// container exits as it is not PID1. TODO (irbekrm): perhaps we can replace the
// reaper by a running cmd.Wait in a goroutine immediately after starting
// tailscaled?
reaper := func() { reaper := func() {
defer wg.Done() defer wg.Done()
for { for {
@ -637,6 +659,8 @@ runLoop:
} }
backendAddrs = newBackendAddrs backendAddrs = newBackendAddrs
resetTimer(false) resetTimer(false)
case e := <-egressSvcsErrorChan:
log.Fatalf("egress proxy failed: %v", e)
} }
} }
wg.Wait() wg.Wait()

@ -0,0 +1,570 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/netip"
"os"
"path/filepath"
"reflect"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"tailscale.com/ipn"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/kubeclient"
"tailscale.com/tailcfg"
"tailscale.com/util/linuxfw"
"tailscale.com/util/mak"
)
const tailscaleTunInterface = "tailscale0"
// This file contains functionality to run containerboot as a proxy that can
// route cluster traffic to one or more tailnet targets, based on portmapping
// rules read from a configfile. Currently (9/2024) this is only used for the
// Kubernetes operator egress proxies.
// egressProxy knows how to configure firewall rules to route cluster traffic to
// one or more tailnet services.
type egressProxy struct {
cfgPath string // path to egress service config file
nfr linuxfw.NetfilterRunner // never nil
kc kubeclient.Client // never nil
stateSecret string // name of the kube state Secret
netmapChan chan ipn.Notify // chan to receive netmap updates on
podIP string // never empty string
// tailnetFQDNs is the egress service FQDN to tailnet IP mappings that
// were last used to configure firewall rules for this proxy.
// TODO(irbekrm): target addresses are also stored in the state Secret.
// Evaluate whether we should retrieve them from there and not store in
// memory at all.
targetFQDNs map[string][]netip.Prefix
// used to configure firewall rules.
tailnetAddrs []netip.Prefix
}
// run configures egress proxy firewall rules and ensures that the firewall rules are reconfigured when:
// - the mounted egress config has changed
// - the proxy's tailnet IP addresses have changed
// - tailnet IPs have changed for any backend targets specified by tailnet FQDN
func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error {
var tickChan <-chan time.Time
var eventChan <-chan fsnotify.Event
// TODO (irbekrm): take a look if this can be pulled into a single func
// shared with serve config loader.
if w, err := fsnotify.NewWatcher(); err != nil {
log.Printf("failed to create fsnotify watcher, timer-only mode: %v", err)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
tickChan = ticker.C
} else {
defer w.Close()
if err := w.Add(filepath.Dir(ep.cfgPath)); err != nil {
return fmt.Errorf("failed to add fsnotify watch: %w", err)
}
eventChan = w.Events
}
if err := ep.sync(ctx, n); err != nil {
return err
}
for {
var err error
select {
case <-ctx.Done():
return nil
case <-tickChan:
err = ep.sync(ctx, n)
case <-eventChan:
log.Printf("config file change detected, ensuring firewall config is up to date...")
err = ep.sync(ctx, n)
case n = <-ep.netmapChan:
shouldResync := ep.shouldResync(n)
if shouldResync {
log.Printf("netmap change detected, ensuring firewall config is up to date...")
err = ep.sync(ctx, n)
}
}
if err != nil {
return fmt.Errorf("error syncing egress service config: %w", err)
}
}
}
// sync triggers an egress proxy config resync. The resync calculates the diff between config and status to determine if
// any firewall rules need to be updated. Currently using status in state Secret as a reference for what is the current
// firewall configuration is good enough because - the status is keyed by the Pod IP - we crash the Pod on errors such
// as failed firewall update
func (ep *egressProxy) sync(ctx context.Context, n ipn.Notify) error {
cfgs, err := ep.getConfigs()
if err != nil {
return fmt.Errorf("error retrieving egress service configs: %w", err)
}
status, err := ep.getStatus(ctx)
if err != nil {
return fmt.Errorf("error retrieving current egress proxy status: %w", err)
}
newStatus, err := ep.syncEgressConfigs(cfgs, status, n)
if err != nil {
return fmt.Errorf("error syncing egress service configs: %w", err)
}
if !servicesStatusIsEqual(newStatus, status) {
if err := ep.setStatus(ctx, newStatus, n); err != nil {
return fmt.Errorf("error setting egress proxy status: %w", err)
}
}
return nil
}
// addrsHaveChanged returns true if the provided netmap update contains tailnet address change for this proxy node.
// Netmap must not be nil.
func (ep *egressProxy) addrsHaveChanged(n ipn.Notify) bool {
return !reflect.DeepEqual(ep.tailnetAddrs, n.NetMap.SelfNode.Addresses())
}
// syncEgressConfigs adds and deletes firewall rules to match the desired
// configuration. It uses the provided status to determine what is currently
// applied and updates the status after a successful sync.
func (ep *egressProxy) syncEgressConfigs(cfgs *egressservices.Configs, status *egressservices.Status, n ipn.Notify) (*egressservices.Status, error) {
if !(wantsServicesConfigured(cfgs) || hasServicesConfigured(status)) {
return nil, nil
}
// Delete unnecessary services.
if err := ep.deleteUnnecessaryServices(cfgs, status); err != nil {
return nil, fmt.Errorf("error deleting services: %w", err)
}
newStatus := &egressservices.Status{}
if !wantsServicesConfigured(cfgs) {
return newStatus, nil
}
// Add new services, update rules for any that have changed.
rulesPerSvcToAdd := make(map[string][]rule, 0)
rulesPerSvcToDelete := make(map[string][]rule, 0)
for svcName, cfg := range *cfgs {
tailnetTargetIPs, err := ep.tailnetTargetIPsForSvc(cfg, n)
if err != nil {
return nil, fmt.Errorf("error determining tailnet target IPs: %w", err)
}
rulesToAdd, rulesToDelete, err := updatesForCfg(svcName, cfg, status, tailnetTargetIPs)
if err != nil {
return nil, fmt.Errorf("error validating service changes: %v", err)
}
log.Printf("syncegressservices: looking at svc %s rulesToAdd %d rulesToDelete %d", svcName, len(rulesToAdd), len(rulesToDelete))
if len(rulesToAdd) != 0 {
mak.Set(&rulesPerSvcToAdd, svcName, rulesToAdd)
}
if len(rulesToDelete) != 0 {
mak.Set(&rulesPerSvcToDelete, svcName, rulesToDelete)
}
if len(rulesToAdd) != 0 || ep.addrsHaveChanged(n) {
// For each tailnet target, set up SNAT from the local tailnet device address of the matching
// family.
for _, t := range tailnetTargetIPs {
if t.Is6() && !ep.nfr.HasIPV6NAT() {
continue
}
var local netip.Addr
for _, pfx := range n.NetMap.SelfNode.Addresses().All() {
if !pfx.IsSingleIP() {
continue
}
if pfx.Addr().Is4() != t.Is4() {
continue
}
local = pfx.Addr()
break
}
if !local.IsValid() {
return nil, fmt.Errorf("no valid local IP: %v", local)
}
// TODO(irbekrm): only create the SNAT rule if it does not already exist.
if err := ep.nfr.AddSNATRuleForDst(local, t); err != nil {
return nil, fmt.Errorf("error setting up SNAT rule: %w", err)
}
}
}
// Update the status. Status will be written back to the state Secret by the caller.
mak.Set(&newStatus.Services, svcName, &egressservices.ServiceStatus{TailnetTargetIPs: tailnetTargetIPs, TailnetTarget: cfg.TailnetTarget, Ports: cfg.Ports})
}
// Actually apply the firewall rules.
if err := ensureRulesAdded(rulesPerSvcToAdd, ep.nfr); err != nil {
return nil, fmt.Errorf("error adding rules: %w", err)
}
if err := ensureRulesDeleted(rulesPerSvcToDelete, ep.nfr); err != nil {
return nil, fmt.Errorf("error deleting rules: %w", err)
}
return newStatus, nil
}
// updatesForCfg calculates any rules that need to be added or deleted for an individucal egress service config.
func updatesForCfg(svcName string, cfg egressservices.Config, status *egressservices.Status, tailnetTargetIPs []netip.Addr) ([]rule, []rule, error) {
rulesToAdd := make([]rule, 0)
rulesToDelete := make([]rule, 0)
currentConfig, ok := lookupCurrentConfig(svcName, status)
// If no rules for service are present yet, add them all.
if !ok {
for _, t := range tailnetTargetIPs {
for ports := range cfg.Ports {
log.Printf("syncegressservices: svc %s adding port %v", svcName, ports)
rulesToAdd = append(rulesToAdd, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: t})
}
}
return rulesToAdd, rulesToDelete, nil
}
// If there are no backend targets available, delete any currently configured rules.
if len(tailnetTargetIPs) == 0 {
log.Printf("tailnet target for egress service %s does not have any backend addresses, deleting all rules", svcName)
for _, ip := range currentConfig.TailnetTargetIPs {
for ports := range currentConfig.Ports {
rulesToDelete = append(rulesToAdd, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
}
}
return rulesToAdd, rulesToDelete, nil
}
// If there are rules present for backend targets that no longer match, delete them.
for _, ip := range currentConfig.TailnetTargetIPs {
var found bool
for _, wantsIP := range tailnetTargetIPs {
if reflect.DeepEqual(ip, wantsIP) {
found = true
break
}
}
if !found {
for ports := range currentConfig.Ports {
rulesToDelete = append(rulesToDelete, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
}
}
}
// Sync rules for the currently wanted backend targets.
for _, ip := range tailnetTargetIPs {
// If the backend target is not yet present in status, add all rules.
var found bool
for _, gotIP := range currentConfig.TailnetTargetIPs {
if reflect.DeepEqual(ip, gotIP) {
found = true
break
}
}
if !found {
for ports := range cfg.Ports {
rulesToAdd = append(rulesToAdd, rule{tailnetPort: ports.TargetPort, containerPort: ports.MatchPort, protocol: ports.Protocol, tailnetIP: ip})
}
continue
}
// If the backend target is present in status, check that the
// currently applied rules are up to date.
// Delete any current portmappings that are no longer present in config.
for port := range currentConfig.Ports {
if _, ok := cfg.Ports[port]; ok {
continue
}
rulesToDelete = append(rulesToDelete, rule{tailnetPort: port.TargetPort, containerPort: port.MatchPort, protocol: port.Protocol, tailnetIP: ip})
}
// Add any new portmappings.
for port := range cfg.Ports {
if _, ok := currentConfig.Ports[port]; ok {
continue
}
rulesToAdd = append(rulesToAdd, rule{tailnetPort: port.TargetPort, containerPort: port.MatchPort, protocol: port.Protocol, tailnetIP: ip})
}
}
return rulesToAdd, rulesToDelete, nil
}
// deleteUnneccessaryServices ensure that any services found on status, but not
// present in config are deleted.
func (ep *egressProxy) deleteUnnecessaryServices(cfgs *egressservices.Configs, status *egressservices.Status) error {
if !hasServicesConfigured(status) {
return nil
}
if !wantsServicesConfigured(cfgs) {
for svcName, svc := range status.Services {
log.Printf("service %s is no longer required, deleting", svcName)
if err := ensureServiceDeleted(svcName, svc, ep.nfr); err != nil {
return fmt.Errorf("error deleting service %s: %w", svcName, err)
}
}
return nil
}
for svcName, svc := range status.Services {
if _, ok := (*cfgs)[svcName]; !ok {
log.Printf("service %s is no longer required, deleting", svcName)
if err := ensureServiceDeleted(svcName, svc, ep.nfr); err != nil {
return fmt.Errorf("error deleting service %s: %w", svcName, err)
}
// TODO (irbekrm): also delete the SNAT rule here
}
}
return nil
}
// getConfigs gets the mounted egress service configuration.
func (ep *egressProxy) getConfigs() (*egressservices.Configs, error) {
j, err := os.ReadFile(ep.cfgPath)
if os.IsNotExist(err) {
return nil, nil
}
if err != nil {
return nil, err
}
if len(j) == 0 || string(j) == "" {
return nil, nil
}
cfg := &egressservices.Configs{}
if err := json.Unmarshal(j, &cfg); err != nil {
return nil, err
}
return cfg, nil
}
// getStatus gets the current status of the configured firewall. The current
// status is stored in state Secret. Returns nil status if no status that
// applies to the current proxy Pod was found. Uses the Pod IP to determine if a
// status found in the state Secret applies to this proxy Pod.
func (ep *egressProxy) getStatus(ctx context.Context) (*egressservices.Status, error) {
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return nil, fmt.Errorf("error retrieving state secret: %w", err)
}
status := &egressservices.Status{}
raw, ok := secret.Data[egressservices.KeyEgressServices]
if !ok {
return nil, nil
}
if err := json.Unmarshal([]byte(raw), status); err != nil {
return nil, fmt.Errorf("error unmarshalling previous config: %w", err)
}
if reflect.DeepEqual(status.PodIP, ep.podIP) {
return status, nil
}
return nil, nil
}
// setStatus writes egress proxy's currently configured firewall to the state
// Secret and updates proxy's tailnet addresses.
func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, n ipn.Notify) error {
// Pod IP is used to determine if a stored status applies to THIS proxy Pod.
status.PodIP = ep.podIP
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil {
return fmt.Errorf("error retrieving state Secret: %w", err)
}
bs, err := json.Marshal(status)
if err != nil {
return fmt.Errorf("error marshalling service config: %w", err)
}
secret.Data[egressservices.KeyEgressServices] = bs
patch := kubeclient.JSONPatch{
Op: "replace",
Path: fmt.Sprintf("/data/%s", egressservices.KeyEgressServices),
Value: bs,
}
if err := ep.kc.JSONPatchSecret(ctx, ep.stateSecret, []kubeclient.JSONPatch{patch}); err != nil {
return fmt.Errorf("error patching state Secret: %w", err)
}
ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice()
return nil
}
// tailnetTargetIPsForSvc returns the tailnet IPs to which traffic for this
// egress service should be proxied. The egress service can be configured by IP
// or by FQDN. If it's configured by IP, just return that. If it's configured by
// FQDN, resolve the FQDN and return the resolved IPs.
func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.Notify) (addrs []netip.Addr, err error) {
if svc.TailnetTarget.IP != "" {
addr, err := netip.ParseAddr(svc.TailnetTarget.IP)
if err != nil {
return nil, fmt.Errorf("error parsing tailnet target IP: %w", err)
}
return []netip.Addr{addr}, nil
}
if svc.TailnetTarget.FQDN == "" {
return nil, errors.New("unexpected egress service config- neither tailnet target IP nor FQDN is set")
}
if n.NetMap == nil {
log.Printf("netmap is not available, unable to determine backend addresses for %s", svc.TailnetTarget.FQDN)
return addrs, nil
}
var (
node tailcfg.NodeView
nodeFound bool
)
for _, nn := range n.NetMap.Peers {
if equalFQDNs(nn.Name(), svc.TailnetTarget.FQDN) {
node = nn
nodeFound = true
break
}
}
if nodeFound {
for _, addr := range node.Addresses().AsSlice() {
addrs = append(addrs, addr.Addr())
}
// Egress target endpoints configured via FQDN are stored, so
// that we can determine if a netmap update should trigger a
// resync.
mak.Set(&ep.targetFQDNs, svc.TailnetTarget.FQDN, node.Addresses().AsSlice())
}
return addrs, nil
}
// shouldResync parses netmap update and returns true if the update contains
// changes for which the egress proxy's firewall should be reconfigured.
func (ep *egressProxy) shouldResync(n ipn.Notify) bool {
if n.NetMap == nil {
return false
}
// If proxy's tailnet addresses have changed, resync.
if !reflect.DeepEqual(n.NetMap.SelfNode.Addresses().AsSlice(), ep.tailnetAddrs) {
log.Printf("node addresses have changed, trigger egress config resync")
ep.tailnetAddrs = n.NetMap.SelfNode.Addresses().AsSlice()
return true
}
// If the IPs for any of the egress services configured via FQDN have
// changed, resync.
for fqdn, ips := range ep.targetFQDNs {
for _, nn := range n.NetMap.Peers {
if equalFQDNs(nn.Name(), fqdn) {
if !reflect.DeepEqual(ips, nn.Addresses().AsSlice()) {
log.Printf("backend addresses for egress target %q have changed old IPs %v, new IPs %v trigger egress config resync", nn.Name(), ips, nn.Addresses().AsSlice())
}
return true
}
}
}
return false
}
// ensureServiceDeleted ensures that any rules for an egress service are removed
// from the firewall configuration.
func ensureServiceDeleted(svcName string, svc *egressservices.ServiceStatus, nfr linuxfw.NetfilterRunner) error {
// Note that the portmap is needed for iptables based firewall only.
// Nftables group rules for a service in a chain, so there is no need to
// specify individual portmapping based rules.
pms := make([]linuxfw.PortMap, 0)
for pm := range svc.Ports {
pms = append(pms, linuxfw.PortMap{MatchPort: pm.MatchPort, TargetPort: pm.TargetPort, Protocol: pm.Protocol})
}
if err := nfr.DeleteSvc(svcName, tailscaleTunInterface, svc.TailnetTargetIPs, pms); err != nil {
return fmt.Errorf("error deleting service %s: %w", svcName, err)
}
return nil
}
// ensureRulesAdded ensures that all portmapping rules are added to the firewall
// configuration. For any rules that already exist, calling this function is a
// no-op. In case of nftables, a service consists of one or two (one per IP
// family) chains that conain the portmapping rules for the service and the
// chains as needed when this function is called.
func ensureRulesAdded(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error {
for svc, rules := range rulesPerSvc {
for _, rule := range rules {
if rule.tailnetIP.Is6() && !nfr.HasIPV6NAT() {
log.Printf("host does not support IPv6 NAT; skipping IPv6 target %s", rule.tailnetIP)
continue
}
log.Printf("ensureRulesAdded svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol)
if err := nfr.EnsurePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil {
return fmt.Errorf("error ensuring rule: %w", err)
}
}
}
return nil
}
// ensureRulesDeleted ensures that the given rules are deleted from the firewall
// configuration. For any rules that do not exist, calling this funcion is a
// no-op.
func ensureRulesDeleted(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error {
for svc, rules := range rulesPerSvc {
for _, rule := range rules {
if rule.tailnetIP.Is6() && !nfr.HasIPV6NAT() {
log.Printf("host does not support IPv6 NAT; skipping IPv6 target %s", rule.tailnetIP)
continue
}
log.Printf("ensureRulesDeleted svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol)
if err := nfr.DeletePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil {
return fmt.Errorf("error deleting rule: %w", err)
}
}
}
return nil
}
func lookupCurrentConfig(svcName string, status *egressservices.Status) (*egressservices.ServiceStatus, bool) {
if status == nil || len(status.Services) == 0 {
return nil, false
}
c, ok := status.Services[svcName]
return c, ok
}
func equalFQDNs(s, s1 string) bool {
s, _ = strings.CutSuffix(s, ".")
s1, _ = strings.CutSuffix(s1, ".")
return strings.EqualFold(s, s1)
}
// rule contains configuration for an egress proxy firewall rule.
type rule struct {
containerPort uint16 // port to match incoming traffic
tailnetPort uint16 // tailnet service port
tailnetIP netip.Addr // tailnet service IP
protocol string
}
func wantsServicesConfigured(cfgs *egressservices.Configs) bool {
return cfgs != nil && len(*cfgs) != 0
}
func hasServicesConfigured(status *egressservices.Status) bool {
return status != nil && len(status.Services) != 0
}
func servicesStatusIsEqual(st, st1 *egressservices.Status) bool {
if st == nil && st1 == nil {
return true
}
if st == nil || st1 == nil {
return false
}
st.PodIP = ""
st1.PodIP = ""
return reflect.DeepEqual(*st, *st1)
}

@ -0,0 +1,175 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package main
import (
"net/netip"
"reflect"
"testing"
"tailscale.com/kube/egressservices"
)
func Test_updatesForSvc(t *testing.T) {
tailnetIPv4, tailnetIPv6 := netip.MustParseAddr("100.99.99.99"), netip.MustParseAddr("fd7a:115c:a1e0::701:b62a")
tailnetIPv4_1, tailnetIPv6_1 := netip.MustParseAddr("100.88.88.88"), netip.MustParseAddr("fd7a:115c:a1e0::4101:512f")
ports := map[egressservices.PortMap]struct{}{{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}: {}}
ports1 := map[egressservices.PortMap]struct{}{{Protocol: "udp", MatchPort: 4004, TargetPort: 53}: {}}
ports2 := map[egressservices.PortMap]struct{}{{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}: {},
{Protocol: "tcp", MatchPort: 4005, TargetPort: 443}: {}}
fqdnSpec := egressservices.Config{
TailnetTarget: egressservices.TailnetTarget{FQDN: "test"},
Ports: ports,
}
fqdnSpec1 := egressservices.Config{
TailnetTarget: egressservices.TailnetTarget{FQDN: "test"},
Ports: ports1,
}
fqdnSpec2 := egressservices.Config{
TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()},
Ports: ports,
}
fqdnSpec3 := egressservices.Config{
TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()},
Ports: ports2,
}
r := rule{containerPort: 4003, tailnetPort: 80, protocol: "tcp", tailnetIP: tailnetIPv4}
r1 := rule{containerPort: 4003, tailnetPort: 80, protocol: "tcp", tailnetIP: tailnetIPv6}
r2 := rule{tailnetPort: 53, containerPort: 4004, protocol: "udp", tailnetIP: tailnetIPv4}
r3 := rule{tailnetPort: 53, containerPort: 4004, protocol: "udp", tailnetIP: tailnetIPv6}
r4 := rule{containerPort: 4003, tailnetPort: 80, protocol: "tcp", tailnetIP: tailnetIPv4_1}
r5 := rule{containerPort: 4003, tailnetPort: 80, protocol: "tcp", tailnetIP: tailnetIPv6_1}
r6 := rule{containerPort: 4005, tailnetPort: 443, protocol: "tcp", tailnetIP: tailnetIPv4}
tests := []struct {
name string
svcName string
tailnetTargetIPs []netip.Addr
podIP string
spec egressservices.Config
status *egressservices.Status
wantRulesToAdd []rule
wantRulesToDelete []rule
}{
{
name: "add_fqdn_svc_that_does_not_yet_exist",
svcName: "test",
tailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6},
spec: fqdnSpec,
status: &egressservices.Status{},
wantRulesToAdd: []rule{r, r1},
wantRulesToDelete: []rule{},
},
{
name: "fqdn_svc_already_exists",
svcName: "test",
tailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6},
spec: fqdnSpec,
status: &egressservices.Status{
Services: map[string]*egressservices.ServiceStatus{"test": {
TailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6},
TailnetTarget: egressservices.TailnetTarget{FQDN: "test"},
Ports: ports,
}}},
wantRulesToAdd: []rule{},
wantRulesToDelete: []rule{},
},
{
name: "fqdn_svc_already_exists_add_port_remove_port",
svcName: "test",
tailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6},
spec: fqdnSpec1,
status: &egressservices.Status{
Services: map[string]*egressservices.ServiceStatus{"test": {
TailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6},
TailnetTarget: egressservices.TailnetTarget{FQDN: "test"},
Ports: ports,
}}},
wantRulesToAdd: []rule{r2, r3},
wantRulesToDelete: []rule{r, r1},
},
{
name: "fqdn_svc_already_exists_change_fqdn_backend_ips",
svcName: "test",
tailnetTargetIPs: []netip.Addr{tailnetIPv4_1, tailnetIPv6_1},
spec: fqdnSpec,
status: &egressservices.Status{
Services: map[string]*egressservices.ServiceStatus{"test": {
TailnetTargetIPs: []netip.Addr{tailnetIPv4, tailnetIPv6},
TailnetTarget: egressservices.TailnetTarget{FQDN: "test"},
Ports: ports,
}}},
wantRulesToAdd: []rule{r4, r5},
wantRulesToDelete: []rule{r, r1},
},
{
name: "add_ip_service",
svcName: "test",
tailnetTargetIPs: []netip.Addr{tailnetIPv4},
spec: fqdnSpec2,
status: &egressservices.Status{},
wantRulesToAdd: []rule{r},
wantRulesToDelete: []rule{},
},
{
name: "add_ip_service_already_exists",
svcName: "test",
tailnetTargetIPs: []netip.Addr{tailnetIPv4},
spec: fqdnSpec2,
status: &egressservices.Status{
Services: map[string]*egressservices.ServiceStatus{"test": {
TailnetTargetIPs: []netip.Addr{tailnetIPv4},
TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()},
Ports: ports,
}}},
wantRulesToAdd: []rule{},
wantRulesToDelete: []rule{},
},
{
name: "ip_service_add_port",
svcName: "test",
tailnetTargetIPs: []netip.Addr{tailnetIPv4},
spec: fqdnSpec3,
status: &egressservices.Status{
Services: map[string]*egressservices.ServiceStatus{"test": {
TailnetTargetIPs: []netip.Addr{tailnetIPv4},
TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()},
Ports: ports,
}}},
wantRulesToAdd: []rule{r6},
wantRulesToDelete: []rule{},
},
{
name: "ip_service_delete_port",
svcName: "test",
tailnetTargetIPs: []netip.Addr{tailnetIPv4},
spec: fqdnSpec,
status: &egressservices.Status{
Services: map[string]*egressservices.ServiceStatus{"test": {
TailnetTargetIPs: []netip.Addr{tailnetIPv4},
TailnetTarget: egressservices.TailnetTarget{IP: tailnetIPv4.String()},
Ports: ports2,
}}},
wantRulesToAdd: []rule{},
wantRulesToDelete: []rule{r6},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotRulesToAdd, gotRulesToDelete, err := updatesForCfg(tt.svcName, tt.spec, tt.status, tt.tailnetTargetIPs)
if err != nil {
t.Errorf("updatesForSvc() unexpected error %v", err)
return
}
if !reflect.DeepEqual(gotRulesToAdd, tt.wantRulesToAdd) {
t.Errorf("updatesForSvc() got rulesToAdd = \n%v\n want rulesToAdd \n%v", gotRulesToAdd, tt.wantRulesToAdd)
}
if !reflect.DeepEqual(gotRulesToDelete, tt.wantRulesToDelete) {
t.Errorf("updatesForSvc() got rulesToDelete = \n%v\n want rulesToDelete \n%v", gotRulesToDelete, tt.wantRulesToDelete)
}
})
}
}

@ -64,6 +64,7 @@ type settings struct {
// target. // target.
PodIP string PodIP string
HealthCheckAddrPort string HealthCheckAddrPort string
EgressSvcsCfgPath string
} }
func (s *settings) validate() error { func (s *settings) validate() error {
@ -198,7 +199,7 @@ func isOneStepConfig(cfg *settings) bool {
// as an L3 proxy, proxying to an endpoint provided via one of the config env // as an L3 proxy, proxying to an endpoint provided via one of the config env
// vars. // vars.
func isL3Proxy(cfg *settings) bool { func isL3Proxy(cfg *settings) bool {
return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressSvcsCfgPath != ""
} }
// hasKubeStateStore returns true if the state must be stored in a Kubernetes // hasKubeStateStore returns true if the state must be stored in a Kubernetes

@ -0,0 +1,103 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package egressservices contains shared types for exposing tailnet services to
// cluster workloads.
// These are split into a separate package for consumption of
// non-Kubernetes shared libraries and binaries. Be mindful of not increasing
// dependency size for those consumers when adding anything new here.
package egressservices
import (
"encoding"
"fmt"
"net/netip"
"strconv"
"strings"
)
// KeyEgressServices is name of the proxy state Secret field that contains the
// currently applied egress proxy config.
const KeyEgressServices = "egress-services"
// Configs contains the desired configuration for egress services keyed by
// service name.
type Configs map[string]Config
// Config is an egress service configuration.
// TODO(irbekrm): version this?
type Config struct {
// TailnetTarget is the target to which cluster traffic for this service
// should be proxied.
TailnetTarget TailnetTarget `json:"tailnetTarget"`
// Ports contains mappings for ports that can be accessed on the tailnet target.
Ports map[PortMap]struct{} `json:"ports"`
}
// TailnetTarget is the tailnet target to which traffic for the egress service
// should be proxied. Exactly one of IP or FQDN should be set.
type TailnetTarget struct {
// IP is the tailnet IP of the target.
IP string `json:"ip"`
// FQDN is the full tailnet FQDN of the target.
FQDN string `json:"fqdn"`
}
// PorMap is a mapping between match port on which proxy receives cluster
// traffic and target port where traffic received on match port should be
// fowardded to.
type PortMap struct {
Protocol string `json:"protocol"`
MatchPort uint16 `json:"matchPort"`
TargetPort uint16 `json:"targetPort"`
}
// PortMap is used as a Config.Ports map key. Config needs to be serialized/deserialized to/from JSON. JSON only
// supports string map keys, so we need to implement TextMarshaler/TextUnmarshaler to convert PortMap to string and
// back.
var _ encoding.TextMarshaler = PortMap{}
var _ encoding.TextUnmarshaler = &PortMap{}
func (pm *PortMap) UnmarshalText(t []byte) error {
tt := string(t)
ss := strings.Split(tt, ":")
if len(ss) != 3 {
return fmt.Errorf("error unmarshalling portmap from JSON, wants a portmap in form <protocol>:<matchPort>:<targetPor>, got %q", tt)
}
(*pm).Protocol = ss[0]
matchPort, err := strconv.ParseUint(ss[1], 10, 16)
if err != nil {
return fmt.Errorf("error converting match port %q to uint16: %w", ss[1], err)
}
(*pm).MatchPort = uint16(matchPort)
targetPort, err := strconv.ParseUint(ss[2], 10, 16)
if err != nil {
return fmt.Errorf("error converting target port %q to uint16: %w", ss[2], err)
}
(*pm).TargetPort = uint16(targetPort)
return nil
}
func (pm PortMap) MarshalText() ([]byte, error) {
s := fmt.Sprintf("%s:%d:%d", pm.Protocol, pm.MatchPort, pm.TargetPort)
return []byte(s), nil
}
// Status represents the currently configured firewall rules for all egress
// services for a proxy identified by the PodIP.
type Status struct {
PodIP string `json:"podIP"`
// All egress service status keyed by service name.
Services map[string]*ServiceStatus `json:"services"`
}
// ServiceStatus is the currently configured firewall rules for an egress
// service.
type ServiceStatus struct {
Ports map[PortMap]struct{} `json:"ports"`
// TailnetTargetIPs are the tailnet target IPs that were used to
// configure these firewall rules. For a TailnetTarget with IP set, this
// is the same as IP.
TailnetTargetIPs []netip.Addr `json:"tailnetTargetIPs"`
TailnetTarget TailnetTarget `json:"tailnetTarget"`
}

@ -0,0 +1,76 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package egressservices
import (
"encoding/json"
"reflect"
"testing"
)
func Test_jsonUnmarshalConfig(t *testing.T) {
tests := []struct {
name string
bs []byte
wantsCfg Config
wantsErr bool
}{
{
name: "success",
bs: []byte(`{"ports":{"tcp:4003:80":{}}}`),
wantsCfg: Config{Ports: map[PortMap]struct{}{{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}: {}}},
},
{
name: "failure_invalid_format",
bs: []byte(`{"ports":{"tcp:80":{}}}`),
wantsCfg: Config{Ports: map[PortMap]struct{}{}},
wantsErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := Config{}
if gotErr := json.Unmarshal(tt.bs, &cfg); (gotErr != nil) != tt.wantsErr {
t.Errorf("json.Unmarshal returned error %v, wants error %v", gotErr, tt.wantsErr)
}
if !reflect.DeepEqual(cfg, tt.wantsCfg) {
t.Errorf("json.Unmarshal produced Config %v, wants Config %v", cfg, tt.wantsCfg)
}
})
}
}
func Test_jsonMarshalConfig(t *testing.T) {
tests := []struct {
name string
protocol string
matchPort uint16
targetPort uint16
wantsBs []byte
}{
{
name: "success",
protocol: "tcp",
matchPort: 4003,
targetPort: 80,
wantsBs: []byte(`{"tailnetTarget":{"ip":"","fqdn":""},"ports":{"tcp:4003:80":{}}}`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := Config{Ports: map[PortMap]struct{}{{
Protocol: tt.protocol,
MatchPort: tt.matchPort,
TargetPort: tt.targetPort}: {}}}
gotBs, gotErr := json.Marshal(&cfg)
if gotErr != nil {
t.Errorf("json.Marshal(%+#v) returned unexpected error %v", cfg, gotErr)
}
if !reflect.DeepEqual(gotBs, tt.wantsBs) {
t.Errorf("json.Marshal(%+#v) returned '%v', wants '%v'", cfg, string(gotBs), string(tt.wantsBs))
}
})
}
}

@ -257,8 +257,8 @@ type JSONPatch struct {
// It currently (2023-03-02) only supports "add" and "remove" operations. // It currently (2023-03-02) only supports "add" and "remove" operations.
func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error { func (c *client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error {
for _, p := range patch { for _, p := range patch {
if p.Op != "remove" && p.Op != "add" { if p.Op != "remove" && p.Op != "add" && p.Op != "replace" {
panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op)) return fmt.Errorf("unsupported JSON patch operation: %q", p.Op)
} }
} }
return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json")) return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json"))

@ -0,0 +1,79 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package linuxfw
import (
"fmt"
"net/netip"
)
// This file contains functionality to insert portmapping rules for a 'service'.
// These are currently only used by the Kubernetes operator proxies.
// An iptables rule for such a service contains a comment with the service name.
// EnsurePortMapRuleForSvc adds a prerouting rule that forwards traffic received
// on match port and NOT on the provided interface to target IP and target port.
// Rule will only be added if it does not already exists.
func (i *iptablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error {
table := i.getIPTByAddr(targetIP)
args := argsForPortMapRule(svc, tun, targetIP, pm)
exists, err := table.Exists("nat", "PREROUTING", args...)
if err != nil {
return fmt.Errorf("error checking if rule exists: %w", err)
}
if !exists {
return table.Append("nat", "PREROUTING", args...)
}
return nil
}
// DeleteMapRuleForSvc constructs a prerouting rule as would be created by
// EnsurePortMapRuleForSvc with the provided args and, if such a rule exists,
// deletes it.
func (i *iptablesRunner) DeletePortMapRuleForSvc(svc, excludeI string, targetIP netip.Addr, pm PortMap) error {
table := i.getIPTByAddr(targetIP)
args := argsForPortMapRule(svc, excludeI, targetIP, pm)
exists, err := table.Exists("nat", "PREROUTING", args...)
if err != nil {
return fmt.Errorf("error checking if rule exists: %w", err)
}
if exists {
return table.Delete("nat", "PREROUTING", args...)
}
return nil
}
// DeleteSvc constructs all possible rules that would have been created by
// EnsurePortMapRuleForSvc from the provided args and ensures that each one that
// exists is deleted.
func (i *iptablesRunner) DeleteSvc(svc, tun string, targetIPs []netip.Addr, pms []PortMap) error {
for _, tip := range targetIPs {
for _, pm := range pms {
if err := i.DeletePortMapRuleForSvc(svc, tun, tip, pm); err != nil {
return fmt.Errorf("error deleting rule: %w", err)
}
}
}
return nil
}
func argsForPortMapRule(svc, excludeI string, targetIP netip.Addr, pm PortMap) []string {
c := commentForSvc(svc, pm)
return []string{
"!", "-i", excludeI,
"-p", pm.Protocol,
"--dport", fmt.Sprintf("%d", pm.MatchPort),
"-m", "comment", "--comment", c,
"-j", "DNAT",
"--to-destination", fmt.Sprintf("%v:%v", targetIP, pm.TargetPort),
}
}
// commentForSvc generates a comment to be added to an iptables DNAT rule for a
// service. This is for iptables debugging/readability purposes only.
func commentForSvc(svc string, pm PortMap) string {
return fmt.Sprintf("%s:%s:%d -> %s:%d", svc, pm.Protocol, pm.MatchPort, pm.Protocol, pm.TargetPort)
}

@ -0,0 +1,196 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package linuxfw
import (
"net/netip"
"testing"
)
func Test_iptablesRunner_EnsurePortMapRuleForSvc(t *testing.T) {
v4Addr := netip.MustParseAddr("10.0.0.4")
v6Addr := netip.MustParseAddr("fd7a:115c:a1e0::701:b62a")
testPM := PortMap{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}
testPM2 := PortMap{Protocol: "udp", MatchPort: 4004, TargetPort: 53}
v4Rule := argsForPortMapRule("test-svc", "tailscale0", v4Addr, testPM)
tests := []struct {
name string
targetIP netip.Addr
svc string
pm PortMap
precreateSvcRules [][]string
}{
{
name: "pm_for_ipv4",
targetIP: v4Addr,
svc: "test-svc",
pm: testPM,
},
{
name: "pm_for_ipv6",
targetIP: v6Addr,
svc: "test-svc-2",
pm: testPM2,
},
{
name: "add_existing_rule",
targetIP: v4Addr,
svc: "test-svc",
pm: testPM,
precreateSvcRules: [][]string{v4Rule},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
iptr := NewFakeIPTablesRunner()
table := iptr.getIPTByAddr(tt.targetIP)
for _, ruleset := range tt.precreateSvcRules {
mustPrecreatePortMapRule(t, ruleset, table)
}
if err := iptr.EnsurePortMapRuleForSvc(tt.svc, "tailscale0", tt.targetIP, tt.pm); err != nil {
t.Errorf("[unexpected error] iptablesRunner.EnsurePortMapRuleForSvc() = %v", err)
}
args := argsForPortMapRule(tt.svc, "tailscale0", tt.targetIP, tt.pm)
exists, err := table.Exists("nat", "PREROUTING", args...)
if err != nil {
t.Fatalf("error checking if rule exists: %v", err)
}
if !exists {
t.Errorf("expected rule was not created")
}
})
}
}
func Test_iptablesRunner_DeletePortMapRuleForSvc(t *testing.T) {
v4Addr := netip.MustParseAddr("10.0.0.4")
v6Addr := netip.MustParseAddr("fd7a:115c:a1e0::701:b62a")
testPM := PortMap{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}
v4Rule := argsForPortMapRule("test", "tailscale0", v4Addr, testPM)
v6Rule := argsForPortMapRule("test", "tailscale0", v6Addr, testPM)
tests := []struct {
name string
targetIP netip.Addr
svc string
pm PortMap
precreateSvcRules [][]string
}{
{
name: "multiple_rules_ipv4_deleted",
targetIP: v4Addr,
svc: "test",
pm: testPM,
precreateSvcRules: [][]string{v4Rule, v6Rule},
},
{
name: "multiple_rules_ipv6_deleted",
targetIP: v6Addr,
svc: "test",
pm: testPM,
precreateSvcRules: [][]string{v4Rule, v6Rule},
},
{
name: "non-existent_rule_deleted",
targetIP: v4Addr,
svc: "test",
pm: testPM,
precreateSvcRules: [][]string{v6Rule},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
iptr := NewFakeIPTablesRunner()
table := iptr.getIPTByAddr(tt.targetIP)
for _, ruleset := range tt.precreateSvcRules {
mustPrecreatePortMapRule(t, ruleset, table)
}
if err := iptr.DeletePortMapRuleForSvc(tt.svc, "tailscale0", tt.targetIP, tt.pm); err != nil {
t.Errorf("iptablesRunner.DeletePortMapRuleForSvc() errored: %v ", err)
}
deletedRule := argsForPortMapRule(tt.svc, "tailscale0", tt.targetIP, tt.pm)
exists, err := table.Exists("nat", "PREROUTING", deletedRule...)
if err != nil {
t.Fatalf("error verifying that rule does not exist after deletion: %v", err)
}
if exists {
t.Errorf("portmap rule exists after deletion")
}
})
}
}
func Test_iptablesRunner_DeleteSvc(t *testing.T) {
v4Addr := netip.MustParseAddr("10.0.0.4")
v6Addr := netip.MustParseAddr("fd7a:115c:a1e0::701:b62a")
testPM := PortMap{Protocol: "tcp", MatchPort: 4003, TargetPort: 80}
iptr := NewFakeIPTablesRunner()
// create two rules that will consitute svc1
s1R1 := argsForPortMapRule("svc1", "tailscale0", v4Addr, testPM)
mustPrecreatePortMapRule(t, s1R1, iptr.getIPTByAddr(v4Addr))
s1R2 := argsForPortMapRule("svc1", "tailscale0", v6Addr, testPM)
mustPrecreatePortMapRule(t, s1R2, iptr.getIPTByAddr(v6Addr))
// create two rules that will consitute svc2
s2R1 := argsForPortMapRule("svc2", "tailscale0", v4Addr, testPM)
mustPrecreatePortMapRule(t, s2R1, iptr.getIPTByAddr(v4Addr))
s2R2 := argsForPortMapRule("svc2", "tailscale0", v6Addr, testPM)
mustPrecreatePortMapRule(t, s2R2, iptr.getIPTByAddr(v6Addr))
// delete svc1
if err := iptr.DeleteSvc("svc1", "tailscale0", []netip.Addr{v4Addr, v6Addr}, []PortMap{testPM}); err != nil {
t.Fatalf("error deleting service: %v", err)
}
// validate that svc1 no longer exists
svcMustNotExist(t, "svc1", map[string][]string{v4Addr.String(): s1R1, v6Addr.String(): s1R2}, iptr)
// validate that svc2 still exists
svcMustExist(t, "svc2", map[string][]string{v4Addr.String(): s2R1, v6Addr.String(): s2R2}, iptr)
}
func svcMustExist(t *testing.T, svcName string, rules map[string][]string, iptr *iptablesRunner) {
t.Helper()
for dst, ruleset := range rules {
tip := netip.MustParseAddr(dst)
exists, err := iptr.getIPTByAddr(tip).Exists("nat", "PREROUTING", ruleset...)
if err != nil {
t.Fatalf("error checking whether %s exists: %v", svcName, err)
}
if !exists {
t.Fatalf("service %s should be deleted,but found rule for %s", svcName, dst)
}
}
}
func svcMustNotExist(t *testing.T, svcName string, rules map[string][]string, iptr *iptablesRunner) {
t.Helper()
for dst, ruleset := range rules {
tip := netip.MustParseAddr(dst)
exists, err := iptr.getIPTByAddr(tip).Exists("nat", "PREROUTING", ruleset...)
if err != nil {
t.Fatalf("error checking whether %s exists: %v", svcName, err)
}
if exists {
t.Fatalf("service %s should exist, but rule for %s is missing", svcName, dst)
}
}
}
func mustPrecreatePortMapRule(t *testing.T, rules []string, table iptablesInterface) {
t.Helper()
exists, err := table.Exists("nat", "PREROUTING", rules...)
if err != nil {
t.Fatalf("error ensuring that nat PREROUTING table exists: %v", err)
}
if exists {
return
}
if err := table.Append("nat", "PREROUTING", rules...); err != nil {
t.Fatalf("error precreating portmap rule: %v", err)
}
}

@ -682,7 +682,7 @@ func delTSHook(ipt iptablesInterface, table, chain string, logf logger.Logf) err
return nil return nil
} }
// delChain flushs and deletes a chain. If the chain does not exist, it's a no-op // delChain flushes and deletes a chain. If the chain does not exist, it's a no-op
// since the desired state is already achieved. otherwise, it returns an error. // since the desired state is already achieved. otherwise, it returns an error.
func delChain(ipt iptablesInterface, table, chain string) error { func delChain(ipt iptablesInterface, table, chain string) error {
if err := ipt.ClearChain(table, chain); err != nil { if err := ipt.ClearChain(table, chain); err != nil {

@ -0,0 +1,245 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package linuxfw
import (
"errors"
"fmt"
"net/netip"
"reflect"
"strings"
"github.com/google/nftables"
"github.com/google/nftables/binaryutil"
"github.com/google/nftables/expr"
"golang.org/x/sys/unix"
)
// This file contains functionality that is currently (09/2024) used to set up
// routing for the Tailscale Kubernetes operator egress proxies. A tailnet
// service (identified by tailnet IP or FQDN) that gets exposed to cluster
// workloads gets a separate prerouting chain created for it for each IP family
// of the chain's target addresses. Each service's prerouting chain contains one
// or more portmapping rules. A portmapping rule DNATs traffic received on a
// particular port to a port of the tailnet service. Creating a chain per
// service makes it easier to delete a service when no longer needed and helps
// with readability.
// EnsurePortMapRuleForSvc:
// - ensures that nat table exists
// - ensures that there is a prerouting chain for the given service and IP family of the target address in the nat table
// - ensures that there is a portmapping rule mathcing the given portmap (only creates the rule if it does not already exist)
func (n *nftablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error {
t, ch, err := n.ensureChainForSvc(svc, targetIP)
if err != nil {
return fmt.Errorf("error ensuring chain for %s: %w", svc, err)
}
meta := svcPortMapRuleMeta(svc, targetIP, pm)
rule, err := n.findRuleByMetadata(t, ch, meta)
if err != nil {
return fmt.Errorf("error looking up rule: %w", err)
}
if rule != nil {
return nil
}
p, err := protoFromString(pm.Protocol)
if err != nil {
return fmt.Errorf("error converting protocol %s: %w", pm.Protocol, err)
}
rule = portMapRule(t, ch, tun, targetIP, pm.MatchPort, pm.TargetPort, p, meta)
n.conn.InsertRule(rule)
return n.conn.Flush()
}
// DeletePortMapRuleForSvc deletes a portmapping rule in the given service/IP family chain.
// It finds the matching rule using metadata attached to the rule.
// The caller is expected to call DeleteSvc if the whole service (the chain)
// needs to be deleted, so we don't deal with the case where this is the only
// rule in the chain here.
func (n *nftablesRunner) DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error {
table, err := n.getNFTByAddr(targetIP)
if err != nil {
return fmt.Errorf("error setting up nftables for IP family of %s: %w", targetIP, err)
}
t, err := getTableIfExists(n.conn, table.Proto, "nat")
if err != nil {
return fmt.Errorf("error checking if nat table exists: %w", err)
}
if t == nil {
return nil
}
ch, err := getChainFromTable(n.conn, t, svc)
if err != nil && !errors.Is(err, errorChainNotFound{t.Name, svc}) {
return fmt.Errorf("error checking if chain %s exists: %w", svc, err)
}
if errors.Is(err, errorChainNotFound{t.Name, svc}) {
return nil // service chain does not exist, so neither does the portmapping rule
}
meta := svcPortMapRuleMeta(svc, targetIP, pm)
rule, err := n.findRuleByMetadata(t, ch, meta)
if err != nil {
return fmt.Errorf("error checking if rule exists: %w", err)
}
if rule == nil {
return nil
}
if err := n.conn.DelRule(rule); err != nil {
return fmt.Errorf("error deleting rule: %w", err)
}
return n.conn.Flush()
}
// DeleteSvc deletes the chains for the given service if any exist.
func (n *nftablesRunner) DeleteSvc(svc, tun string, targetIPs []netip.Addr, pm []PortMap) error {
for _, tip := range targetIPs {
table, err := n.getNFTByAddr(tip)
if err != nil {
return fmt.Errorf("error setting up nftables for IP family of %s: %w", tip, err)
}
t, err := getTableIfExists(n.conn, table.Proto, "nat")
if err != nil {
return fmt.Errorf("error checking if nat table exists: %w", err)
}
if t == nil {
return nil
}
ch, err := getChainFromTable(n.conn, t, svc)
if err != nil && !errors.Is(err, errorChainNotFound{t.Name, svc}) {
return fmt.Errorf("error checking if chain %s exists: %w", svc, err)
}
if errors.Is(err, errorChainNotFound{t.Name, svc}) {
return nil
}
n.conn.DelChain(ch)
}
return n.conn.Flush()
}
func portMapRule(t *nftables.Table, ch *nftables.Chain, tun string, targetIP netip.Addr, matchPort, targetPort uint16, proto uint8, meta []byte) *nftables.Rule {
var fam uint32
if targetIP.Is4() {
fam = unix.NFPROTO_IPV4
} else {
fam = unix.NFPROTO_IPV6
}
rule := &nftables.Rule{
Table: t,
Chain: ch,
UserData: meta,
Exprs: []expr.Any{
&expr.Meta{Key: expr.MetaKeyOIFNAME, Register: 1},
&expr.Cmp{
Op: expr.CmpOpNeq,
Register: 1,
Data: []byte(tun),
},
&expr.Meta{Key: expr.MetaKeyL4PROTO, Register: 1},
&expr.Cmp{
Op: expr.CmpOpEq,
Register: 1,
Data: []byte{proto},
},
&expr.Payload{
DestRegister: 1,
Base: expr.PayloadBaseTransportHeader,
Offset: 2,
Len: 2,
},
&expr.Cmp{
Op: expr.CmpOpEq,
Register: 1,
Data: binaryutil.BigEndian.PutUint16(matchPort),
},
&expr.Immediate{
Register: 1,
Data: targetIP.AsSlice(),
},
&expr.Immediate{
Register: 2,
Data: binaryutil.BigEndian.PutUint16(targetPort),
},
&expr.NAT{
Type: expr.NATTypeDestNAT,
Family: fam,
RegAddrMin: 1,
RegAddrMax: 1,
RegProtoMin: 2,
RegProtoMax: 2,
},
},
}
return rule
}
// svcPortMapRuleMeta generates metadata for a rule.
// This metadata can then be used to find the rule.
// https://github.com/google/nftables/issues/48
func svcPortMapRuleMeta(svcName string, targetIP netip.Addr, pm PortMap) []byte {
return []byte(fmt.Sprintf("svc:%s,targetIP:%s:matchPort:%v,targetPort:%v,proto:%v", svcName, targetIP.String(), pm.MatchPort, pm.TargetPort, pm.Protocol))
}
func (n *nftablesRunner) findRuleByMetadata(t *nftables.Table, ch *nftables.Chain, meta []byte) (*nftables.Rule, error) {
if n.conn == nil || t == nil || ch == nil || len(meta) == 0 {
return nil, nil
}
rules, err := n.conn.GetRules(t, ch)
if err != nil {
return nil, fmt.Errorf("error listing rules: %w", err)
}
for _, rule := range rules {
if reflect.DeepEqual(rule.UserData, meta) {
return rule, nil
}
}
return nil, nil
}
func (n *nftablesRunner) ensureChainForSvc(svc string, targetIP netip.Addr) (*nftables.Table, *nftables.Chain, error) {
polAccept := nftables.ChainPolicyAccept
table, err := n.getNFTByAddr(targetIP)
if err != nil {
return nil, nil, fmt.Errorf("error setting up nftables for IP family of %v: %w", targetIP, err)
}
nat, err := createTableIfNotExist(n.conn, table.Proto, "nat")
if err != nil {
return nil, nil, fmt.Errorf("error ensuring nat table: %w", err)
}
svcCh, err := getOrCreateChain(n.conn, chainInfo{
table: nat,
name: svc,
chainType: nftables.ChainTypeNAT,
chainHook: nftables.ChainHookPrerouting,
chainPriority: nftables.ChainPriorityNATDest,
chainPolicy: &polAccept,
})
if err != nil {
return nil, nil, fmt.Errorf("error ensuring prerouting chain: %w", err)
}
return nat, svcCh, nil
}
// // PortMap is the port mapping for a service rule.
type PortMap struct {
// MatchPort is the local port to which the rule should apply.
MatchPort uint16
// TargetPort is the port to which the traffic should be forwarded.
TargetPort uint16
// Protocol is the protocol to match packets on. Only TCP and UDP are
// supported.
Protocol string
}
func protoFromString(s string) (uint8, error) {
switch strings.ToLower(s) {
case "tcp":
return unix.IPPROTO_TCP, nil
case "udp":
return unix.IPPROTO_UDP, nil
default:
return 0, fmt.Errorf("unrecognized protocol: %q", s)
}
}

@ -0,0 +1,156 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build linux
package linuxfw
import (
"net/netip"
"testing"
"github.com/google/nftables"
)
// This test creates a temporary network namespace for the nftables rules being
// set up, so it needs to run in a privileged mode. Locally it needs to be run
// by root, else it will be silently skipped. In CI it runs in a privileged
// container.
func Test_nftablesRunner_EnsurePortMapRuleForSvc(t *testing.T) {
conn := newSysConn(t)
runner := newFakeNftablesRunnerWithConn(t, conn, true)
ipv4, ipv6 := netip.MustParseAddr("100.99.99.99"), netip.MustParseAddr("fd7a:115c:a1e0::701:b62a")
pmTCP := PortMap{MatchPort: 4003, TargetPort: 80, Protocol: "TCP"}
pmTCP1 := PortMap{MatchPort: 4004, TargetPort: 443, Protocol: "TCP"}
// Create a rule for service 'foo' to forward TCP traffic to IPv4 endpoint
runner.EnsurePortMapRuleForSvc("foo", "tailscale0", ipv4, pmTCP)
svcChains(t, 1, conn)
chainRuleCount(t, "foo", 1, conn, nftables.TableFamilyIPv4)
chainRule(t, "foo", ipv4, pmTCP, runner, nftables.TableFamilyIPv4)
// Create another rule for service 'foo' to forward TCP traffic to the
// same IPv4 endpoint, but to a different port.
runner.EnsurePortMapRuleForSvc("foo", "tailscale0", ipv4, pmTCP1)
svcChains(t, 1, conn)
chainRuleCount(t, "foo", 2, conn, nftables.TableFamilyIPv4)
chainRule(t, "foo", ipv4, pmTCP1, runner, nftables.TableFamilyIPv4)
// Create a rule for service 'foo' to forward TCP traffic to an IPv6 endpoint
runner.EnsurePortMapRuleForSvc("foo", "tailscale0", ipv6, pmTCP)
svcChains(t, 2, conn)
chainRuleCount(t, "foo", 1, conn, nftables.TableFamilyIPv6)
chainRule(t, "foo", ipv6, pmTCP, runner, nftables.TableFamilyIPv6)
// Create a rule for service 'bar' to forward TCP traffic to IPv4 endpoint
runner.EnsurePortMapRuleForSvc("bar", "tailscale0", ipv4, pmTCP)
svcChains(t, 3, conn)
chainRuleCount(t, "bar", 1, conn, nftables.TableFamilyIPv4)
chainRule(t, "bar", ipv4, pmTCP, runner, nftables.TableFamilyIPv4)
// Create a rule for service 'bar' to forward TCP traffic to an IPv6 endpoint
runner.EnsurePortMapRuleForSvc("bar", "tailscale0", ipv6, pmTCP)
svcChains(t, 4, conn)
chainRuleCount(t, "bar", 1, conn, nftables.TableFamilyIPv6)
chainRule(t, "bar", ipv6, pmTCP, runner, nftables.TableFamilyIPv6)
// Delete service bar
runner.DeleteSvc("bar", "tailscale0", []netip.Addr{ipv4, ipv6}, []PortMap{pmTCP})
svcChains(t, 2, conn)
// Delete a rule from service foo
runner.DeletePortMapRuleForSvc("foo", "tailscale0", ipv4, pmTCP)
svcChains(t, 2, conn)
chainRuleCount(t, "foo", 1, conn, nftables.TableFamilyIPv4)
// Delete service foo
runner.DeleteSvc("foo", "tailscale0", []netip.Addr{ipv4, ipv6}, []PortMap{pmTCP, pmTCP1})
svcChains(t, 0, conn)
}
// svcChains verifies that the expected number of chains exist (for either IP
// family) and that each of them is configured as NAT prerouting chain.
func svcChains(t *testing.T, wantCount int, conn *nftables.Conn) {
t.Helper()
chains, err := conn.ListChains()
if err != nil {
t.Fatalf("error listing chains: %v", err)
}
if len(chains) != wantCount {
t.Fatalf("wants %d chains, got %d", wantCount, len(chains))
}
for _, ch := range chains {
if *ch.Policy != nftables.ChainPolicyAccept {
t.Fatalf("chain %s has unexpected policy %v", ch.Name, *ch.Policy)
}
if ch.Type != nftables.ChainTypeNAT {
t.Fatalf("chain %s has unexpected type %v", ch.Name, ch.Type)
}
if *ch.Hooknum != *nftables.ChainHookPrerouting {
t.Fatalf("chain %s is attached to unexpected hook %v", ch.Name, ch.Hooknum)
}
if *ch.Priority != *nftables.ChainPriorityNATDest {
t.Fatalf("chain %s has unexpected priority %v", ch.Name, ch.Priority)
}
}
}
// chainRuleCount returns number of rules in a chain identified by service name and IP family.
func chainRuleCount(t *testing.T, svc string, count int, conn *nftables.Conn, fam nftables.TableFamily) {
t.Helper()
chains, err := conn.ListChainsOfTableFamily(fam)
if err != nil {
t.Fatalf("error listing chains: %v", err)
}
found := false
for _, ch := range chains {
if ch.Name == svc {
found = true
rules, err := conn.GetRules(ch.Table, ch)
if err != nil {
t.Fatalf("error getting rules: %v", err)
}
if len(rules) != count {
t.Fatalf("unexpected number of rules, wants %d got %d", count, len(rules))
}
break
}
}
if !found {
t.Fatalf("chain for service %s does not exist", svc)
}
}
// chainRule verifies that rule for the provided target IP and PortMap exists in
// a chain identified by service name and IP family.
func chainRule(t *testing.T, svc string, targetIP netip.Addr, pm PortMap, runner *nftablesRunner, fam nftables.TableFamily) {
t.Helper()
chains, err := runner.conn.ListChainsOfTableFamily(fam)
if err != nil {
t.Fatalf("error listing chains: %v", err)
}
var chain *nftables.Chain
for _, ch := range chains {
if ch.Name == svc {
chain = ch
break
}
}
if chain == nil {
t.Fatalf("chain for service %s does not exist", svc)
}
meta := svcPortMapRuleMeta(svc, targetIP, pm)
p, err := protoFromString(pm.Protocol)
if err != nil {
t.Fatalf("error converting protocol: %v", err)
}
wantsRule := portMapRule(chain.Table, chain, "tailscale0", targetIP, pm.MatchPort, pm.TargetPort, p, meta)
gotRule, err := findRule(runner.conn, wantsRule)
if err != nil {
t.Fatalf("error looking up rule: %v", err)
}
if gotRule == nil {
t.Fatalf("rule not found")
}
}

@ -569,6 +569,12 @@ type NetfilterRunner interface {
// the Tailscale interface, as used in the Kubernetes egress proxies. // the Tailscale interface, as used in the Kubernetes egress proxies.
DNATNonTailscaleTraffic(exemptInterface string, dst netip.Addr) error DNATNonTailscaleTraffic(exemptInterface string, dst netip.Addr) error
EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error
DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm PortMap) error
DeleteSvc(svc, tun string, targetIPs []netip.Addr, pm []PortMap) error
// ClampMSSToPMTU adds a rule to the mangle/FORWARD chain to clamp MSS for // ClampMSSToPMTU adds a rule to the mangle/FORWARD chain to clamp MSS for
// traffic destined for the provided tun interface. // traffic destined for the provided tun interface.
ClampMSSToPMTU(tun string, addr netip.Addr) error ClampMSSToPMTU(tun string, addr netip.Addr) error

@ -537,6 +537,17 @@ func (n *fakeIPTablesRunner) AddSNATRuleForDst(src, dst netip.Addr) error {
func (n *fakeIPTablesRunner) DNATNonTailscaleTraffic(exemptInterface string, dst netip.Addr) error { func (n *fakeIPTablesRunner) DNATNonTailscaleTraffic(exemptInterface string, dst netip.Addr) error {
return errors.New("not implemented") return errors.New("not implemented")
} }
func (n *fakeIPTablesRunner) EnsurePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm linuxfw.PortMap) error {
return errors.New("not implemented")
}
func (n *fakeIPTablesRunner) DeletePortMapRuleForSvc(svc, tun string, targetIP netip.Addr, pm linuxfw.PortMap) error {
return errors.New("not implemented")
}
func (n *fakeIPTablesRunner) DeleteSvc(svc, tun string, targetIPs []netip.Addr, pm []linuxfw.PortMap) error {
return errors.New("not implemented")
}
func (n *fakeIPTablesRunner) ClampMSSToPMTU(tun string, addr netip.Addr) error { func (n *fakeIPTablesRunner) ClampMSSToPMTU(tun string, addr netip.Addr) error {
return errors.New("not implemented") return errors.New("not implemented")

Loading…
Cancel
Save