cmd/{k8s-operator,containerboot},k8s-operator,kube: reconcile ExternalName Services for ProxyGroup (#13635)

Adds a new reconciler that reconciles ExternalName Services that define a
tailnet target that should be exposed to cluster workloads on a ProxyGroup's
proxies.
The reconciler ensures that for each such service, the config mounted to
the proxies is updated with the tailnet target definition and that
and EndpointSlice and ClusterIP Service are created for the service.

Adds a new reconciler that ensures that as proxy Pods become ready to route
traffic to a tailnet target, the EndpointSlice for the target is updated
with the Pods' endpoints.

Updates tailscale/tailscale#13406

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
pull/13594/head
Irbe Krumina 2 months ago committed by GitHub
parent 9bd158cc09
commit e8bb5d1be5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -179,9 +179,6 @@ func (ep *egressProxy) syncEgressConfigs(cfgs *egressservices.Configs, status *e
// For each tailnet target, set up SNAT from the local tailnet device address of the matching // For each tailnet target, set up SNAT from the local tailnet device address of the matching
// family. // family.
for _, t := range tailnetTargetIPs { for _, t := range tailnetTargetIPs {
if t.Is6() && !ep.nfr.HasIPV6NAT() {
continue
}
var local netip.Addr var local netip.Addr
for _, pfx := range n.NetMap.SelfNode.Addresses().All() { for _, pfx := range n.NetMap.SelfNode.Addresses().All() {
if !pfx.IsSingleIP() { if !pfx.IsSingleIP() {
@ -374,6 +371,9 @@ func (ep *egressProxy) getStatus(ctx context.Context) (*egressservices.Status, e
// Secret and updates proxy's tailnet addresses. // Secret and updates proxy's tailnet addresses.
func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, n ipn.Notify) error { func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Status, n ipn.Notify) error {
// Pod IP is used to determine if a stored status applies to THIS proxy Pod. // Pod IP is used to determine if a stored status applies to THIS proxy Pod.
if status == nil {
status = &egressservices.Status{}
}
status.PodIP = ep.podIP status.PodIP = ep.podIP
secret, err := ep.kc.GetSecret(ctx, ep.stateSecret) secret, err := ep.kc.GetSecret(ctx, ep.stateSecret)
if err != nil { if err != nil {
@ -399,13 +399,19 @@ func (ep *egressProxy) setStatus(ctx context.Context, status *egressservices.Sta
// tailnetTargetIPsForSvc returns the tailnet IPs to which traffic for this // tailnetTargetIPsForSvc returns the tailnet IPs to which traffic for this
// egress service should be proxied. The egress service can be configured by IP // egress service should be proxied. The egress service can be configured by IP
// or by FQDN. If it's configured by IP, just return that. If it's configured by // or by FQDN. If it's configured by IP, just return that. If it's configured by
// FQDN, resolve the FQDN and return the resolved IPs. // FQDN, resolve the FQDN and return the resolved IPs. It checks if the
// netfilter runner supports IPv6 NAT and skips any IPv6 addresses if it
// doesn't.
func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.Notify) (addrs []netip.Addr, err error) { func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.Notify) (addrs []netip.Addr, err error) {
if svc.TailnetTarget.IP != "" { if svc.TailnetTarget.IP != "" {
addr, err := netip.ParseAddr(svc.TailnetTarget.IP) addr, err := netip.ParseAddr(svc.TailnetTarget.IP)
if err != nil { if err != nil {
return nil, fmt.Errorf("error parsing tailnet target IP: %w", err) return nil, fmt.Errorf("error parsing tailnet target IP: %w", err)
} }
if addr.Is6() && !ep.nfr.HasIPV6NAT() {
log.Printf("tailnet target is an IPv6 address, but this host does not support IPv6 in the chosen firewall mode. This will probably not work.")
return addrs, nil
}
return []netip.Addr{addr}, nil return []netip.Addr{addr}, nil
} }
@ -429,6 +435,10 @@ func (ep *egressProxy) tailnetTargetIPsForSvc(svc egressservices.Config, n ipn.N
} }
if nodeFound { if nodeFound {
for _, addr := range node.Addresses().AsSlice() { for _, addr := range node.Addresses().AsSlice() {
if addr.Addr().Is6() && !ep.nfr.HasIPV6NAT() {
log.Printf("tailnet target %v is an IPv6 address, but this host does not support IPv6 in the chosen firewall mode, skipping.", addr.Addr().String())
continue
}
addrs = append(addrs, addr.Addr()) addrs = append(addrs, addr.Addr())
} }
// Egress target endpoints configured via FQDN are stored, so // Egress target endpoints configured via FQDN are stored, so
@ -494,10 +504,6 @@ func ensureServiceDeleted(svcName string, svc *egressservices.ServiceStatus, nfr
func ensureRulesAdded(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error { func ensureRulesAdded(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error {
for svc, rules := range rulesPerSvc { for svc, rules := range rulesPerSvc {
for _, rule := range rules { for _, rule := range rules {
if rule.tailnetIP.Is6() && !nfr.HasIPV6NAT() {
log.Printf("host does not support IPv6 NAT; skipping IPv6 target %s", rule.tailnetIP)
continue
}
log.Printf("ensureRulesAdded svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol) log.Printf("ensureRulesAdded svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol)
if err := nfr.EnsurePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil { if err := nfr.EnsurePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil {
return fmt.Errorf("error ensuring rule: %w", err) return fmt.Errorf("error ensuring rule: %w", err)
@ -513,10 +519,6 @@ func ensureRulesAdded(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner
func ensureRulesDeleted(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error { func ensureRulesDeleted(rulesPerSvc map[string][]rule, nfr linuxfw.NetfilterRunner) error {
for svc, rules := range rulesPerSvc { for svc, rules := range rulesPerSvc {
for _, rule := range rules { for _, rule := range rules {
if rule.tailnetIP.Is6() && !nfr.HasIPV6NAT() {
log.Printf("host does not support IPv6 NAT; skipping IPv6 target %s", rule.tailnetIP)
continue
}
log.Printf("ensureRulesDeleted svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol) log.Printf("ensureRulesDeleted svc %s tailnetTarget %s container port %d tailnet port %d protocol %s", svc, rule.tailnetIP, rule.containerPort, rule.tailnetPort, rule.protocol)
if err := nfr.DeletePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil { if err := nfr.DeletePortMapRuleForSvc(svc, tailscaleTunInterface, rule.tailnetIP, linuxfw.PortMap{MatchPort: rule.containerPort, TargetPort: rule.tailnetPort, Protocol: rule.protocol}); err != nil {
return fmt.Errorf("error deleting rule: %w", err) return fmt.Errorf("error deleting rule: %w", err)

@ -690,6 +690,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
tailscale.com/k8s-operator/sessionrecording/spdy from tailscale.com/k8s-operator/sessionrecording tailscale.com/k8s-operator/sessionrecording/spdy from tailscale.com/k8s-operator/sessionrecording
tailscale.com/k8s-operator/sessionrecording/tsrecorder from tailscale.com/k8s-operator/sessionrecording+ tailscale.com/k8s-operator/sessionrecording/tsrecorder from tailscale.com/k8s-operator/sessionrecording+
tailscale.com/k8s-operator/sessionrecording/ws from tailscale.com/k8s-operator/sessionrecording tailscale.com/k8s-operator/sessionrecording/ws from tailscale.com/k8s-operator/sessionrecording
tailscale.com/kube/egressservices from tailscale.com/cmd/k8s-operator
tailscale.com/kube/kubeapi from tailscale.com/ipn/store/kubestore+ tailscale.com/kube/kubeapi from tailscale.com/ipn/store/kubestore+
tailscale.com/kube/kubeclient from tailscale.com/ipn/store/kubestore tailscale.com/kube/kubeclient from tailscale.com/ipn/store/kubestore
tailscale.com/kube/kubetypes from tailscale.com/cmd/k8s-operator+ tailscale.com/kube/kubetypes from tailscale.com/cmd/k8s-operator+

@ -22,7 +22,7 @@ rules:
resources: ["ingressclasses"] resources: ["ingressclasses"]
verbs: ["get", "list", "watch"] verbs: ["get", "list", "watch"]
- apiGroups: ["tailscale.com"] - apiGroups: ["tailscale.com"]
resources: ["connectors", "connectors/status", "proxyclasses", "proxyclasses/status"] resources: ["connectors", "connectors/status", "proxyclasses", "proxyclasses/status", "proxygroups", "proxygroups/status"]
verbs: ["get", "list", "watch", "update"] verbs: ["get", "list", "watch", "update"]
- apiGroups: ["tailscale.com"] - apiGroups: ["tailscale.com"]
resources: ["dnsconfigs", "dnsconfigs/status"] resources: ["dnsconfigs", "dnsconfigs/status"]
@ -53,12 +53,15 @@ rules:
- apiGroups: [""] - apiGroups: [""]
resources: ["secrets", "serviceaccounts", "configmaps"] resources: ["secrets", "serviceaccounts", "configmaps"]
verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get","list","watch"]
- apiGroups: ["apps"] - apiGroups: ["apps"]
resources: ["statefulsets", "deployments"] resources: ["statefulsets", "deployments"]
verbs: ["create","delete","deletecollection","get","list","patch","update","watch"] verbs: ["create","delete","deletecollection","get","list","patch","update","watch"]
- apiGroups: ["discovery.k8s.io"] - apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"] resources: ["endpointslices"]
verbs: ["get", "list", "watch"] verbs: ["get", "list", "watch", "create", "update", "deletecollection"]
- apiGroups: ["rbac.authorization.k8s.io"] - apiGroups: ["rbac.authorization.k8s.io"]
resources: ["roles", "rolebindings"] resources: ["roles", "rolebindings"]
verbs: ["get", "create", "patch", "update", "list", "watch"] verbs: ["get", "create", "patch", "update", "list", "watch"]

@ -4360,6 +4360,8 @@ rules:
- connectors/status - connectors/status
- proxyclasses - proxyclasses
- proxyclasses/status - proxyclasses/status
- proxygroups
- proxygroups/status
verbs: verbs:
- get - get
- list - list
@ -4420,6 +4422,14 @@ rules:
- patch - patch
- update - update
- watch - watch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
- apiGroups: - apiGroups:
- apps - apps
resources: resources:
@ -4442,6 +4452,9 @@ rules:
- get - get
- list - list
- watch - watch
- create
- update
- deletecollection
- apiGroups: - apiGroups:
- rbac.authorization.k8s.io - rbac.authorization.k8s.io
resources: resources:

@ -0,0 +1,188 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
tsoperator "tailscale.com/k8s-operator"
"tailscale.com/kube/egressservices"
"tailscale.com/types/ptr"
)
// egressEpsReconciler reconciles EndpointSlices for tailnet services exposed to cluster via egress ProxyGroup proxies.
type egressEpsReconciler struct {
client.Client
logger *zap.SugaredLogger
tsNamespace string
}
// Reconcile reconciles an EndpointSlice for a tailnet service. It updates the EndpointSlice with the endpoints of
// those ProxyGroup Pods that are ready to route traffic to the tailnet service.
// It compares tailnet service state stored in egress proxy state Secrets by containerboot with the desired
// configuration stored in proxy-cfg ConfigMap to determine if the endpoint is ready.
func (er *egressEpsReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
l := er.logger.With("Service", req.NamespacedName)
l.Debugf("starting reconcile")
defer l.Debugf("reconcile finished")
eps := new(discoveryv1.EndpointSlice)
err = er.Get(ctx, req.NamespacedName, eps)
if apierrors.IsNotFound(err) {
l.Debugf("EndpointSlice not found")
return reconcile.Result{}, nil
}
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to get EndpointSlice: %w", err)
}
if !eps.DeletionTimestamp.IsZero() {
l.Debugf("EnpointSlice is being deleted")
return res, nil
}
// Get the user-created ExternalName Service and use its status conditions to determine whether cluster
// resources are set up for this tailnet service.
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: eps.Labels[labelExternalSvcName],
Namespace: eps.Labels[labelExternalSvcNamespace],
},
}
err = er.Get(ctx, client.ObjectKeyFromObject(svc), svc)
if apierrors.IsNotFound(err) {
l.Infof("ExternalName Service %s/%s not found, perhaps it was deleted", svc.Namespace, svc.Name)
return res, nil
}
if err != nil {
return res, fmt.Errorf("error retrieving ExternalName Service: %w", err)
}
if !tsoperator.EgressServiceIsValidAndConfigured(svc) {
l.Infof("Cluster resources for ExternalName Service %s/%s are not yet configured", svc.Namespace, svc.Name)
return res, nil
}
// TODO(irbekrm): currently this reconcile loop runs all the checks every time it's triggered, which is
// wasteful. Once we have a Ready condition for ExternalName Services for ProxyGroup, use the condition to
// determine if a reconcile is needed.
oldEps := eps.DeepCopy()
proxyGroupName := eps.Labels[labelProxyGroup]
tailnetSvc := tailnetSvcName(svc)
l = l.With("tailnet-service-name", tailnetSvc)
// Retrieve the desired tailnet service configuration from the ConfigMap.
_, cfgs, err := egressSvcsConfigs(ctx, er.Client, proxyGroupName, er.tsNamespace)
if err != nil {
return res, fmt.Errorf("error retrieving tailnet services configuration: %w", err)
}
cfg, ok := (*cfgs)[tailnetSvc]
if !ok {
l.Infof("[unexpected] configuration for tailnet service %s not found", tailnetSvc)
return res, nil
}
// Check which Pods in ProxyGroup are ready to route traffic to this
// egress service.
podList := &corev1.PodList{}
if err := er.List(ctx, podList, client.MatchingLabels(map[string]string{labelProxyGroup: proxyGroupName})); err != nil {
return res, fmt.Errorf("error listing Pods for ProxyGroup %s: %w", proxyGroupName, err)
}
newEndpoints := make([]discoveryv1.Endpoint, 0)
for _, pod := range podList.Items {
ready, err := er.podIsReadyToRouteTraffic(ctx, pod, &cfg, tailnetSvc, l)
if err != nil {
return res, fmt.Errorf("error verifying if Pod is ready to route traffic: %w", err)
}
if !ready {
continue // maybe next time
}
newEndpoints = append(newEndpoints, discoveryv1.Endpoint{
Hostname: (*string)(&pod.UID),
Addresses: []string{pod.Status.PodIP},
Conditions: discoveryv1.EndpointConditions{
Ready: ptr.To(true),
Serving: ptr.To(true),
Terminating: ptr.To(false),
},
})
}
// Note that Endpoints are being overwritten with the currently valid endpoints so we don't need to explicitly
// run a cleanup for deleted Pods etc.
eps.Endpoints = newEndpoints
if !reflect.DeepEqual(eps, oldEps) {
l.Infof("Updating EndpointSlice to ensure traffic is routed to ready proxy Pods")
if err := er.Update(ctx, eps); err != nil {
return res, fmt.Errorf("error updating EndpointSlice: %w", err)
}
}
return res, nil
}
// podIsReadyToRouteTraffic returns true if it appears that the proxy Pod has configured firewall rules to be able to
// route traffic to the given tailnet service. It retrieves the proxy's state Secret and compares the tailnet service
// status written there to the desired service configuration.
func (er *egressEpsReconciler) podIsReadyToRouteTraffic(ctx context.Context, pod corev1.Pod, cfg *egressservices.Config, tailnetSvcName string, l *zap.SugaredLogger) (bool, error) {
l = l.With("proxy_pod", pod.Name)
l.Debugf("checking whether proxy is ready to route to egress service")
if !pod.DeletionTimestamp.IsZero() {
l.Debugf("proxy Pod is being deleted, ignore")
return false, nil
}
podIP := pod.Status.PodIP
stateS := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
err := er.Get(ctx, client.ObjectKeyFromObject(stateS), stateS)
if apierrors.IsNotFound(err) {
l.Debugf("proxy does not have a state Secret, waiting...")
return false, nil
}
if err != nil {
return false, fmt.Errorf("error getting state Secret: %w", err)
}
svcStatusBS := stateS.Data[egressservices.KeyEgressServices]
if len(svcStatusBS) == 0 {
l.Debugf("proxy's state Secret does not contain egress services status, waiting...")
return false, nil
}
svcStatus := &egressservices.Status{}
if err := json.Unmarshal(svcStatusBS, svcStatus); err != nil {
return false, fmt.Errorf("error unmarshalling egress service status: %w", err)
}
if !strings.EqualFold(podIP, svcStatus.PodIP) {
l.Infof("proxy's egress service status is for Pod IP %s, current proxy's Pod IP %s, waiting for the proxy to reconfigure...", svcStatus.PodIP, podIP)
return false, nil
}
st, ok := (*svcStatus).Services[tailnetSvcName]
if !ok {
l.Infof("proxy's state Secret does not have egress service status, waiting...")
return false, nil
}
if !reflect.DeepEqual(cfg.TailnetTarget, st.TailnetTarget) {
l.Infof("proxy has configured egress service for tailnet target %v, current target is %v, waiting for proxy to reconfigure...", st.TailnetTarget, cfg.TailnetTarget)
return false, nil
}
if !reflect.DeepEqual(cfg.Ports, st.Ports) {
l.Debugf("proxy has configured egress service for ports %#+v, wants ports %#+v, waiting for proxy to reconfigure", st.Ports, cfg.Ports)
return false, nil
}
l.Debugf("proxy is ready to route traffic to egress service")
return true, nil
}

@ -0,0 +1,195 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"encoding/json"
"fmt"
"math/rand/v2"
"testing"
"github.com/AlekSi/pointer"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/egressservices"
"tailscale.com/tstest"
"tailscale.com/util/mak"
)
func TestTailscaleEgressEndpointSlices(t *testing.T) {
clock := tstest.NewClock(tstest.ClockOpts{})
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
UID: types.UID("1234-UID"),
Annotations: map[string]string{
AnnotationTailnetTargetFQDN: "foo.bar.ts.net",
AnnotationProxyGroup: "foo",
},
},
Spec: corev1.ServiceSpec{
ExternalName: "placeholder",
Type: corev1.ServiceTypeExternalName,
Selector: nil,
Ports: []corev1.ServicePort{
{
Name: "http",
Protocol: "TCP",
Port: 80,
},
},
},
Status: corev1.ServiceStatus{
Conditions: []metav1.Condition{
condition(tsapi.EgressSvcConfigured, metav1.ConditionTrue, "", "", clock),
condition(tsapi.EgressSvcValid, metav1.ConditionTrue, "", "", clock),
},
},
}
port := randomPort()
cm := configMapForSvc(t, svc, port)
fc := fake.NewClientBuilder().
WithScheme(tsapi.GlobalScheme).
WithObjects(svc, cm).
WithStatusSubresource(svc).
Build()
zl, err := zap.NewDevelopment()
if err != nil {
t.Fatal(err)
}
er := &egressEpsReconciler{
Client: fc,
logger: zl.Sugar(),
tsNamespace: "operator-ns",
}
eps := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "operator-ns",
Labels: map[string]string{labelExternalSvcName: "test", labelExternalSvcNamespace: "default", labelProxyGroup: "foo"},
},
AddressType: discoveryv1.AddressTypeIPv4,
}
mustCreate(t, fc, eps)
t.Run("no_proxy_group_resources", func(t *testing.T) {
expectReconciled(t, er, "operator-ns", "foo") // should not error
})
t.Run("no_pods_ready_to_route_traffic", func(t *testing.T) {
pod, stateS := podAndSecretForProxyGroup("foo")
mustCreate(t, fc, pod)
mustCreate(t, fc, stateS)
expectReconciled(t, er, "operator-ns", "foo") // should not error
})
t.Run("pods_are_ready_to_route_traffic", func(t *testing.T) {
pod, stateS := podAndSecretForProxyGroup("foo")
stBs := serviceStatusForPodIP(t, svc, pod.Status.PodIP, port)
mustUpdate(t, fc, "operator-ns", stateS.Name, func(s *corev1.Secret) {
mak.Set(&s.Data, egressservices.KeyEgressServices, stBs)
})
expectReconciled(t, er, "operator-ns", "foo")
eps.Endpoints = append(eps.Endpoints, discoveryv1.Endpoint{
Addresses: []string{pod.Status.PodIP},
Hostname: pointer.To("foo"),
Conditions: discoveryv1.EndpointConditions{
Serving: pointer.ToBool(true),
Ready: pointer.ToBool(true),
Terminating: pointer.ToBool(false),
},
})
expectEqual(t, fc, eps, nil)
})
}
func configMapForSvc(t *testing.T, svc *corev1.Service, p uint16) *corev1.ConfigMap {
t.Helper()
ports := make(map[egressservices.PortMap]struct{})
for _, port := range svc.Spec.Ports {
ports[egressservices.PortMap{Protocol: string(port.Protocol), MatchPort: p, TargetPort: uint16(port.Port)}] = struct{}{}
}
cfg := egressservices.Config{
Ports: ports,
}
if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" {
cfg.TailnetTarget = egressservices.TailnetTarget{FQDN: fqdn}
}
if ip := svc.Annotations[AnnotationTailnetTargetIP]; ip != "" {
cfg.TailnetTarget = egressservices.TailnetTarget{IP: ip}
}
name := tailnetSvcName(svc)
cfgs := egressservices.Configs{name: cfg}
bs, err := json.Marshal(&cfgs)
if err != nil {
t.Fatalf("error marshalling config: %v", err)
}
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(egressSvcsCMNameTemplate, svc.Annotations[AnnotationProxyGroup]),
Namespace: "operator-ns",
},
BinaryData: map[string][]byte{egressservices.KeyEgressServices: bs},
}
return cm
}
func serviceStatusForPodIP(t *testing.T, svc *corev1.Service, ip string, p uint16) []byte {
t.Helper()
ports := make(map[egressservices.PortMap]struct{})
for _, port := range svc.Spec.Ports {
ports[egressservices.PortMap{Protocol: string(port.Protocol), MatchPort: p, TargetPort: uint16(port.Port)}] = struct{}{}
}
svcSt := egressservices.ServiceStatus{Ports: ports}
if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" {
svcSt.TailnetTarget = egressservices.TailnetTarget{FQDN: fqdn}
}
if ip := svc.Annotations[AnnotationTailnetTargetIP]; ip != "" {
svcSt.TailnetTarget = egressservices.TailnetTarget{IP: ip}
}
svcName := tailnetSvcName(svc)
st := egressservices.Status{
PodIP: ip,
Services: map[string]*egressservices.ServiceStatus{svcName: &svcSt},
}
bs, err := json.Marshal(st)
if err != nil {
t.Fatalf("error marshalling service status: %v", err)
}
return bs
}
func podAndSecretForProxyGroup(pg string) (*corev1.Pod, *corev1.Secret) {
p := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-0", pg),
Namespace: "operator-ns",
Labels: map[string]string{labelProxyGroup: pg},
UID: "foo",
},
Status: corev1.PodStatus{
PodIP: "10.0.0.1",
},
}
s := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-0", pg),
Namespace: "operator-ns",
Labels: map[string]string{labelProxyGroup: pg},
},
}
return p, s
}
func randomPort() uint16 {
return uint16(rand.Int32N(1000) + 1000)
}

@ -0,0 +1,706 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"reflect"
"slices"
"strings"
"sync"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/egressservices"
"tailscale.com/kube/kubetypes"
"tailscale.com/tstime"
"tailscale.com/util/clientmetric"
"tailscale.com/util/mak"
"tailscale.com/util/set"
)
const (
reasonEgressSvcInvalid = "EgressSvcInvalid"
reasonEgressSvcValid = "EgressSvcValid"
reasonEgressSvcCreationFailed = "EgressSvcCreationFailed"
reasonProxyGroupNotReady = "ProxyGroupNotReady"
labelProxyGroup = "tailscale.com/proxy-group"
labelProxyGroupType = "tailscale.com/proxy-group-type"
labelExternalSvcName = "tailscale.com/external-service-name"
labelExternalSvcNamespace = "tailscale.com/external-service-namespace"
labelSvcType = "tailscale.com/svc-type" // ingress or egress
typeEgress = "egress"
// maxPorts is the maximum number of ports that can be exposed on a
// container. In practice this will be ports in range [3000 - 4000). The
// high range should make it easier to distinguish container ports from
// the tailnet target ports for debugging purposes (i.e when reading
// netfilter rules). The limit of 10000 is somewhat arbitrary, the
// assumption is that this would not be hit in practice.
maxPorts = 10000
indexEgressProxyGroup = ".metadata.annotations.egress-proxy-group"
egressSvcsCMNameTemplate = "proxy-cfg-%s"
)
var gaugeEgressServices = clientmetric.NewGauge(kubetypes.MetricEgressServiceCount)
// egressSvcsReconciler reconciles user created ExternalName Services that specify a tailnet
// endpoint that should be exposed to cluster workloads and an egress ProxyGroup
// on whose proxies it should be exposed.
type egressSvcsReconciler struct {
client.Client
logger *zap.SugaredLogger
recorder record.EventRecorder
clock tstime.Clock
tsNamespace string
mu sync.Mutex // protects following
svcs set.Slice[types.UID] // UIDs of all currently managed egress Services for ProxyGroup
}
// Reconcile reconciles an ExternalName Service that specifies a tailnet target and a ProxyGroup on whose proxies should
// forward cluster traffic to the target.
// For an ExternalName Service the reconciler:
//
// - for each port N defined on the ExternalName Service, allocates a port X in range [3000- 4000), unique for the
// ProxyGroup proxies. Proxies will forward cluster traffic received on port N to port M on the tailnet target
//
// - creates a ClusterIP Service in the operator's namespace with portmappings for all M->N port pairs. This will allow
// cluster workloads to send traffic on the user-defined tailnet target port and get it transparently mapped to the
// randomly selected port on proxy Pods.
//
// - creates an EndpointSlice in the operator's namespace with kubernetes.io/service-name label pointing to the
// ClusterIP Service. The endpoints will get dynamically updates to proxy Pod IPs as the Pods become ready to route
// traffic to the tailnet target. kubernetes.io/service-name label ensures that kube-proxy sets up routing rules to
// forward cluster traffic received on ClusterIP Service's IP address to the endpoints (Pod IPs).
//
// - updates the egress service config in a ConfigMap mounted to the ProxyGroup proxies with the tailnet target and the
// portmappings.
func (esr *egressSvcsReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
l := esr.logger.With("Service", req.NamespacedName)
defer l.Info("reconcile finished")
svc := new(corev1.Service)
if err = esr.Get(ctx, req.NamespacedName, svc); apierrors.IsNotFound(err) {
l.Info("Service not found")
return res, nil
} else if err != nil {
return res, fmt.Errorf("failed to get Service: %w", err)
}
// Name of the 'egress service', meaning the tailnet target.
tailnetSvc := tailnetSvcName(svc)
l = l.With("tailnet-service", tailnetSvc)
// Note that resources for egress Services are only cleaned up when the
// Service is actually deleted (and not if, for example, user decides to
// remove the Tailscale annotation from it). This should be fine- we
// assume that the egress ExternalName Services are always created for
// Tailscale operator specifically.
if !svc.DeletionTimestamp.IsZero() {
l.Info("Service is being deleted, ensuring resource cleanup")
return res, esr.maybeCleanup(ctx, svc, l)
}
oldStatus := svc.Status.DeepCopy()
defer func() {
if !apiequality.Semantic.DeepEqual(oldStatus, svc.Status) {
err = errors.Join(err, esr.Status().Update(ctx, svc))
}
}()
// Validate the user-created ExternalName Service and the associated ProxyGroup.
if ok, err := esr.validateClusterResources(ctx, svc, l); err != nil {
return res, fmt.Errorf("error validating cluster resources: %w", err)
} else if !ok {
return res, nil
}
if !slices.Contains(svc.Finalizers, FinalizerName) {
l.Infof("configuring tailnet service") // logged exactly once
svc.Finalizers = append(svc.Finalizers, FinalizerName)
if err := esr.Update(ctx, svc); err != nil {
err := fmt.Errorf("failed to add finalizer: %w", err)
r := svcConfiguredReason(svc, false, l)
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcConfigured, metav1.ConditionFalse, r, err.Error(), esr.clock, l)
return res, err
}
esr.mu.Lock()
esr.svcs.Add(svc.UID)
gaugeEgressServices.Set(int64(esr.svcs.Len()))
esr.mu.Unlock()
}
if err := esr.maybeCleanupProxyGroupConfig(ctx, svc, l); err != nil {
err = fmt.Errorf("cleaning up resources for previous ProxyGroup failed: %w", err)
r := svcConfiguredReason(svc, false, l)
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcConfigured, metav1.ConditionFalse, r, err.Error(), esr.clock, l)
return res, err
}
return res, esr.maybeProvision(ctx, svc, l)
}
func (esr *egressSvcsReconciler) maybeProvision(ctx context.Context, svc *corev1.Service, l *zap.SugaredLogger) (err error) {
l.Debug("maybe provision")
r := svcConfiguredReason(svc, false, l)
st := metav1.ConditionFalse
defer func() {
msg := r
if st != metav1.ConditionTrue && err != nil {
msg = err.Error()
}
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcConfigured, st, r, msg, esr.clock, l)
}()
crl := egressSvcChildResourceLabels(svc)
clusterIPSvc, err := getSingleObject[corev1.Service](ctx, esr.Client, esr.tsNamespace, crl)
if err != nil {
err = fmt.Errorf("error retrieving ClusterIP Service: %w", err)
return err
}
if clusterIPSvc == nil {
clusterIPSvc = esr.clusterIPSvcForEgress(crl)
}
upToDate := svcConfigurationUpToDate(svc, l)
provisioned := true
if !upToDate {
if clusterIPSvc, provisioned, err = esr.provision(ctx, svc.Annotations[AnnotationProxyGroup], svc, clusterIPSvc, l); err != nil {
return err
}
}
if !provisioned {
l.Infof("unable to provision cluster resources")
return nil
}
// Update ExternalName Service to point at the ClusterIP Service.
clusterDomain := retrieveClusterDomain(esr.tsNamespace, l)
clusterIPSvcFQDN := fmt.Sprintf("%s.%s.svc.%s", clusterIPSvc.Name, clusterIPSvc.Namespace, clusterDomain)
if svc.Spec.ExternalName != clusterIPSvcFQDN {
l.Infof("Configuring ExternalName Service to point to ClusterIP Service %s", clusterIPSvcFQDN)
svc.Spec.ExternalName = clusterIPSvcFQDN
if err = esr.Update(ctx, svc); err != nil {
err = fmt.Errorf("error updating ExternalName Service: %w", err)
return err
}
}
r = svcConfiguredReason(svc, true, l)
st = metav1.ConditionTrue
return nil
}
func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName string, svc, clusterIPSvc *corev1.Service, l *zap.SugaredLogger) (*corev1.Service, bool, error) {
l.Infof("updating configuration...")
usedPorts, err := esr.usedPortsForPG(ctx, proxyGroupName)
if err != nil {
return nil, false, fmt.Errorf("error calculating used ports for ProxyGroup %s: %w", proxyGroupName, err)
}
oldClusterIPSvc := clusterIPSvc.DeepCopy()
// loop over ClusterIP Service ports, remove any that are not needed.
for i := len(clusterIPSvc.Spec.Ports) - 1; i >= 0; i-- {
pm := clusterIPSvc.Spec.Ports[i]
found := false
for _, wantsPM := range svc.Spec.Ports {
if wantsPM.Port == pm.Port && strings.EqualFold(string(wantsPM.Protocol), string(pm.Protocol)) {
found = true
break
}
}
if !found {
l.Debugf("portmapping %s:%d -> %s:%d is no longer required, removing", pm.Protocol, pm.TargetPort.IntVal, pm.Protocol, pm.Port)
clusterIPSvc.Spec.Ports = slices.Delete(clusterIPSvc.Spec.Ports, i, i+1)
}
}
// loop over ExternalName Service ports, for each one not found on
// ClusterIP Service produce new target port and add a portmapping to
// the ClusterIP Service.
for _, wantsPM := range svc.Spec.Ports {
found := false
for _, gotPM := range clusterIPSvc.Spec.Ports {
if wantsPM.Port == gotPM.Port && strings.EqualFold(string(wantsPM.Protocol), string(gotPM.Protocol)) {
found = true
break
}
}
if !found {
// Calculate a free port to expose on container and add
// a new PortMap to the ClusterIP Service.
if usedPorts.Len() == maxPorts {
// TODO(irbekrm): refactor to avoid extra reconciles here. Low priority as in practice,
// the limit should not be hit.
return nil, false, fmt.Errorf("unable to allocate additional ports on ProxyGroup %s, %d ports already used. Create another ProxyGroup or open an issue if you believe this is unexpected.", proxyGroupName, maxPorts)
}
p := unusedPort(usedPorts)
l.Debugf("mapping tailnet target port %d to container port %d", wantsPM.Port, p)
usedPorts.Insert(p)
clusterIPSvc.Spec.Ports = append(clusterIPSvc.Spec.Ports, corev1.ServicePort{
Name: wantsPM.Name,
Protocol: wantsPM.Protocol,
Port: wantsPM.Port,
TargetPort: intstr.FromInt32(p),
})
}
}
if !reflect.DeepEqual(clusterIPSvc, oldClusterIPSvc) {
if clusterIPSvc, err = createOrUpdate(ctx, esr.Client, esr.tsNamespace, clusterIPSvc, func(svc *corev1.Service) {
svc.Labels = clusterIPSvc.Labels
svc.Spec = clusterIPSvc.Spec
}); err != nil {
return nil, false, fmt.Errorf("error ensuring ClusterIP Service: %v", err)
}
}
crl := egressSvcChildResourceLabels(svc)
// TODO(irbekrm): support IPv6, but need to investigate how kube proxy
// sets up Service -> Pod routing when IPv6 is involved.
crl[discoveryv1.LabelServiceName] = clusterIPSvc.Name
crl[discoveryv1.LabelManagedBy] = "tailscale.com"
eps := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-ipv4", clusterIPSvc.Name),
Namespace: esr.tsNamespace,
Labels: crl,
},
AddressType: discoveryv1.AddressTypeIPv4,
Ports: epsPortsFromSvc(clusterIPSvc),
}
if eps, err = createOrUpdate(ctx, esr.Client, esr.tsNamespace, eps, func(e *discoveryv1.EndpointSlice) {
e.Labels = eps.Labels
e.AddressType = eps.AddressType
e.Ports = eps.Ports
for _, p := range e.Endpoints {
p.Conditions.Ready = nil
}
}); err != nil {
return nil, false, fmt.Errorf("error ensuring EndpointSlice: %w", err)
}
cm, cfgs, err := egressSvcsConfigs(ctx, esr.Client, proxyGroupName, esr.tsNamespace)
if err != nil {
return nil, false, fmt.Errorf("error retrieving egress services configuration: %w", err)
}
if cm == nil {
l.Info("ConfigMap not yet created, waiting..")
return nil, false, nil
}
tailnetSvc := tailnetSvcName(svc)
gotCfg := (*cfgs)[tailnetSvc]
wantsCfg := egressSvcCfg(svc, clusterIPSvc)
if !reflect.DeepEqual(gotCfg, wantsCfg) {
l.Debugf("updating egress services ConfigMap %s", cm.Name)
mak.Set(cfgs, tailnetSvc, wantsCfg)
bs, err := json.Marshal(cfgs)
if err != nil {
return nil, false, fmt.Errorf("error marshalling egress services configs: %w", err)
}
mak.Set(&cm.BinaryData, egressservices.KeyEgressServices, bs)
if err := esr.Update(ctx, cm); err != nil {
return nil, false, fmt.Errorf("error updating egress services ConfigMap: %w", err)
}
}
l.Infof("egress service configuration has been updated")
return clusterIPSvc, true, nil
}
func (esr *egressSvcsReconciler) maybeCleanup(ctx context.Context, svc *corev1.Service, logger *zap.SugaredLogger) error {
logger.Info("ensuring that resources created for egress service are deleted")
// Delete egress service config from the ConfigMap mounted by the proxies.
if err := esr.ensureEgressSvcCfgDeleted(ctx, svc, logger); err != nil {
return fmt.Errorf("error deleting egress service config: %w", err)
}
// Delete the ClusterIP Service and EndpointSlice for the egress
// service.
types := []client.Object{
&corev1.Service{},
&discoveryv1.EndpointSlice{},
}
crl := egressSvcChildResourceLabels(svc)
for _, typ := range types {
if err := esr.DeleteAllOf(ctx, typ, client.InNamespace(esr.tsNamespace), client.MatchingLabels(crl)); err != nil {
return fmt.Errorf("error deleting %s: %w", typ, err)
}
}
ix := slices.Index(svc.Finalizers, FinalizerName)
if ix != -1 {
logger.Debug("Removing Tailscale finalizer from Service")
svc.Finalizers = append(svc.Finalizers[:ix], svc.Finalizers[ix+1:]...)
if err := esr.Update(ctx, svc); err != nil {
return fmt.Errorf("failed to remove finalizer: %w", err)
}
}
esr.mu.Lock()
esr.svcs.Remove(svc.UID)
gaugeEgressServices.Set(int64(esr.svcs.Len()))
esr.mu.Unlock()
logger.Info("successfully cleaned up resources for egress Service")
return nil
}
func (esr *egressSvcsReconciler) maybeCleanupProxyGroupConfig(ctx context.Context, svc *corev1.Service, l *zap.SugaredLogger) error {
wantsProxyGroup := svc.Annotations[AnnotationProxyGroup]
cond := tsoperator.GetServiceCondition(svc, tsapi.EgressSvcConfigured)
if cond == nil {
return nil
}
ss := strings.Split(cond.Reason, ":")
if len(ss) < 3 {
return nil
}
if strings.EqualFold(wantsProxyGroup, ss[2]) {
return nil
}
esr.logger.Infof("egress Service configured on ProxyGroup %s, wants ProxyGroup %s, cleaning up...", ss[2], wantsProxyGroup)
if err := esr.ensureEgressSvcCfgDeleted(ctx, svc, l); err != nil {
return fmt.Errorf("error deleting egress service config: %w", err)
}
return nil
}
// usedPortsForPG calculates the currently used match ports for ProxyGroup
// containers. It does that by looking by retrieving all target ports of all
// ClusterIP Services created for egress services exposed on this ProxyGroup's
// proxies.
// TODO(irbekrm): this is currently good enough because we only have a single worker and
// because these Services are created by us, so we can always expect to get the
// latest ClusterIP Services via the controller cache. It will not work as well
// once we split into multiple workers- at that point we probably want to set
// used ports on ProxyGroup's status.
func (esr *egressSvcsReconciler) usedPortsForPG(ctx context.Context, pg string) (sets.Set[int32], error) {
svcList := &corev1.ServiceList{}
if err := esr.List(ctx, svcList, client.InNamespace(esr.tsNamespace), client.MatchingLabels(map[string]string{labelProxyGroup: pg})); err != nil {
return nil, fmt.Errorf("error listing Services: %w", err)
}
usedPorts := sets.New[int32]()
for _, s := range svcList.Items {
for _, p := range s.Spec.Ports {
usedPorts.Insert(p.TargetPort.IntVal)
}
}
return usedPorts, nil
}
// clusterIPSvcForEgress returns a template for the ClusterIP Service created
// for an egress service exposed on ProxyGroup proxies. The ClusterIP Service
// has no selector. Traffic sent to it will be routed to the endpoints defined
// by an EndpointSlice created for this egress service.
func (esr *egressSvcsReconciler) clusterIPSvcForEgress(crl map[string]string) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
GenerateName: svcNameBase(crl[labelExternalSvcName]),
Namespace: esr.tsNamespace,
Labels: crl,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
},
}
}
func (esr *egressSvcsReconciler) ensureEgressSvcCfgDeleted(ctx context.Context, svc *corev1.Service, logger *zap.SugaredLogger) error {
crl := egressSvcChildResourceLabels(svc)
cmName := fmt.Sprintf(egressSvcsCMNameTemplate, crl[labelProxyGroup])
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cmName,
Namespace: esr.tsNamespace,
},
}
l := logger.With("ConfigMap", client.ObjectKeyFromObject(cm))
l.Debug("ensuring that egress service configuration is removed from proxy config")
if err := esr.Get(ctx, client.ObjectKeyFromObject(cm), cm); apierrors.IsNotFound(err) {
l.Debugf("ConfigMap not found")
return nil
} else if err != nil {
return fmt.Errorf("error retrieving ConfigMap: %w", err)
}
bs := cm.BinaryData[egressservices.KeyEgressServices]
if len(bs) == 0 {
l.Debugf("ConfigMap does not contain egress service configs")
return nil
}
cfgs := &egressservices.Configs{}
if err := json.Unmarshal(bs, cfgs); err != nil {
return fmt.Errorf("error unmarshalling egress services configs")
}
tailnetSvc := tailnetSvcName(svc)
_, ok := (*cfgs)[tailnetSvc]
if !ok {
l.Debugf("ConfigMap does not contain egress service config, likely because it was already deleted")
return nil
}
l.Infof("before deleting config %+#v", *cfgs)
delete(*cfgs, tailnetSvc)
l.Infof("after deleting config %+#v", *cfgs)
bs, err := json.Marshal(cfgs)
if err != nil {
return fmt.Errorf("error marshalling egress services configs: %w", err)
}
mak.Set(&cm.BinaryData, egressservices.KeyEgressServices, bs)
return esr.Update(ctx, cm)
}
func (esr *egressSvcsReconciler) validateClusterResources(ctx context.Context, svc *corev1.Service, l *zap.SugaredLogger) (bool, error) {
proxyGroupName := svc.Annotations[AnnotationProxyGroup]
pg := &tsapi.ProxyGroup{
ObjectMeta: metav1.ObjectMeta{
Name: proxyGroupName,
},
}
if err := esr.Get(ctx, client.ObjectKeyFromObject(pg), pg); apierrors.IsNotFound(err) {
l.Infof("ProxyGroup %q not found, waiting...", proxyGroupName)
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, esr.clock, l)
return false, nil
} else if err != nil {
err := fmt.Errorf("unable to retrieve ProxyGroup %s: %w", proxyGroupName, err)
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, err.Error(), esr.clock, l)
return false, err
}
if !tsoperator.ProxyGroupIsReady(pg) {
l.Infof("ProxyGroup %s is not ready, waiting...", proxyGroupName)
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, esr.clock, l)
return false, nil
}
if violations := validateEgressService(svc, pg); len(violations) > 0 {
msg := fmt.Sprintf("invalid egress Service: %s", strings.Join(violations, ", "))
esr.recorder.Event(svc, corev1.EventTypeWarning, "INVALIDSERVICE", msg)
l.Info(msg)
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionFalse, reasonEgressSvcInvalid, msg, esr.clock, l)
return false, nil
}
l.Debugf("egress service is valid")
tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionTrue, reasonEgressSvcValid, reasonEgressSvcValid, esr.clock, l)
return true, nil
}
func validateEgressService(svc *corev1.Service, pg *tsapi.ProxyGroup) []string {
violations := validateService(svc)
// We check that only one of these two is set in the earlier validateService function.
if svc.Annotations[AnnotationTailnetTargetFQDN] == "" && svc.Annotations[AnnotationTailnetTargetIP] == "" {
violations = append(violations, fmt.Sprintf("egress Service for ProxyGroup must have one of %s, %s annotations set", AnnotationTailnetTargetFQDN, AnnotationTailnetTargetIP))
}
if len(svc.Spec.Ports) == 0 {
violations = append(violations, "egress Service for ProxyGroup must have at least one target Port specified")
}
if svc.Spec.Type != corev1.ServiceTypeExternalName {
violations = append(violations, fmt.Sprintf("unexpected egress Service type %s. The only supported type is ExternalName.", svc.Spec.Type))
}
if pg.Spec.Type != tsapi.ProxyGroupTypeEgress {
violations = append(violations, fmt.Sprintf("egress Service references ProxyGroup of type %s, must be type %s", pg.Spec.Type, tsapi.ProxyGroupTypeEgress))
}
return violations
}
// egressSvcNameBase returns a name base that can be passed to
// ObjectMeta.GenerateName to generate a name for the ClusterIP Service.
// The generated name needs to be short enough so that it can later be used to
// generate a valid Kubernetes resource name for the EndpointSlice in form
// 'ipv4-|ipv6-<ClusterIP Service name>.
// A valid Kubernetes resource name must not be longer than 253 chars.
func svcNameBase(s string) string {
// -ipv4 - ipv6
const maxClusterIPSvcNameLength = 253 - 5
base := fmt.Sprintf("ts-%s-", s)
generator := names.SimpleNameGenerator
for {
generatedName := generator.GenerateName(base)
excess := len(generatedName) - maxClusterIPSvcNameLength
if excess <= 0 {
return base
}
base = base[:len(base)-1-excess] // cut off the excess chars
base = base + "-" // re-instate the dash
}
}
// unusedPort returns a port in range [3000 - 4000). The caller must ensure that
// usedPorts does not contain all ports in range [3000 - 4000).
func unusedPort(usedPorts sets.Set[int32]) int32 {
foundFreePort := false
var suggestPort int32
for !foundFreePort {
suggestPort = rand.Int32N(maxPorts) + 3000
if !usedPorts.Has(suggestPort) {
foundFreePort = true
}
}
return suggestPort
}
// tailnetTargetFromSvc returns a tailnet target for the given egress Service.
// Service must contain exactly one of tailscale.com/tailnet-ip,
// tailscale.com/tailnet-fqdn annotations.
func tailnetTargetFromSvc(svc *corev1.Service) egressservices.TailnetTarget {
if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" {
return egressservices.TailnetTarget{
FQDN: fqdn,
}
}
return egressservices.TailnetTarget{
IP: svc.Annotations[AnnotationTailnetTargetIP],
}
}
func egressSvcCfg(externalNameSvc, clusterIPSvc *corev1.Service) egressservices.Config {
tt := tailnetTargetFromSvc(externalNameSvc)
cfg := egressservices.Config{TailnetTarget: tt}
for _, svcPort := range clusterIPSvc.Spec.Ports {
pm := portMap(svcPort)
mak.Set(&cfg.Ports, pm, struct{}{})
}
return cfg
}
func portMap(p corev1.ServicePort) egressservices.PortMap {
// TODO (irbekrm): out of bounds check?
return egressservices.PortMap{Protocol: string(p.Protocol), MatchPort: uint16(p.TargetPort.IntVal), TargetPort: uint16(p.Port)}
}
func isEgressSvcForProxyGroup(obj client.Object) bool {
s, ok := obj.(*corev1.Service)
if !ok {
return false
}
annots := s.ObjectMeta.Annotations
return annots[AnnotationProxyGroup] != "" && (annots[AnnotationTailnetTargetFQDN] != "" || annots[AnnotationTailnetTargetIP] != "")
}
// egressSvcConfig returns a ConfigMap that contains egress services configuration for the provided ProxyGroup as well
// as unmarshalled configuration from the ConfigMap.
func egressSvcsConfigs(ctx context.Context, cl client.Client, proxyGroupName, tsNamespace string) (cm *corev1.ConfigMap, cfgs *egressservices.Configs, err error) {
cmName := fmt.Sprintf(egressSvcsCMNameTemplate, proxyGroupName)
cm = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cmName,
Namespace: tsNamespace,
},
}
if err := cl.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil {
return nil, nil, fmt.Errorf("error retrieving egress services ConfigMap %s: %v", cmName, err)
}
cfgs = &egressservices.Configs{}
if len(cm.BinaryData[egressservices.KeyEgressServices]) != 0 {
if err := json.Unmarshal(cm.BinaryData[egressservices.KeyEgressServices], cfgs); err != nil {
return nil, nil, fmt.Errorf("error unmarshaling egress services config %v: %w", cm.BinaryData[egressservices.KeyEgressServices], err)
}
}
return cm, cfgs, nil
}
// egressSvcChildResourceLabels returns labels that should be applied to the
// ClusterIP Service and the EndpointSlice created for the egress service.
// TODO(irbekrm): we currently set a bunch of labels based on Kubernetes
// resource names (ProxyGroup, Service). Maximum allowed label length is 63
// chars whilst the maximum allowed resource name length is 253 chars, so we
// should probably validate and truncate (?) the names is they are too long.
func egressSvcChildResourceLabels(svc *corev1.Service) map[string]string {
return map[string]string{
LabelManaged: "true",
labelProxyGroup: svc.Annotations[AnnotationProxyGroup],
labelExternalSvcName: svc.Name,
labelExternalSvcNamespace: svc.Namespace,
labelSvcType: typeEgress,
}
}
func svcConfigurationUpToDate(svc *corev1.Service, l *zap.SugaredLogger) bool {
cond := tsoperator.GetServiceCondition(svc, tsapi.EgressSvcConfigured)
if cond == nil {
return false
}
if cond.Status != metav1.ConditionTrue {
return false
}
wantsReadyReason := svcConfiguredReason(svc, true, l)
return strings.EqualFold(wantsReadyReason, cond.Reason)
}
func cfgHash(c cfg, l *zap.SugaredLogger) string {
bs, err := json.Marshal(c)
if err != nil {
// Don't use l.Error as that messes up component logs with, in this case, unnecessary stack trace.
l.Infof("error marhsalling Config: %v", err)
return ""
}
h := sha256.New()
if _, err := h.Write(bs); err != nil {
// Don't use l.Error as that messes up component logs with, in this case, unnecessary stack trace.
l.Infof("error producing Config hash: %v", err)
return ""
}
return fmt.Sprintf("%x", h.Sum(nil))
}
type cfg struct {
Ports []corev1.ServicePort `json:"ports"`
TailnetTarget egressservices.TailnetTarget `json:"tailnetTarget"`
ProxyGroup string `json:"proxyGroup"`
}
func svcConfiguredReason(svc *corev1.Service, configured bool, l *zap.SugaredLogger) string {
var r string
if configured {
r = "ConfiguredFor:"
} else {
r = fmt.Sprintf("ConfigurationFailed:%s", r)
}
r += fmt.Sprintf("ProxyGroup:%s", svc.Annotations[AnnotationProxyGroup])
tt := tailnetTargetFromSvc(svc)
s := cfg{
Ports: svc.Spec.Ports,
TailnetTarget: tt,
ProxyGroup: svc.Annotations[AnnotationProxyGroup],
}
r += fmt.Sprintf(":Config:%s", cfgHash(s, l))
return r
}
// tailnetSvc accepts and ExternalName Service name and returns a name that will be used to distinguish this tailnet
// service from other tailnet services exposed to cluster workloads.
func tailnetSvcName(extNSvc *corev1.Service) string {
return fmt.Sprintf("%s-%s", extNSvc.Namespace, extNSvc.Name)
}
// epsPortsFromSvc takes the ClusterIP Service created for an egress service and
// returns its Port array in a form that can be used for an EndpointSlice.
func epsPortsFromSvc(svc *corev1.Service) (ep []discoveryv1.EndpointPort) {
for _, p := range svc.Spec.Ports {
ep = append(ep, discoveryv1.EndpointPort{
Protocol: &p.Protocol,
Port: &p.TargetPort.IntVal,
Name: &p.Name,
})
}
return ep
}

