ipn/ipnlocal: listen to serve ports on netmap addrs (#6282)

Updates tailscale/corp#7515

Signed-off-by: Shayne Sweeney <shayne@tailscale.com>
pull/6294/head
shayne 2 years ago committed by GitHub
parent cbc89830c4
commit 74e892cbc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -203,6 +203,8 @@ type LocalBackend struct {
lastServeConfJSON mem.RO // last JSON that was parsed into serveConfig
serveConfig ipn.ServeConfigView // or !Valid if none
serveListeners map[netip.AddrPort]*serveListener // addrPort => serveListener
// statusLock must be held before calling statusChanged.Wait() or
// statusChanged.Broadcast().
statusLock sync.Mutex
@ -3449,58 +3451,24 @@ func (b *LocalBackend) setTCPPortsInterceptedFromNetmapAndPrefsLocked(prefs ipn.
}
}
if b.serveConfig.Valid() {
servePorts := make([]uint16, 0, 3)
b.serveConfig.TCP().Range(func(port uint16, _ ipn.TCPPortHandlerView) bool {
if port > 0 {
handlePorts = append(handlePorts, uint16(port))
servePorts = append(servePorts, uint16(port))
}
return true
})
handlePorts = append(handlePorts, servePorts...)
// don't listen on netmap addresses if we're in userspace mode
if !wgengine.IsNetstack(b.e) {
b.updateServeTCPPortNetMapAddrListenersLocked(servePorts)
}
}
}
}
b.setTCPPortsIntercepted(handlePorts)
}
// SetServeConfig establishes or replaces the current serve config.
func (b *LocalBackend) SetServeConfig(config *ipn.ServeConfig) error {
b.mu.Lock()
defer b.mu.Unlock()
nm := b.netMap
if nm == nil {
return errors.New("netMap is nil")
}
if nm.SelfNode == nil {
return errors.New("netMap SelfNode is nil")
}
profileID := b.pm.CurrentProfile().ID
confKey := ipn.ServeConfigKey(profileID)
var bs []byte
if config != nil {
j, err := json.Marshal(config)
if err != nil {
return fmt.Errorf("encoding serve config: %w", err)
}
bs = j
}
if err := b.store.WriteState(confKey, bs); err != nil {
return fmt.Errorf("writing ServeConfig to StateStore: %w", err)
}
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs())
return nil
}
// ServeConfig provides a view of the current serve mappings.
// If serving is not configured, the returned view is not Valid.
func (b *LocalBackend) ServeConfig() ipn.ServeConfigView {
b.mu.Lock()
defer b.mu.Unlock()
return b.serveConfig
}
// operatorUserName returns the current pref's OperatorUser's name, or the
// empty string if none.
func (b *LocalBackend) operatorUserName() string {

@ -7,6 +7,7 @@ package ipnlocal
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
@ -23,9 +24,14 @@ import (
"sync"
"time"
"golang.org/x/exp/slices"
"tailscale.com/ipn"
"tailscale.com/logtail/backoff"
"tailscale.com/net/netutil"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/types/logger"
"tailscale.com/util/mak"
"tailscale.com/util/strs"
)
@ -37,6 +43,173 @@ type serveHTTPContext struct {
DestPort uint16
}
// serveListener is the state of host-level net.Listen for a specific (Tailscale IP, serve port)
// combination. If there are two TailscaleIPs (v4 and v6) and three ports being served,
// then there will be six of these active and looping in their Run method.
//
// This is not used in userspace-networking mode.
//
// Most serve traffic is intercepted by netstack. This exists purely for connections
// from the machine itself, as that goes via the kernel, so we need to be in the
// kernel's listening/routing tables.
type serveListener struct {
b *LocalBackend
ap netip.AddrPort
ctx context.Context // valid while listener is desired
cancel context.CancelFunc // for ctx, to close listener
logf logger.Logf
bo *backoff.Backoff // for retrying failed Listen calls
closeListener syncs.AtomicValue[func() error] // Listener's Close method, if any
}
func (b *LocalBackend) newServeListener(ctx context.Context, ap netip.AddrPort, logf logger.Logf) *serveListener {
ctx, cancel := context.WithCancel(ctx)
return &serveListener{
b: b,
ap: ap,
ctx: ctx,
cancel: cancel,
logf: logf,
bo: backoff.NewBackoff("serve-listener", logf, 30*time.Second),
}
}
// Close cancels the context and closes the listener, if any.
func (s *serveListener) Close() error {
s.cancel()
if close, ok := s.closeListener.LoadOk(); ok {
s.closeListener.Store(nil)
close()
}
return nil
}
// Run starts a net.Listen for the serveListener's address and port.
// If unable to listen, it retries with exponential backoff.
// Listen is retried until the context is canceled.
func (s *serveListener) Run() {
for {
ln, err := net.Listen("tcp", s.ap.String())
if err != nil {
s.logf("serve failed to listen on %v, backing off: %v", s.ap, err)
s.bo.BackOff(s.ctx, err)
continue
}
s.closeListener.Store(ln.Close)
s.logf("serve listening on %v", s.ap)
err = s.handleServeListenersAccept(ln)
if s.ctx.Err() != nil {
// context canceled, we're done
return
}
if err != nil {
s.logf("serve listener accept error, retrying: %v", err)
}
}
}
// handleServeListenersAccept accepts connections for the Listener.
// Calls incoming handler in a new goroutine for each accepted connection.
func (s *serveListener) handleServeListenersAccept(ln net.Listener) error {
for {
conn, err := ln.Accept()
if err != nil {
return err
}
srcAddr := conn.RemoteAddr().(*net.TCPAddr).AddrPort()
getConn := func() (net.Conn, bool) { return conn, true }
sendRST := func() {
s.b.logf("serve RST for %v", srcAddr)
conn.Close()
}
go s.b.HandleInterceptedTCPConn(s.ap.Port(), srcAddr, getConn, sendRST)
}
}
// updateServeTCPPortNetMapAddrListenersLocked starts a net.Listen for configured
// Serve ports on all the node's addresses.
// Existing Listeners are closed if port no longer in incoming ports list.
//
// b.mu must be held.
func (b *LocalBackend) updateServeTCPPortNetMapAddrListenersLocked(ports []uint16) {
// close existing listeners where port
// is no longer in incoming ports list
for ap, sl := range b.serveListeners {
if !slices.Contains(ports, ap.Port()) {
b.logf("closing listener %v", ap)
sl.Close()
delete(b.serveListeners, ap)
}
}
nm := b.netMap
if nm == nil {
b.logf("netMap is nil")
return
}
if nm.SelfNode == nil {
b.logf("netMap SelfNode is nil")
return
}
for _, a := range nm.Addresses {
for _, p := range ports {
addrPort := netip.AddrPortFrom(a.Addr(), p)
if _, ok := b.serveListeners[addrPort]; ok {
continue // already listening
}
sl := b.newServeListener(context.Background(), addrPort, b.logf)
mak.Set(&b.serveListeners, addrPort, sl)
go sl.Run()
}
}
}
// SetServeConfig establishes or replaces the current serve config.
func (b *LocalBackend) SetServeConfig(config *ipn.ServeConfig) error {
b.mu.Lock()
defer b.mu.Unlock()
nm := b.netMap
if nm == nil {
return errors.New("netMap is nil")
}
if nm.SelfNode == nil {
return errors.New("netMap SelfNode is nil")
}
profileID := b.pm.CurrentProfile().ID
confKey := ipn.ServeConfigKey(profileID)
var bs []byte
if config != nil {
j, err := json.Marshal(config)
if err != nil {
return fmt.Errorf("encoding serve config: %w", err)
}
bs = j
}
if err := b.store.WriteState(confKey, bs); err != nil {
return fmt.Errorf("writing ServeConfig to StateStore: %w", err)
}
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs())
return nil
}
// ServeConfig provides a view of the current serve mappings.
// If serving is not configured, the returned view is not Valid.
func (b *LocalBackend) ServeConfig() ipn.ServeConfigView {
b.mu.Lock()
defer b.mu.Unlock()
return b.serveConfig
}
func (b *LocalBackend) HandleIngressTCPConn(ingressPeer *tailcfg.Node, target ipn.HostPort, srcAddr netip.AddrPort, getConn func() (net.Conn, bool), sendRST func()) {
b.mu.Lock()
sc := b.serveConfig

Loading…
Cancel
Save