mirror of https://github.com/tailscale/tailscale/
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
450 lines
14 KiB
Go
450 lines
14 KiB
Go
2 months ago
|
// Copyright (c) Tailscale Inc & AUTHORS
|
||
|
// SPDX-License-Identifier: BSD-3-Clause
|
||
|
|
||
|
package rsop
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"slices"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"tailscale.com/util/syspolicy/internal/loggerx"
|
||
|
"tailscale.com/util/syspolicy/setting"
|
||
|
|
||
|
"tailscale.com/util/syspolicy/source"
|
||
|
)
|
||
|
|
||
|
// ErrPolicyClosed is returned by [Policy.Reload], [Policy.addSource],
|
||
|
// [Policy.removeSource] and [Policy.replaceSource] if the policy has been closed.
|
||
|
var ErrPolicyClosed = errors.New("effective policy closed")
|
||
|
|
||
|
// The minimum and maximum wait times after detecting a policy change
|
||
|
// before reloading the policy. This only affects policy reloads triggered
|
||
|
// by a change in the underlying [source.Store] and does not impact
|
||
|
// synchronous, caller-initiated reloads, such as when [Policy.Reload] is called.
|
||
|
//
|
||
|
// Policy changes occurring within [policyReloadMinDelay] of each other
|
||
|
// will be batched together, resulting in a single policy reload
|
||
|
// no later than [policyReloadMaxDelay] after the first detected change.
|
||
|
// In other words, the effective policy will be reloaded no more often than once
|
||
|
// every 5 seconds, but at most 15 seconds after an underlying [source.Store]
|
||
|
// has issued a policy change callback.
|
||
|
//
|
||
|
// See [Policy.watchReload].
|
||
|
var (
|
||
|
policyReloadMinDelay = 5 * time.Second
|
||
|
policyReloadMaxDelay = 15 * time.Second
|
||
|
)
|
||
|
|
||
|
// Policy provides access to the current effective [setting.Snapshot] for a given
|
||
|
// scope and allows to reload it from the underlying [source.Store] list. It also allows to
|
||
|
// subscribe and receive a callback whenever the effective [setting.Snapshot] is changed.
|
||
|
//
|
||
|
// It is safe for concurrent use.
|
||
|
type Policy struct {
|
||
|
scope setting.PolicyScope
|
||
|
|
||
|
reloadCh chan reloadRequest // 1-buffered; written to when a policy reload is required
|
||
|
closeCh chan struct{} // closed to signal that the Policy is being closed
|
||
|
doneCh chan struct{} // closed by [Policy.closeInternal]
|
||
|
|
||
|
// effective is the most recent version of the [setting.Snapshot]
|
||
|
// containing policy settings merged from all applicable sources.
|
||
|
effective atomic.Pointer[setting.Snapshot]
|
||
|
|
||
|
changeCallbacks policyChangeCallbacks
|
||
|
|
||
|
mu sync.Mutex
|
||
|
watcherStarted bool // whether [Policy.watchReload] was started
|
||
|
sources source.ReadableSources
|
||
|
closing bool // whether [Policy.Close] was called (even if we're still closing)
|
||
|
}
|
||
|
|
||
|
// newPolicy returns a new [Policy] for the specified [setting.PolicyScope]
|
||
|
// that tracks changes and merges policy settings read from the specified sources.
|
||
|
func newPolicy(scope setting.PolicyScope, sources ...*source.Source) (_ *Policy, err error) {
|
||
|
readableSources := make(source.ReadableSources, 0, len(sources))
|
||
|
defer func() {
|
||
|
if err != nil {
|
||
|
readableSources.Close()
|
||
|
}
|
||
|
}()
|
||
|
for _, s := range sources {
|
||
|
reader, err := s.Reader()
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("failed to get a store reader: %w", err)
|
||
|
}
|
||
|
session, err := reader.OpenSession()
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("failed to open a reading session: %w", err)
|
||
|
}
|
||
|
readableSources = append(readableSources, source.ReadableSource{Source: s, ReadingSession: session})
|
||
|
}
|
||
|
|
||
|
// Sort policy sources by their precedence from lower to higher.
|
||
|
// For example, {UserPolicy},{ProfilePolicy},{DevicePolicy}.
|
||
|
readableSources.StableSort()
|
||
|
|
||
|
p := &Policy{
|
||
|
scope: scope,
|
||
|
sources: readableSources,
|
||
|
reloadCh: make(chan reloadRequest, 1),
|
||
|
closeCh: make(chan struct{}),
|
||
|
doneCh: make(chan struct{}),
|
||
|
}
|
||
|
if _, err := p.reloadNow(false); err != nil {
|
||
|
p.Close()
|
||
|
return nil, err
|
||
|
}
|
||
|
p.startWatchReloadIfNeeded()
|
||
|
return p, nil
|
||
|
}
|
||
|
|
||
|
// IsValid reports whether p is in a valid state and has not been closed.
|
||
|
//
|
||
|
// Since p's state can be changed by other goroutines at any time, this should
|
||
|
// only be used as an optimization.
|
||
|
func (p *Policy) IsValid() bool {
|
||
|
select {
|
||
|
case <-p.closeCh:
|
||
|
return false
|
||
|
default:
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Scope returns the [setting.PolicyScope] that this policy applies to.
|
||
|
func (p *Policy) Scope() setting.PolicyScope {
|
||
|
return p.scope
|
||
|
}
|
||
|
|
||
|
// Get returns the effective [setting.Snapshot].
|
||
|
func (p *Policy) Get() *setting.Snapshot {
|
||
|
return p.effective.Load()
|
||
|
}
|
||
|
|
||
|
// RegisterChangeCallback adds a function to be called whenever the effective
|
||
|
// policy changes. The returned function can be used to unregister the callback.
|
||
|
func (p *Policy) RegisterChangeCallback(callback PolicyChangeCallback) (unregister func()) {
|
||
|
return p.changeCallbacks.Register(callback)
|
||
|
}
|
||
|
|
||
|
// Reload synchronously re-reads policy settings from the underlying list of policy sources,
|
||
|
// constructing a new merged [setting.Snapshot] even if the policy remains unchanged.
|
||
|
// In most scenarios, there's no need to re-read the policy manually.
|
||
|
// Instead, it is recommended to register a policy change callback, or to use
|
||
|
// the most recent [setting.Snapshot] returned by the [Policy.Get] method.
|
||
|
//
|
||
|
// It must not be called with p.mu held.
|
||
|
func (p *Policy) Reload() (*setting.Snapshot, error) {
|
||
|
return p.reload(true)
|
||
|
}
|
||
|
|
||
|
// reload is like Reload, but allows to specify whether to re-read policy settings
|
||
|
// from unchanged policy sources.
|
||
|
//
|
||
|
// It must not be called with p.mu held.
|
||
|
func (p *Policy) reload(force bool) (*setting.Snapshot, error) {
|
||
|
if !p.startWatchReloadIfNeeded() {
|
||
|
return p.Get(), nil
|
||
|
}
|
||
|
|
||
|
respCh := make(chan reloadResponse, 1)
|
||
|
select {
|
||
|
case p.reloadCh <- reloadRequest{force: force, respCh: respCh}:
|
||
|
// continue
|
||
|
case <-p.closeCh:
|
||
|
return nil, ErrPolicyClosed
|
||
|
}
|
||
|
select {
|
||
|
case resp := <-respCh:
|
||
|
return resp.policy, resp.err
|
||
|
case <-p.closeCh:
|
||
|
return nil, ErrPolicyClosed
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// reloadAsync requests an asynchronous background policy reload.
|
||
|
// The policy will be reloaded no later than in [policyReloadMaxDelay].
|
||
|
//
|
||
|
// It must not be called with p.mu held.
|
||
|
func (p *Policy) reloadAsync() {
|
||
|
if !p.startWatchReloadIfNeeded() {
|
||
|
return
|
||
|
}
|
||
|
select {
|
||
|
case p.reloadCh <- reloadRequest{}:
|
||
|
// Sent.
|
||
|
default:
|
||
|
// A reload request is already en route.
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// reloadNow loads and merges policies from all sources, updating the effective policy.
|
||
|
// If the force parameter is true, it forcibly reloads policies
|
||
|
// from the underlying policy store, even if no policy changes were detected.
|
||
|
//
|
||
|
// Except for the initial policy reload during the [Policy] creation,
|
||
|
// this method should only be called from the [Policy.watchReload] goroutine.
|
||
|
func (p *Policy) reloadNow(force bool) (*setting.Snapshot, error) {
|
||
|
new, err := p.readAndMerge(force)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
old := p.effective.Swap(new)
|
||
|
// A nil old value indicates the initial policy load rather than a policy change.
|
||
|
// Additionally, we should not invoke the policy change callbacks unless the
|
||
|
// policy items have actually changed.
|
||
|
if old != nil && !old.EqualItems(new) {
|
||
|
snapshots := Change[*setting.Snapshot]{New: new, Old: old}
|
||
|
p.changeCallbacks.Invoke(snapshots)
|
||
|
}
|
||
|
return new, nil
|
||
|
}
|
||
|
|
||
|
// Done returns a channel that is closed when the [Policy] is closed.
|
||
|
func (p *Policy) Done() <-chan struct{} {
|
||
|
return p.doneCh
|
||
|
}
|
||
|
|
||
|
// readAndMerge reads and merges policy settings from all applicable sources,
|
||
|
// returning a [setting.Snapshot] with the merged result.
|
||
|
// If the force parameter is true, it re-reads policy settings from each source
|
||
|
// even if no policy change was observed, and returns an error if the read
|
||
|
// operation fails.
|
||
|
func (p *Policy) readAndMerge(force bool) (*setting.Snapshot, error) {
|
||
|
p.mu.Lock()
|
||
|
defer p.mu.Unlock()
|
||
|
// Start with an empty policy in the target scope.
|
||
|
effective := setting.NewSnapshot(nil, setting.SummaryWith(p.scope))
|
||
|
// Then merge policy settings from all sources.
|
||
|
// Policy sources with the highest precedence (e.g., the device policy) are merged last,
|
||
|
// overriding any conflicting policy settings with lower precedence.
|
||
|
for _, s := range p.sources {
|
||
|
var policy *setting.Snapshot
|
||
|
if force {
|
||
|
var err error
|
||
|
if policy, err = s.ReadSettings(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
} else {
|
||
|
policy = s.GetSettings()
|
||
|
}
|
||
|
effective = setting.MergeSnapshots(effective, policy)
|
||
|
}
|
||
|
return effective, nil
|
||
|
}
|
||
|
|
||
|
// addSource adds the specified source to the list of sources used by p,
|
||
|
// and triggers a synchronous policy refresh. It returns an error
|
||
|
// if the source is not a valid source for this effective policy,
|
||
|
// or if the effective policy is being closed,
|
||
|
// or if policy refresh fails with an error.
|
||
|
func (p *Policy) addSource(source *source.Source) error {
|
||
|
return p.applySourcesChange(source, nil)
|
||
|
}
|
||
|
|
||
|
// removeSource removes the specified source from the list of sources used by p,
|
||
|
// and triggers a synchronous policy refresh. It returns an error if the
|
||
|
// effective policy is being closed, or if policy refresh fails with an error.
|
||
|
func (p *Policy) removeSource(source *source.Source) error {
|
||
|
return p.applySourcesChange(nil, source)
|
||
|
}
|
||
|
|
||
|
// replaceSource replaces the old source with the new source atomically,
|
||
|
// and triggers a synchronous policy refresh. It returns an error
|
||
|
// if the source is not a valid source for this effective policy,
|
||
|
// or if the effective policy is being closed,
|
||
|
// or if policy refresh fails with an error.
|
||
|
func (p *Policy) replaceSource(old, new *source.Source) error {
|
||
|
return p.applySourcesChange(new, old)
|
||
|
}
|
||
|
|
||
|
func (p *Policy) applySourcesChange(toAdd, toRemove *source.Source) error {
|
||
|
if toAdd == toRemove {
|
||
|
return nil
|
||
|
}
|
||
|
if toAdd != nil && !toAdd.Scope().Contains(p.scope) {
|
||
|
return errors.New("scope mismatch")
|
||
|
}
|
||
|
|
||
|
changed, err := func() (changed bool, err error) {
|
||
|
p.mu.Lock()
|
||
|
defer p.mu.Unlock()
|
||
|
if toAdd != nil && !p.sources.Contains(toAdd) {
|
||
|
reader, err := toAdd.Reader()
|
||
|
if err != nil {
|
||
|
return false, fmt.Errorf("failed to get a store reader: %w", err)
|
||
|
}
|
||
|
session, err := reader.OpenSession()
|
||
|
if err != nil {
|
||
|
return false, fmt.Errorf("failed to open a reading session: %w", err)
|
||
|
}
|
||
|
|
||
|
addAt := p.sources.InsertionIndexOf(toAdd)
|
||
|
toAdd := source.ReadableSource{
|
||
|
Source: toAdd,
|
||
|
ReadingSession: session,
|
||
|
}
|
||
|
p.sources = slices.Insert(p.sources, addAt, toAdd)
|
||
|
go p.watchPolicyChanges(toAdd)
|
||
|
changed = true
|
||
|
}
|
||
|
if toRemove != nil {
|
||
|
if deleteAt := p.sources.IndexOf(toRemove); deleteAt != -1 {
|
||
|
p.sources.DeleteAt(deleteAt)
|
||
|
changed = true
|
||
|
}
|
||
|
}
|
||
|
return changed, nil
|
||
|
}()
|
||
|
if changed {
|
||
|
_, err = p.reload(false)
|
||
|
}
|
||
|
return err // may be nil or non-nil
|
||
|
}
|
||
|
|
||
|
func (p *Policy) watchPolicyChanges(s source.ReadableSource) {
|
||
|
for {
|
||
|
select {
|
||
|
case _, ok := <-s.ReadingSession.PolicyChanged():
|
||
|
if !ok {
|
||
|
p.mu.Lock()
|
||
|
abruptlyClosed := slices.Contains(p.sources, s)
|
||
|
p.mu.Unlock()
|
||
|
if abruptlyClosed {
|
||
|
// The underlying [source.Source] was closed abruptly without
|
||
|
// being properly removed or replaced by another policy source.
|
||
|
// We can't keep this [Policy] up to date, so we should close it.
|
||
|
p.Close()
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
// The PolicyChanged channel was signaled.
|
||
|
// Request an asynchronous policy reload.
|
||
|
p.reloadAsync()
|
||
|
case <-p.closeCh:
|
||
|
// The [Policy] is being closed.
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// startWatchReloadIfNeeded starts [Policy.watchReload] in a new goroutine
|
||
|
// if the list of policy sources is not empty, it hasn't been started yet,
|
||
|
// and the [Policy] is not being closed.
|
||
|
// It reports whether [Policy.watchReload] has ever been started.
|
||
|
//
|
||
|
// It must not be called with p.mu held.
|
||
|
func (p *Policy) startWatchReloadIfNeeded() bool {
|
||
|
p.mu.Lock()
|
||
|
defer p.mu.Unlock()
|
||
|
if len(p.sources) != 0 && !p.watcherStarted && !p.closing {
|
||
|
go p.watchReload()
|
||
|
for i := range p.sources {
|
||
|
go p.watchPolicyChanges(p.sources[i])
|
||
|
}
|
||
|
p.watcherStarted = true
|
||
|
}
|
||
|
return p.watcherStarted
|
||
|
}
|
||
|
|
||
|
// reloadRequest describes a policy reload request.
|
||
|
type reloadRequest struct {
|
||
|
// force policy reload regardless of whether a policy change was detected.
|
||
|
force bool
|
||
|
// respCh is an optional channel. If non-nil, it makes the reload request
|
||
|
// synchronous and receives the result.
|
||
|
respCh chan<- reloadResponse
|
||
|
}
|
||
|
|
||
|
// reloadResponse is a result of a synchronous policy reload.
|
||
|
type reloadResponse struct {
|
||
|
policy *setting.Snapshot
|
||
|
err error
|
||
|
}
|
||
|
|
||
|
// watchReload processes incoming synchronous and asynchronous policy reload requests.
|
||
|
//
|
||
|
// Synchronous requests (with a non-nil respCh) are served immediately.
|
||
|
//
|
||
|
// Asynchronous requests are debounced and throttled: they are executed at least
|
||
|
// [policyReloadMinDelay] after the last request, but no later than [policyReloadMaxDelay]
|
||
|
// after the first request in a batch.
|
||
|
func (p *Policy) watchReload() {
|
||
|
defer p.closeInternal()
|
||
|
|
||
|
force := false // whether a forced refresh was requested
|
||
|
var delayCh, timeoutCh <-chan time.Time
|
||
|
reload := func(respCh chan<- reloadResponse) {
|
||
|
delayCh, timeoutCh = nil, nil
|
||
|
policy, err := p.reloadNow(force)
|
||
|
if err != nil {
|
||
|
loggerx.Errorf("%v policy reload failed: %v\n", p.scope, err)
|
||
|
}
|
||
|
if respCh != nil {
|
||
|
respCh <- reloadResponse{policy: policy, err: err}
|
||
|
}
|
||
|
force = false
|
||
|
}
|
||
|
|
||
|
loop:
|
||
|
for {
|
||
|
select {
|
||
|
case req := <-p.reloadCh:
|
||
|
if req.force {
|
||
|
force = true
|
||
|
}
|
||
|
if req.respCh != nil {
|
||
|
reload(req.respCh)
|
||
|
continue
|
||
|
}
|
||
|
if delayCh == nil {
|
||
|
timeoutCh = time.After(policyReloadMinDelay)
|
||
|
}
|
||
|
delayCh = time.After(policyReloadMaxDelay)
|
||
|
case <-delayCh:
|
||
|
reload(nil)
|
||
|
case <-timeoutCh:
|
||
|
reload(nil)
|
||
|
case <-p.closeCh:
|
||
|
break loop
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (p *Policy) closeInternal() {
|
||
|
p.mu.Lock()
|
||
|
defer p.mu.Unlock()
|
||
|
p.sources.Close()
|
||
|
p.changeCallbacks.Close()
|
||
|
close(p.doneCh)
|
||
|
deletePolicy(p)
|
||
|
}
|
||
|
|
||
|
// Close initiates the closing of the policy.
|
||
|
// The [Policy.Done] channel is closed to signal that the operation has been completed.
|
||
|
func (p *Policy) Close() {
|
||
|
p.mu.Lock()
|
||
|
alreadyClosing := p.closing
|
||
|
watcherStarted := p.watcherStarted
|
||
|
p.closing = true
|
||
|
p.mu.Unlock()
|
||
|
|
||
|
if alreadyClosing {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
close(p.closeCh)
|
||
|
if !watcherStarted {
|
||
|
// Normally, closing p.closeCh signals [Policy.watchReload] to exit,
|
||
|
// and [Policy.closeInternal] performs the actual closing when
|
||
|
// [Policy.watchReload] returns. However, if the watcher was never
|
||
|
// started, we need to call [Policy.closeInternal] manually.
|
||
|
go p.closeInternal()
|
||
|
}
|
||
|
}
|