@ -0,0 +1,268 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
"encoding/json"
"fmt"
"testing"
"github.com/AlekSi/pointer"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/egressservices"
"tailscale.com/tstest"
"tailscale.com/tstime"
)
func TestTailscaleEgressServices(t *testing.T) {
pg := &tsapi.ProxyGroup{
TypeMeta: metav1.TypeMeta{Kind: "ProxyGroup", APIVersion: "tailscale.com/v1alpha1"},
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
UID: types.UID("1234-UID"),
},
Spec: tsapi.ProxyGroupSpec{
Replicas: pointer.To(3),
Type: tsapi.ProxyGroupTypeEgress,
},
}
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(egressSvcsCMNameTemplate, "foo"),
Namespace: "operator-ns",
},
}
fc := fake.NewClientBuilder().
WithScheme(tsapi.GlobalScheme).
WithObjects(pg, cm).
WithStatusSubresource(pg).
Build()
zl, err := zap.NewDevelopment()
if err != nil {
t.Fatal(err)
}
clock := tstest.NewClock(tstest.ClockOpts{})
esr := &egressSvcsReconciler{
Client: fc,
logger: zl.Sugar(),
clock: clock,
tsNamespace: "operator-ns",
}
tailnetTargetFQDN := "foo.bar.ts.net."
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
UID: types.UID("1234-UID"),
Annotations: map[string]string{
AnnotationTailnetTargetFQDN: tailnetTargetFQDN,
AnnotationProxyGroup: "foo",
},
},
Spec: corev1.ServiceSpec{
ExternalName: "placeholder",
Type: corev1.ServiceTypeExternalName,
Selector: nil,
Ports: []corev1.ServicePort{
{
Name: "http",
Protocol: "TCP",
Port: 80,
},
{
Name: "https",
Protocol: "TCP",
Port: 443,
},
},
},
}
t.Run("proxy_group_not_ready", func(t *testing.T) {
mustCreate(t, fc, svc)
expectReconciled(t, esr, "default", "test")
// Service should have EgressSvcValid condition set to Unknown.
svc.Status.Conditions = []metav1.Condition{condition(tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, clock)}
expectEqual(t, fc, svc, nil)
})
t.Run("proxy_group_ready", func(t *testing.T) {
mustUpdateStatus(t, fc, "", "foo", func(pg *tsapi.ProxyGroup) {
pg.Status.Conditions = []metav1.Condition{
condition(tsapi.ProxyGroupReady, metav1.ConditionTrue, "", "", clock),
}
})
// Quirks of the fake client.
mustUpdateStatus(t, fc, "default", "test", func(svc *corev1.Service) {
svc.Status.Conditions = []metav1.Condition{}
})
expectReconciled(t, esr, "default", "test")
// Verify that a ClusterIP Service has been created.
name := findGenNameForEgressSvcResources(t, fc, svc)
expectEqual(t, fc, clusterIPSvc(name, svc), removeTargetPortsFromSvc)
clusterSvc := mustGetClusterIPSvc(t, fc, name)
// Verify that an EndpointSlice has been created.
expectEqual(t, fc, endpointSlice(name, svc, clusterSvc), nil)
// Verify that ConfigMap contains configuration for the new egress service.
mustHaveConfigForSvc(t, fc, svc, clusterSvc, cm)
r := svcConfiguredReason(svc, true, zl.Sugar())
// Verify that the user-created ExternalName Service has Configured set to true and ExternalName pointing to the
// CluterIP Service.
svc.Status.Conditions = []metav1.Condition{
condition(tsapi.EgressSvcConfigured, metav1.ConditionTrue, r, r, clock),
}
svc.ObjectMeta.Finalizers = []string{"tailscale.com/finalizer"}
svc.Spec.ExternalName = fmt.Sprintf("%s.operator-ns.svc.cluster.local", name)
expectEqual(t, fc, svc, nil)
})
t.Run("delete_external_name_service", func(t *testing.T) {
name := findGenNameForEgressSvcResources(t, fc, svc)
if err := fc.Delete(context.Background(), svc); err != nil {
t.Fatalf("error deleting ExternalName Service: %v", err)
}
expectReconciled(t, esr, "default", "test")
// Verify that ClusterIP Service and EndpointSlice have been deleted.
expectMissing[corev1.Service](t, fc, "operator-ns", name)
expectMissing[discoveryv1.EndpointSlice](t, fc, "operator-ns", fmt.Sprintf("%s-ipv4", name))
// Verify that service config has been deleted from the ConfigMap.
mustNotHaveConfigForSvc(t, fc, svc, cm)
})
}
func condition(typ tsapi.ConditionType, st metav1.ConditionStatus, r, msg string, clock tstime.Clock) metav1.Condition {
return metav1.Condition{
Type: string(typ),
Status: st,
LastTransitionTime: conditionTime(clock),
Reason: r,
Message: msg,
}
}
func findGenNameForEgressSvcResources(t *testing.T, client client.Client, svc *corev1.Service) string {
t.Helper()
labels := egressSvcChildResourceLabels(svc)
s, err := getSingleObject[corev1.Service](context.Background(), client, "operator-ns", labels)
if err != nil {
t.Fatalf("finding ClusterIP Service for ExternalName Service %s: %v", svc.Name, err)
}
if s == nil {
t.Fatalf("no ClusterIP Service found for ExternalName Service %q", svc.Name)
}
return s.GetName()
}
func clusterIPSvc(name string, extNSvc *corev1.Service) *corev1.Service {
labels := egressSvcChildResourceLabels(extNSvc)
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "operator-ns",
GenerateName: fmt.Sprintf("ts-%s-", extNSvc.Name),
Labels: labels,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Ports: extNSvc.Spec.Ports,
},
}
}
func mustGetClusterIPSvc(t *testing.T, cl client.Client, name string) *corev1.Service {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "operator-ns",
},
}
if err := cl.Get(context.Background(), client.ObjectKeyFromObject(svc), svc); err != nil {
t.Fatalf("error retrieving Service")
}
return svc
}
func endpointSlice(name string, extNSvc, clusterIPSvc *corev1.Service) *discoveryv1.EndpointSlice {
labels := egressSvcChildResourceLabels(extNSvc)
labels[discoveryv1.LabelManagedBy] = "tailscale.com"
labels[discoveryv1.LabelServiceName] = name
return &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-ipv4", name),
Namespace: "operator-ns",
Labels: labels,
},
Ports: portsForEndpointSlice(clusterIPSvc),
AddressType: discoveryv1.AddressTypeIPv4,
}
}
func portsForEndpointSlice(svc *corev1.Service) []discoveryv1.EndpointPort {
ports := make([]discoveryv1.EndpointPort, 0)
for _, p := range svc.Spec.Ports {
ports = append(ports, discoveryv1.EndpointPort{
Name: &p.Name,
Protocol: &p.Protocol,
Port: pointer.ToInt32(p.TargetPort.IntVal),
})
}
return ports
}
func mustHaveConfigForSvc(t *testing.T, cl client.Client, extNSvc, clusterIPSvc *corev1.Service, cm *corev1.ConfigMap) {
t.Helper()
wantsCfg := egressSvcCfg(extNSvc, clusterIPSvc)
if err := cl.Get(context.Background(), client.ObjectKeyFromObject(cm), cm); err != nil {
t.Fatalf("Error retrieving ConfigMap: %v", err)
}
name := tailnetSvcName(extNSvc)
gotCfg := configFromCM(t, cm, name)
if gotCfg == nil {
t.Fatalf("No config found for service %q", name)
}
if diff := cmp.Diff(*gotCfg, wantsCfg); diff != "" {
t.Fatalf("unexpected config for service %q (-got +want):\n%s", name, diff)
}
}
func mustNotHaveConfigForSvc(t *testing.T, cl client.Client, extNSvc *corev1.Service, cm *corev1.ConfigMap) {
t.Helper()
if err := cl.Get(context.Background(), client.ObjectKeyFromObject(cm), cm); err != nil {
t.Fatalf("Error retrieving ConfigMap: %v", err)
}
name := tailnetSvcName(extNSvc)
gotCfg := configFromCM(t, cm, name)
if gotCfg != nil {
t.Fatalf("Config %#+v for service %q found when it should not be present", gotCfg, name)
}
}
func configFromCM(t *testing.T, cm *corev1.ConfigMap, svcName string) *egressservices.Config {
t.Helper()
cfgBs, ok := cm.BinaryData[egressservices.KeyEgressServices]
if !ok {
return nil
}
cfgs := &egressservices.Configs{}
if err := json.Unmarshal(cfgBs, cfgs); err != nil {
t.Fatalf("error unmarshalling config: %v", err)
}
cfg, ok := (*cfgs)[svcName]
if ok {
return &cfg
}
return nil
}

