cmd/k8s-operator: refactor reconcile loop, un-plumbing reconcile.Result.

We used to need to do timed requeues in a few places in the reconcile logic,
and the easiest way to do that was to plumb reconcile.Result return values
around. But now we're purely event-driven, so the only thing we care about
is whether or not an error occurred.

Incidentally also fix a very minor bug where headless services would get
completely ignored, rather than reconciled into the correct state. This
shouldn't matter in practice because you can't transition from a headful
to a headless service without a deletion, but for consistency let's avoid
having a path that takes no definite action if a service of interest does
exist.

Updates #502.

Signed-off-by: David Anderson <danderson@tailscale.com>
pull/6746/head
David Anderson 2 years ago committed by Dave Anderson
parent da53b1347b
commit a7ab3429b6

@ -220,15 +220,37 @@ func childResourceLabels(parent *corev1.Service) map[string]string {
} }
} }
// cleanupIfRequired removes any existing resources related to svc. func (a *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
logger := a.logger.With("service-ns", req.Namespace, "service-name", req.Name)
logger.Debugf("starting reconcile")
defer logger.Debugf("reconcile finished")
svc := new(corev1.Service)
err = a.Get(ctx, req.NamespacedName, svc)
if apierrors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
logger.Debugf("service not found, assuming it was deleted")
return reconcile.Result{}, nil
} else if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to get svc: %w", err)
}
if !svc.DeletionTimestamp.IsZero() || !a.shouldExpose(svc) {
logger.Debugf("service is being deleted or should not be exposed, cleaning up")
return reconcile.Result{}, a.maybeCleanup(ctx, logger, svc)
}
return reconcile.Result{}, a.maybeProvision(ctx, logger, svc)
}
// maybeCleanup removes any existing resources related to serving svc over tailscale.
// //
// This function is responsible for removing the finalizer from the service, // This function is responsible for removing the finalizer from the service,
// once all associated resources are gone. // once all associated resources are gone.
func (a *ServiceReconciler) cleanupIfRequired(ctx context.Context, logger *zap.SugaredLogger, svc *corev1.Service) (reconcile.Result, error) { func (a *ServiceReconciler) maybeCleanup(ctx context.Context, logger *zap.SugaredLogger, svc *corev1.Service) error {
ix := slices.Index(svc.Finalizers, FinalizerName) ix := slices.Index(svc.Finalizers, FinalizerName)
if ix < 0 { if ix < 0 {
logger.Debugf("no finalizer, nothing to do") logger.Debugf("no finalizer, nothing to do")
return reconcile.Result{}, nil return nil
} }
ml := childResourceLabels(svc) ml := childResourceLabels(svc)
@ -241,32 +263,32 @@ func (a *ServiceReconciler) cleanupIfRequired(ctx context.Context, logger *zap.S
// should be removed. // should be removed.
sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, ml) sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, ml)
if err != nil { if err != nil {
return reconcile.Result{}, fmt.Errorf("getting statefulset: %w", err) return fmt.Errorf("getting statefulset: %w", err)
} }
if sts != nil { if sts != nil {
if !sts.GetDeletionTimestamp().IsZero() { if !sts.GetDeletionTimestamp().IsZero() {
// Deletion in progress, check again later. We'll get another // Deletion in progress, check again later. We'll get another
// notification when the deletion is complete. // notification when the deletion is complete.
logger.Debugf("waiting for statefulset %s/%s deletion", sts.GetNamespace(), sts.GetName()) logger.Debugf("waiting for statefulset %s/%s deletion", sts.GetNamespace(), sts.GetName())
return reconcile.Result{}, nil return nil
} }
err := a.DeleteAllOf(ctx, &appsv1.StatefulSet{}, client.InNamespace(a.operatorNamespace), client.MatchingLabels(ml), client.PropagationPolicy(metav1.DeletePropagationForeground)) err := a.DeleteAllOf(ctx, &appsv1.StatefulSet{}, client.InNamespace(a.operatorNamespace), client.MatchingLabels(ml), client.PropagationPolicy(metav1.DeletePropagationForeground))
if err != nil { if err != nil {
return reconcile.Result{}, fmt.Errorf("deleting statefulset: %w", err) return fmt.Errorf("deleting statefulset: %w", err)
} }
logger.Debugf("started deletion of statefulset %s/%s", sts.GetNamespace(), sts.GetName()) logger.Debugf("started deletion of statefulset %s/%s", sts.GetNamespace(), sts.GetName())
return reconcile.Result{}, nil return nil
} }
id, _, err := a.getDeviceInfo(ctx, svc) id, _, err := a.getDeviceInfo(ctx, svc)
if err != nil { if err != nil {
return reconcile.Result{}, fmt.Errorf("getting device info: %w", err) return fmt.Errorf("getting device info: %w", err)
} }
if id != "" { if id != "" {
// TODO: handle case where the device is already deleted, but the secret // TODO: handle case where the device is already deleted, but the secret
// is still around. // is still around.
if err := a.tsClient.DeleteDevice(ctx, id); err != nil { if err := a.tsClient.DeleteDevice(ctx, id); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting device: %w", err) return fmt.Errorf("deleting device: %w", err)
} }
} }
@ -276,13 +298,13 @@ func (a *ServiceReconciler) cleanupIfRequired(ctx context.Context, logger *zap.S
} }
for _, typ := range types { for _, typ := range types {
if err := a.DeleteAllOf(ctx, typ, client.InNamespace(a.operatorNamespace), client.MatchingLabels(ml)); err != nil { if err := a.DeleteAllOf(ctx, typ, client.InNamespace(a.operatorNamespace), client.MatchingLabels(ml)); err != nil {
return reconcile.Result{}, err return err
} }
} }
svc.Finalizers = append(svc.Finalizers[:ix], svc.Finalizers[ix+1:]...) svc.Finalizers = append(svc.Finalizers[:ix], svc.Finalizers[ix+1:]...)
if err := a.Update(ctx, svc); err != nil { if err := a.Update(ctx, svc); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to remove finalizer: %w", err) return fmt.Errorf("failed to remove finalizer: %w", err)
} }
// Unlike most log entries in the reconcile loop, this will get printed // Unlike most log entries in the reconcile loop, this will get printed
@ -290,49 +312,15 @@ func (a *ServiceReconciler) cleanupIfRequired(ctx context.Context, logger *zap.S
// cleanup removes the tailscale finalizer, which will make all future // cleanup removes the tailscale finalizer, which will make all future
// reconciles exit early. // reconciles exit early.
logger.Infof("unexposed service from tailnet") logger.Infof("unexposed service from tailnet")
return reconcile.Result{}, nil return nil
}
func (a *ServiceReconciler) hasLoadBalancerClass(svc *corev1.Service) bool {
return svc != nil &&
svc.Spec.Type == corev1.ServiceTypeLoadBalancer &&
svc.Spec.LoadBalancerClass != nil &&
*svc.Spec.LoadBalancerClass == "tailscale"
}
func (a *ServiceReconciler) hasAnnotation(svc *corev1.Service) bool {
return svc != nil &&
svc.Annotations[AnnotationExpose] == "true"
}
func (a *ServiceReconciler) shouldExpose(svc *corev1.Service) bool {
return a.hasLoadBalancerClass(svc) || a.hasAnnotation(svc)
} }
func (a *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) { // maybeProvision ensures that svc is exposed over tailscale, taking any actions
logger := a.logger.With("service-ns", req.Namespace, "service-name", req.Name) // necessary to reach that state.
logger.Debugf("starting reconcile") //
defer logger.Debugf("reconcile finished") // This function adds a finalizer to svc, ensuring that we can handle orderly
// deprovisioning later.
svc := new(corev1.Service) func (a *ServiceReconciler) maybeProvision(ctx context.Context, logger *zap.SugaredLogger, svc *corev1.Service) error {
err = a.Get(ctx, req.NamespacedName, svc)
if err != nil {
if apierrors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
logger.Debugf("service not found, assuming it was deleted")
return reconcile.Result{}, nil
}
return reconcile.Result{}, fmt.Errorf("failed to get svc: %w", err)
}
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
logger.Debugf("service is headless, nothing to do")
return reconcile.Result{}, nil
}
if !svc.DeletionTimestamp.IsZero() || !a.shouldExpose(svc) {
logger.Debugf("service is being deleted or should not be exposed, cleaning up")
return a.cleanupIfRequired(ctx, logger, svc)
}
if !slices.Contains(svc.Finalizers, FinalizerName) { if !slices.Contains(svc.Finalizers, FinalizerName) {
// This log line is printed exactly once during initial provisioning, // This log line is printed exactly once during initial provisioning,
// because once the finalizer is in place this block gets skipped. So, // because once the finalizer is in place this block gets skipped. So,
@ -341,14 +329,14 @@ func (a *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request
logger.Infof("exposing service over tailscale") logger.Infof("exposing service over tailscale")
svc.Finalizers = append(svc.Finalizers, FinalizerName) svc.Finalizers = append(svc.Finalizers, FinalizerName)
if err := a.Update(ctx, svc); err != nil { if err := a.Update(ctx, svc); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to add finalizer: %w", err) return fmt.Errorf("failed to add finalizer: %w", err)
} }
} }
// Do full reconcile. // Do full reconcile.
hsvc, err := a.reconcileHeadlessService(ctx, logger, svc) hsvc, err := a.reconcileHeadlessService(ctx, logger, svc)
if err != nil { if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to reconcile headless service: %w", err) return fmt.Errorf("failed to reconcile headless service: %w", err)
} }
tags := a.defaultTags tags := a.defaultTags
@ -357,30 +345,30 @@ func (a *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request
} }
secretName, err := a.createOrGetSecret(ctx, logger, svc, hsvc, tags) secretName, err := a.createOrGetSecret(ctx, logger, svc, hsvc, tags)
if err != nil { if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to create or get API key secret: %w", err) return fmt.Errorf("failed to create or get API key secret: %w", err)
} }
_, err = a.reconcileSTS(ctx, logger, svc, hsvc, secretName) _, err = a.reconcileSTS(ctx, logger, svc, hsvc, secretName)
if err != nil { if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to reconcile statefulset: %w", err) return fmt.Errorf("failed to reconcile statefulset: %w", err)
} }
if !a.hasLoadBalancerClass(svc) { if !a.hasLoadBalancerClass(svc) {
logger.Debugf("service is not a LoadBalancer, so not updating ingress") logger.Debugf("service is not a LoadBalancer, so not updating ingress")
return reconcile.Result{}, nil return nil
} }
_, tsHost, err := a.getDeviceInfo(ctx, svc) _, tsHost, err := a.getDeviceInfo(ctx, svc)
if err != nil { if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to get device ID: %w", err) return fmt.Errorf("failed to get device ID: %w", err)
} }
if tsHost == "" { if tsHost == "" {
logger.Debugf("no Tailscale hostname known yet, waiting for proxy pod to finish auth") logger.Debugf("no Tailscale hostname known yet, waiting for proxy pod to finish auth")
// No hostname yet. Wait for the proxy pod to auth. // No hostname yet. Wait for the proxy pod to auth.
svc.Status.LoadBalancer.Ingress = nil svc.Status.LoadBalancer.Ingress = nil
if err := a.Status().Update(ctx, svc); err != nil { if err := a.Status().Update(ctx, svc); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update service status: %w", err) return fmt.Errorf("failed to update service status: %w", err)
} }
return reconcile.Result{}, nil return nil
} }
logger.Debugf("setting ingress hostname to %q", tsHost) logger.Debugf("setting ingress hostname to %q", tsHost)
@ -390,9 +378,31 @@ func (a *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request
}, },
} }
if err := a.Status().Update(ctx, svc); err != nil { if err := a.Status().Update(ctx, svc); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update service status: %w", err) return fmt.Errorf("failed to update service status: %w", err)
}
return nil
}
func (a *ServiceReconciler) shouldExpose(svc *corev1.Service) bool {
// Headless services can't be exposed, since there is no ClusterIP to
// forward to.
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
return false
} }
return reconcile.Result{}, nil
return a.hasLoadBalancerClass(svc) || a.hasAnnotation(svc)
}
func (a *ServiceReconciler) hasLoadBalancerClass(svc *corev1.Service) bool {
return svc != nil &&
svc.Spec.Type == corev1.ServiceTypeLoadBalancer &&
svc.Spec.LoadBalancerClass != nil &&
*svc.Spec.LoadBalancerClass == "tailscale"
}
func (a *ServiceReconciler) hasAnnotation(svc *corev1.Service) bool {
return svc != nil &&
svc.Annotations[AnnotationExpose] == "true"
} }
func (a *ServiceReconciler) reconcileHeadlessService(ctx context.Context, logger *zap.SugaredLogger, svc *corev1.Service) (*corev1.Service, error) { func (a *ServiceReconciler) reconcileHeadlessService(ctx context.Context, logger *zap.SugaredLogger, svc *corev1.Service) (*corev1.Service, error) {

Loading…
Cancel
Save