@ -238,6 +238,7 @@ func runReconcilers(opts reconcilerOpts) {
ByObject: map[client.Object]cache.ByObject{ ByObject: map[client.Object]cache.ByObject{
&corev1.Secret{}: nsFilter, &corev1.Secret{}: nsFilter,
&corev1.ServiceAccount{}: nsFilter, &corev1.ServiceAccount{}: nsFilter,
&corev1.Pod{}: nsFilter,
&corev1.ConfigMap{}: nsFilter, &corev1.ConfigMap{}: nsFilter,
&appsv1.StatefulSet{}: nsFilter, &appsv1.StatefulSet{}: nsFilter,
&appsv1.Deployment{}: nsFilter, &appsv1.Deployment{}: nsFilter,
@ -353,6 +354,48 @@ func runReconcilers(opts reconcilerOpts) {
if err != nil { if err != nil {
startlog.Fatalf("could not create nameserver reconciler: %v", err) startlog.Fatalf("could not create nameserver reconciler: %v", err)
} }
egressSvcFilter := handler.EnqueueRequestsFromMapFunc(egressSvcsHandler)
proxyGroupFilter := handler.EnqueueRequestsFromMapFunc(egressSvcsFromEgressProxyGroup(mgr.GetClient(), opts.log))
err = builder.
ControllerManagedBy(mgr).
Named("egress-svcs-reconciler").
Watches(&corev1.Service{}, egressSvcFilter).
Watches(&tsapi.ProxyGroup{}, proxyGroupFilter).
Complete(&egressSvcsReconciler{
Client: mgr.GetClient(),
tsNamespace: opts.tailscaleNamespace,
recorder: eventRecorder,
clock: tstime.DefaultClock{},
logger: opts.log.Named("egress-svcs-reconciler"),
})
if err != nil {
startlog.Fatalf("could not create egress Services reconciler: %v", err)
}
if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(corev1.Service), indexEgressProxyGroup, indexEgressServices); err != nil {
startlog.Fatalf("failed setting up indexer for egress Services: %v", err)
}
epsFilter := handler.EnqueueRequestsFromMapFunc(egressEpsHandler)
podsSecretsFilter := handler.EnqueueRequestsFromMapFunc(egressEpsFromEgressPGChildResources(mgr.GetClient(), opts.log, opts.tailscaleNamespace))
epsFromExtNSvcFilter := handler.EnqueueRequestsFromMapFunc(epsFromExternalNameService(mgr.GetClient(), opts.log))
err = builder.
ControllerManagedBy(mgr).
Named("egress-eps-reconciler").
Watches(&discoveryv1.EndpointSlice{}, epsFilter).
Watches(&corev1.Pod{}, podsSecretsFilter).
Watches(&corev1.Secret{}, podsSecretsFilter).
Watches(&corev1.Service{}, epsFromExtNSvcFilter).
Complete(&egressEpsReconciler{
Client: mgr.GetClient(),
tsNamespace: opts.tailscaleNamespace,
logger: opts.log.Named("egress-eps-reconciler"),
})
if err != nil {
startlog.Fatalf("could not create egress EndpointSlices reconciler: %v", err)
}
err = builder.ControllerManagedBy(mgr). err = builder.ControllerManagedBy(mgr).
For(&tsapi.ProxyClass{}). For(&tsapi.ProxyClass{}).
Complete(&ProxyClassReconciler{ Complete(&ProxyClassReconciler{
@ -687,6 +730,10 @@ func serviceHandlerForIngress(cl client.Client, logger *zap.SugaredLogger) handl
} }
func serviceHandler(_ context.Context, o client.Object) []reconcile.Request { func serviceHandler(_ context.Context, o client.Object) []reconcile.Request {
if _, ok := o.GetAnnotations()[AnnotationProxyGroup]; ok {
// Do not reconcile Services for ProxyGroup.
return nil
}
if isManagedByType(o, "svc") { if isManagedByType(o, "svc") {
// If this is a Service managed by a Service we want to enqueue its parent // If this is a Service managed by a Service we want to enqueue its parent
return []reconcile.Request{{NamespacedName: parentFromObjectLabels(o)}} return []reconcile.Request{{NamespacedName: parentFromObjectLabels(o)}}
@ -712,3 +759,136 @@ func isMagicDNSName(name string) bool {
validMagicDNSName := regexp.MustCompile(`^[a-zA-Z0-9-]+\.[a-zA-Z0-9-]+\.ts\.net\.?$`) validMagicDNSName := regexp.MustCompile(`^[a-zA-Z0-9-]+\.[a-zA-Z0-9-]+\.ts\.net\.?$`)
return validMagicDNSName.MatchString(name) return validMagicDNSName.MatchString(name)
} }
// egressSvcsHandler returns accepts a Kubernetes object and returns a reconcile
// request for it , if the object is a Tailscale egress Service meant to be
// exposed on a ProxyGroup.
func egressSvcsHandler(_ context.Context, o client.Object) []reconcile.Request {
if !isEgressSvcForProxyGroup(o) {
return nil
}
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: o.GetNamespace(),
Name: o.GetName(),
},
},
}
}
// egressEpsHandler returns accepts an EndpointSlice and, if the EndpointSlice
// is for an egress service, returns a reconcile request for it.
func egressEpsHandler(_ context.Context, o client.Object) []reconcile.Request {
if typ := o.GetLabels()[labelSvcType]; typ != typeEgress {
return nil
}
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: o.GetNamespace(),
Name: o.GetName(),
},
},
}
}
// egressEpsFromEgressPGChildResources returns a handler that checks if an
// object is a child resource for an egress ProxyGroup (a Pod or a state Secret)
// and if it is, returns reconciler requests for all egress EndpointSlices for
// that ProxyGroup.
func egressEpsFromEgressPGChildResources(cl client.Client, logger *zap.SugaredLogger, ns string) handler.MapFunc {
return func(_ context.Context, o client.Object) []reconcile.Request {
pg, ok := o.GetLabels()[labelProxyGroup]
if !ok {
return nil
}
// TODO(irbekrm): depending on what labels we add to ProxyGroup
// resources and which resources, this might need some extra
// checks.
if typ, ok := o.GetLabels()[labelProxyGroupType]; !ok || typ != typeEgress {
return nil
}
epsList := discoveryv1.EndpointSliceList{}
if err := cl.List(context.Background(), &epsList, client.InNamespace(ns), client.MatchingLabels(map[string]string{labelProxyGroup: pg})); err != nil {
logger.Infof("error listing EndpointSlices: %v, skipping a reconcile for event on %s %s", err, o.GetName(), o.GetObjectKind().GroupVersionKind().Kind)
return nil
}
reqs := make([]reconcile.Request, 0)
for _, ep := range epsList.Items {
reqs = append(reqs, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: ep.Namespace,
Name: ep.Name,
},
})
}
return reqs
}
}
func egressSvcsFromEgressProxyGroup(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
return func(_ context.Context, o client.Object) []reconcile.Request {
pg, ok := o.(*tsapi.ProxyGroup)
if !ok {
logger.Infof("[unexpected] ProxyGroup handler triggered for an object that is not a ProxyGroup")
return nil
}
if pg.Spec.Type != tsapi.ProxyGroupTypeEgress {
return nil
}
svcList := &corev1.ServiceList{}
if err := cl.List(context.Background(), svcList, client.MatchingFields{indexEgressProxyGroup: pg.Name}); err != nil {
logger.Infof("error listing Services: %v, skipping a reconcile for event on ProxyGroup %s", err, pg.Name)
return nil
}
reqs := make([]reconcile.Request, 0)
for _, svc := range svcList.Items {
reqs = append(reqs, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: svc.Namespace,
Name: svc.Name,
},
})
}
return reqs
}
}
func epsFromExternalNameService(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
return func(_ context.Context, o client.Object) []reconcile.Request {
svc, ok := o.(*corev1.Service)
if !ok {
logger.Infof("[unexpected] Service handler triggered for an object that is not a Service")
return nil
}
if !isEgressSvcForProxyGroup(svc) {
return nil
}
epsList := &discoveryv1.EndpointSliceList{}
if err := cl.List(context.Background(), epsList, client.MatchingLabels(map[string]string{
labelExternalSvcName: svc.Name,
labelExternalSvcNamespace: svc.Namespace,
})); err != nil {
logger.Infof("error listing EndpointSlices: %v, skipping a reconcile for event on Service %s", err, svc.Name)
return nil
}
reqs := make([]reconcile.Request, 0)
for _, eps := range epsList.Items {
reqs = append(reqs, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: eps.Namespace,
Name: eps.Name,
},
})
}
return reqs
}
}
func indexEgressServices(o client.Object) []string {
if !isEgressSvcForProxyGroup(o) {
return nil
}
return []string{o.GetAnnotations()[AnnotationProxyGroup]}
}

@ -65,6 +65,8 @@ const (
//MagicDNS name of tailnet node. //MagicDNS name of tailnet node.
AnnotationTailnetTargetFQDN = "tailscale.com/tailnet-fqdn" AnnotationTailnetTargetFQDN = "tailscale.com/tailnet-fqdn"
AnnotationProxyGroup = "tailscale.com/proxy-group"
// Annotations settable by users on ingresses. // Annotations settable by users on ingresses.
AnnotationFunnel = "tailscale.com/funnel" AnnotationFunnel = "tailscale.com/funnel"

@ -354,6 +354,10 @@ func validateService(svc *corev1.Service) []string {
violations = append(violations, fmt.Sprintf("invalid value of annotation %s: %q does not appear to be a valid MagicDNS name", AnnotationTailnetTargetFQDN, fqdn)) violations = append(violations, fmt.Sprintf("invalid value of annotation %s: %q does not appear to be a valid MagicDNS name", AnnotationTailnetTargetFQDN, fqdn))
} }
} }
// TODO(irbekrm): validate that tailscale.com/tailnet-ip annotation is a
// valid IP address (tailscale/tailscale#13671).
svcName := nameForService(svc) svcName := nameForService(svc)
if err := dnsname.ValidLabel(svcName); err != nil { if err := dnsname.ValidLabel(svcName); err != nil {
if _, ok := svc.Annotations[AnnotationHostname]; ok { if _, ok := svc.Annotations[AnnotationHostname]; ok {

@ -639,6 +639,14 @@ func removeHashAnnotation(sts *appsv1.StatefulSet) {
delete(sts.Spec.Template.Annotations, podAnnotationLastSetConfigFileHash) delete(sts.Spec.Template.Annotations, podAnnotationLastSetConfigFileHash)
} }
func removeTargetPortsFromSvc(svc *corev1.Service) {
newPorts := make([]corev1.ServicePort, 0)
for _, p := range svc.Spec.Ports {
newPorts = append(newPorts, corev1.ServicePort{Protocol: p.Protocol, Port: p.Port})
}
svc.Spec.Ports = newPorts
}
func removeAuthKeyIfExistsModifier(t *testing.T) func(s *corev1.Secret) { func removeAuthKeyIfExistsModifier(t *testing.T) func(s *corev1.Secret) {
return func(secret *corev1.Secret) { return func(secret *corev1.Secret) {
t.Helper() t.Helper()

@ -174,7 +174,7 @@ require (
dario.cat/mergo v1.0.0 // indirect dario.cat/mergo v1.0.0 // indirect
filippo.io/edwards25519 v1.1.0 // indirect filippo.io/edwards25519 v1.1.0 // indirect
github.com/Abirdcfly/dupword v0.0.14 // indirect github.com/Abirdcfly/dupword v0.0.14 // indirect
github.com/AlekSi/pointer v1.2.0 // indirect github.com/AlekSi/pointer v1.2.0
github.com/Antonboom/errname v0.1.12 // indirect github.com/Antonboom/errname v0.1.12 // indirect
github.com/Antonboom/nilnil v0.1.7 // indirect github.com/Antonboom/nilnil v0.1.7 // indirect
github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c // indirect github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c // indirect

@ -467,21 +467,6 @@ _Appears in:_
| `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.3/#condition-v1-meta) array_ | List of status conditions to indicate the status of the ProxyClass.<br />Known condition types are `ProxyClassReady`. | | | | `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.3/#condition-v1-meta) array_ | List of status conditions to indicate the status of the ProxyClass.<br />Known condition types are `ProxyClassReady`. | | |
#### ProxyClassType
_Underlying type:_ _string_
_Validation:_
- Enum: [egress]
- Type: string
_Appears in:_
- [ProxyGroupSpec](#proxygroupspec)
#### ProxyGroup #### ProxyGroup
@ -537,7 +522,7 @@ _Appears in:_
| Field | Description | Default | Validation | | Field | Description | Default | Validation |
| --- | --- | --- | --- | | --- | --- | --- | --- |
| `type` _[ProxyClassType](#proxyclasstype)_ | Type of the ProxyGroup, either ingress or egress. Each set of proxies<br />managed by a single ProxyGroup definition operate as only ingress or<br />only egress proxies. | | Enum: [egress] <br />Type: string <br /> | | `type` _[ProxyGroupType](#proxygrouptype)_ | Type of the ProxyGroup, either ingress or egress. Each set of proxies<br />managed by a single ProxyGroup definition operate as only ingress or<br />only egress proxies. | | Enum: [egress] <br />Type: string <br /> |
| `tags` _[Tags](#tags)_ | Tags that the Tailscale devices will be tagged with. Defaults to [tag:k8s].<br />If you specify custom tags here, make sure you also make the operator<br />an owner of these tags.<br />See https://tailscale.com/kb/1236/kubernetes-operator/#setting-up-the-kubernetes-operator.<br />Tags cannot be changed once a ProxyGroup device has been created.<br />Tag values must be in form ^tag:[a-zA-Z][a-zA-Z0-9-]*$. | | Pattern: `^tag:[a-zA-Z][a-zA-Z0-9-]*$` <br />Type: string <br /> | | `tags` _[Tags](#tags)_ | Tags that the Tailscale devices will be tagged with. Defaults to [tag:k8s].<br />If you specify custom tags here, make sure you also make the operator<br />an owner of these tags.<br />See https://tailscale.com/kb/1236/kubernetes-operator/#setting-up-the-kubernetes-operator.<br />Tags cannot be changed once a ProxyGroup device has been created.<br />Tag values must be in form ^tag:[a-zA-Z][a-zA-Z0-9-]*$. | | Pattern: `^tag:[a-zA-Z][a-zA-Z0-9-]*$` <br />Type: string <br /> |
| `replicas` _integer_ | Replicas specifies how many replicas to create the StatefulSet with.<br />Defaults to 2. | | | | `replicas` _integer_ | Replicas specifies how many replicas to create the StatefulSet with.<br />Defaults to 2. | | |
| `hostnamePrefix` _[HostnamePrefix](#hostnameprefix)_ | HostnamePrefix is the hostname prefix to use for tailnet devices created<br />by the ProxyGroup. Each device will have the integer number from its<br />StatefulSet pod appended to this prefix to form the full hostname.<br />HostnamePrefix can contain lower case letters, numbers and dashes, it<br />must not start with a dash and must be between 1 and 62 characters long. | | Pattern: `^[a-z0-9][a-z0-9-]{0,61}$` <br />Type: string <br /> | | `hostnamePrefix` _[HostnamePrefix](#hostnameprefix)_ | HostnamePrefix is the hostname prefix to use for tailnet devices created<br />by the ProxyGroup. Each device will have the integer number from its<br />StatefulSet pod appended to this prefix to form the full hostname.<br />HostnamePrefix can contain lower case letters, numbers and dashes, it<br />must not start with a dash and must be between 1 and 62 characters long. | | Pattern: `^[a-z0-9][a-z0-9-]{0,61}$` <br />Type: string <br /> |
@ -561,6 +546,21 @@ _Appears in:_
| `devices` _[TailnetDevice](#tailnetdevice) array_ | List of tailnet devices associated with the ProxyGroup StatefulSet. | | | | `devices` _[TailnetDevice](#tailnetdevice) array_ | List of tailnet devices associated with the ProxyGroup StatefulSet. | | |
#### ProxyGroupType
_Underlying type:_ _string_
_Validation:_
- Enum: [egress]
- Type: string
_Appears in:_
- [ProxyGroupSpec](#proxygroupspec)
#### Recorder #### Recorder

@ -58,6 +58,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&DNSConfigList{}, &DNSConfigList{},
&Recorder{}, &Recorder{},
&RecorderList{}, &RecorderList{},
&ProxyGroup{},
&ProxyGroupList{},
) )
metav1.AddToGroupVersion(scheme, SchemeGroupVersion) metav1.AddToGroupVersion(scheme, SchemeGroupVersion)

@ -172,6 +172,14 @@ type ConditionType string
const ( const (
ConnectorReady ConditionType = `ConnectorReady` ConnectorReady ConditionType = `ConnectorReady`
ProxyClassready ConditionType = `ProxyClassReady` ProxyClassready ConditionType = `ProxyClassReady`
ProxyGroupReady ConditionType = `ProxyGroupReady`
ProxyReady ConditionType = `TailscaleProxyReady` // a Tailscale-specific condition type for corev1.Service ProxyReady ConditionType = `TailscaleProxyReady` // a Tailscale-specific condition type for corev1.Service
RecorderReady ConditionType = `RecorderReady` RecorderReady ConditionType = `RecorderReady`
// EgressSvcValid is set to true if the user configured ExternalName Service for exposing a tailnet target on
// ProxyGroup nodes is valid.
EgressSvcValid ConditionType = `EgressSvcValid`
// EgressSvcConfigured is set to true if the configuration for the egress Service (proxy ConfigMap update,
// EndpointSlice for the Service) has been successfully applied. The Reason for this condition
// contains the name of the ProxyGroup and the hash of the Service ports and the tailnet target.
EgressSvcConfigured ConditionType = `EgressSvcConfigured`
) )

@ -40,7 +40,7 @@ type ProxyGroupSpec struct {
// Type of the ProxyGroup, either ingress or egress. Each set of proxies // Type of the ProxyGroup, either ingress or egress. Each set of proxies
// managed by a single ProxyGroup definition operate as only ingress or // managed by a single ProxyGroup definition operate as only ingress or
// only egress proxies. // only egress proxies.
Type ProxyClassType `json:"type"` Type ProxyGroupType `json:"type"`
// Tags that the Tailscale devices will be tagged with. Defaults to [tag:k8s]. // Tags that the Tailscale devices will be tagged with. Defaults to [tag:k8s].
// If you specify custom tags here, make sure you also make the operator // If you specify custom tags here, make sure you also make the operator
@ -101,10 +101,10 @@ type TailnetDevice struct {
// +kubebuilder:validation:Type=string // +kubebuilder:validation:Type=string
// +kubebuilder:validation:Enum=egress // +kubebuilder:validation:Enum=egress
type ProxyClassType string type ProxyGroupType string
const ( const (
ProxyClassTypeEgress ProxyClassType = "egress" ProxyGroupTypeEgress ProxyGroupType = "egress"
) )
// +kubebuilder:validation:Type=string // +kubebuilder:validation:Type=string

@ -56,6 +56,18 @@ func SetServiceCondition(svc *corev1.Service, conditionType tsapi.ConditionType,
svc.Status.Conditions = conds svc.Status.Conditions = conds
} }
// GetServiceCondition returns Service condition with the specified type, if it exists on the Service.
func GetServiceCondition(svc *corev1.Service, conditionType tsapi.ConditionType) *metav1.Condition {
idx := xslices.IndexFunc(svc.Status.Conditions, func(cond metav1.Condition) bool {
return cond.Type == string(conditionType)
})
if idx == -1 {
return nil
}
return &svc.Status.Conditions[idx]
}
// RemoveServiceCondition will remove condition of the given type if it exists. // RemoveServiceCondition will remove condition of the given type if it exists.
func RemoveServiceCondition(svc *corev1.Service, conditionType tsapi.ConditionType) { func RemoveServiceCondition(svc *corev1.Service, conditionType tsapi.ConditionType) {
svc.Status.Conditions = slices.DeleteFunc(svc.Status.Conditions, func(cond metav1.Condition) bool { svc.Status.Conditions = slices.DeleteFunc(svc.Status.Conditions, func(cond metav1.Condition) bool {
@ -63,6 +75,16 @@ func RemoveServiceCondition(svc *corev1.Service, conditionType tsapi.ConditionTy
}) })
} }
func EgressServiceIsValidAndConfigured(svc *corev1.Service) bool {
for _, typ := range []tsapi.ConditionType{tsapi.EgressSvcValid, tsapi.EgressSvcConfigured} {
cond := GetServiceCondition(svc, typ)
if cond == nil || cond.Status != metav1.ConditionTrue {
return false
}
}
return true
}
// SetRecorderCondition ensures that Recorder status has a condition with the // SetRecorderCondition ensures that Recorder status has a condition with the
// given attributes. LastTransitionTime gets set every time condition's status // given attributes. LastTransitionTime gets set every time condition's status
// changes. // changes.
@ -116,6 +138,17 @@ func ProxyClassIsReady(pc *tsapi.ProxyClass) bool {
return cond.Status == metav1.ConditionTrue && cond.ObservedGeneration == pc.Generation return cond.Status == metav1.ConditionTrue && cond.ObservedGeneration == pc.Generation
} }
func ProxyGroupIsReady(pg *tsapi.ProxyGroup) bool {
idx := xslices.IndexFunc(pg.Status.Conditions, func(cond metav1.Condition) bool {
return cond.Type == string(tsapi.ProxyGroupReady)
})
if idx == -1 {
return false
}
cond := pg.Status.Conditions[idx]
return cond.Status == metav1.ConditionTrue && cond.ObservedGeneration == pg.Generation
}
func DNSCfgIsReady(cfg *tsapi.DNSConfig) bool { func DNSCfgIsReady(cfg *tsapi.DNSConfig) bool {
idx := xslices.IndexFunc(cfg.Status.Conditions, func(cond metav1.Condition) bool { idx := xslices.IndexFunc(cfg.Status.Conditions, func(cond metav1.Condition) bool {
return cond.Type == string(tsapi.NameserverReady) return cond.Type == string(tsapi.NameserverReady)

@ -64,17 +64,17 @@ func (pm *PortMap) UnmarshalText(t []byte) error {
if len(ss) != 3 { if len(ss) != 3 {
return fmt.Errorf("error unmarshalling portmap from JSON, wants a portmap in form <protocol>:<matchPort>:<targetPor>, got %q", tt) return fmt.Errorf("error unmarshalling portmap from JSON, wants a portmap in form <protocol>:<matchPort>:<targetPor>, got %q", tt)
} }
(*pm).Protocol = ss[0] pm.Protocol = ss[0]
matchPort, err := strconv.ParseUint(ss[1], 10, 16) matchPort, err := strconv.ParseUint(ss[1], 10, 16)
if err != nil { if err != nil {
return fmt.Errorf("error converting match port %q to uint16: %w", ss[1], err) return fmt.Errorf("error converting match port %q to uint16: %w", ss[1], err)
} }
(*pm).MatchPort = uint16(matchPort) pm.MatchPort = uint16(matchPort)
targetPort, err := strconv.ParseUint(ss[2], 10, 16) targetPort, err := strconv.ParseUint(ss[2], 10, 16)
if err != nil { if err != nil {
return fmt.Errorf("error converting target port %q to uint16: %w", ss[2], err) return fmt.Errorf("error converting target port %q to uint16: %w", ss[2], err)
} }
(*pm).TargetPort = uint16(targetPort) pm.TargetPort = uint16(targetPort)
return nil return nil
} }

@ -21,4 +21,5 @@ const (
MetricConnectorWithExitNodeCount = "k8s_connector_exitnode_resources" MetricConnectorWithExitNodeCount = "k8s_connector_exitnode_resources"
MetricNameserverCount = "k8s_nameserver_resources" MetricNameserverCount = "k8s_nameserver_resources"
MetricRecorderCount = "k8s_recorder_resources" MetricRecorderCount = "k8s_recorder_resources"
MetricEgressServiceCount = "k8s_egress_service_resources"
) )

Loading…
Cancel
Save