@ -6,8 +6,10 @@ package tsdns
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"math/rand"
"net"
@ -16,6 +18,7 @@ import (
"time"
"inet.af/netaddr"
"tailscale.com/logtail/backoff"
"tailscale.com/types/logger"
)
@ -109,7 +112,7 @@ type forwarder struct {
// conns are the UDP connections used for forwarding.
// A random one is selected for each request, regardless of the target upstream.
conns [ ] * net. UDP Conn
conns [ ] * fwd Conn
mu sync . Mutex
// upstreams are the nameserver addresses that should be used for forwarding.
@ -127,24 +130,16 @@ func newForwarder(logf logger.Logf, responses chan Packet) *forwarder {
logf : logger . WithPrefix ( logf , "forward: " ) ,
responses : responses ,
closed : make ( chan struct { } ) ,
conns : make ( [ ] * net. UDP Conn, connCount ) ,
conns : make ( [ ] * fwd Conn, connCount ) ,
txMap : make ( map [ txid ] forwardingRecord ) ,
}
}
func ( f * forwarder ) Start ( ) error {
var err error
for i := range f . conns {
f . conns [ i ] , err = net . ListenUDP ( "udp" , nil )
if err != nil {
return err
}
}
f . wg . Add ( connCount + 1 )
for idx , conn := range f . conns {
go f . recv ( uint16 ( idx ) , conn )
for idx := range f . conns {
f . conns [ idx ] = newFwdConn ( f . logf , idx )
go f . recv ( f . conns [ idx ] )
}
go f . cleanMap ( )
@ -161,14 +156,10 @@ func (f *forwarder) Close() {
close ( f . closed )
for _ , conn := range f . conns {
conn . SetDeadline ( aLongTimeAgo )
conn . close ( )
}
f . wg . Wait ( )
for _ , conn := range f . conns {
conn . Close ( )
}
}
func ( f * forwarder ) setUpstreams ( upstreams [ ] net . Addr ) {
@ -177,31 +168,27 @@ func (f *forwarder) setUpstreams(upstreams []net.Addr) {
f . mu . Unlock ( )
}
// send sends packet to dst. It is best effort.
func ( f * forwarder ) send ( packet [ ] byte , dst net . Addr ) {
connIdx := rand . Intn ( connCount )
conn := f . conns [ connIdx ]
_ , err := conn . WriteTo ( packet , dst )
// Do not log errors due to expired deadline.
if err != nil && ! errors . Is ( err , os . ErrDeadlineExceeded ) {
f . logf ( "send: %v" , err )
}
conn . send ( packet , dst )
}
func ( f * forwarder ) recv ( conn Idx uint16 , conn * net . UDP Conn) {
func ( f * forwarder ) recv ( conn * fwdConn ) {
defer f . wg . Done ( )
for {
out := make ( [ ] byte , maxResponseBytes )
n , err := conn . Read ( out )
if err != nil {
// Do not log errors due to expired deadline.
if ! errors . Is ( err , os . ErrDeadlineExceeded ) {
f . logf ( "recv: %v" , err )
}
select {
case <- f . closed :
return
default :
}
out := make ( [ ] byte , maxResponseBytes )
n := conn . read ( out )
if n == 0 {
continue
}
if n < headerBytes {
f . logf ( "recv: packet too small (%d bytes)" , n )
}
@ -285,3 +272,194 @@ func (f *forwarder) forward(query Packet) error {
return nil
}
// A fwdConn manages a single connection used to forward DNS requests.
//
// Net link changes can cause a *net.UDPConn to become permanently unusable,
// particularly on macOS. fwdConn detects such situations and transparently
// creates new connections.
type fwdConn struct {
	// logf is the logging function used by this fwdConn.
	logf logger.Logf

	// wg counts the conn.Read and conn.WriteTo calls currently in flight.
	// closeConnLocked waits on it before closing the connection.
	wg sync.WaitGroup

	// change lets calls to read block until the network connection
	// has been replaced (or the fwdConn has been closed).
	change *sync.Cond

	// mu protects the fields that follow it; it is also change's Locker.
	mu sync.Mutex

	// closed reports whether the fwdConn has been permanently closed.
	closed bool

	// conn is the currently active connection, or nil if there is none.
	conn *net.UDPConn
}
// newFwdConn returns a fwdConn that logs through logf with a prefix
// identifying it by idx.
//
// No socket is opened here: the underlying connection is created lazily
// by send.
func newFwdConn(logf logger.Logf, idx int) *fwdConn {
	prefix := fmt.Sprintf("fwdConn %d: ", idx)
	fc := &fwdConn{logf: logger.WithPrefix(logf, prefix)}
	// The condition variable uses fc.mu as its Locker.
	fc.change = sync.NewCond(&fc.mu)
	return fc
}
// send sends packet to dst using c's connection.
// It is best effort. It is UDP, after all. Failures are logged.
//
// If there is no current connection, send creates one. If the write fails
// because the network is unreachable, send replaces the connection and
// retries. send returns without sending if c has been closed, if the
// network is down, or if the error is not one it knows how to recover from.
func (c *fwdConn) send(packet []byte, dst net.Addr) {
	var b *backoff.Backoff // lazily initialized, since it is not needed in the common case
	backOff := func(err error) {
		if b == nil {
			b = backoff.NewBackoff("tsdns-fwdConn-send", c.logf, 30*time.Second)
		}
		b.BackOff(context.Background(), err)
	}

	for {
		// Gather the current connection.
		// We can't hold the lock while we call WriteTo.
		c.mu.Lock()
		if c.closed {
			c.mu.Unlock()
			return
		}
		conn := c.conn
		if conn == nil {
			c.reconnectLocked()
			reconnected := c.conn != nil
			c.mu.Unlock()
			if !reconnected {
				// reconnectLocked could not create a connection (it has
				// already logged why). Back off before trying again
				// instead of spinning on the lock.
				backOff(errors.New("no connection available"))
			}
			continue
		}
		// Register this call with wg while still holding c.mu.
		// closeConnLocked calls wg.Wait with c.mu held, so doing the Add
		// under the lock guarantees it cannot race with that Wait and that
		// conn is not closed before WriteTo has been accounted for.
		c.wg.Add(1)
		c.mu.Unlock()

		_, err := conn.WriteTo(packet, dst)
		c.wg.Done()
		if err == nil {
			// Success.
			return
		}
		if errors.Is(err, os.ErrDeadlineExceeded) {
			// We intentionally closed this connection.
			// It has been replaced by a new connection. Try again.
			continue
		}

		// Something else went wrong.
		// We have three choices here: try again, give up, or create a new connection.
		var opErr *net.OpError
		if !errors.As(err, &opErr) {
			// Weird. All errors from the net package should be *net.OpError. Bail.
			c.logf("send: non-*net.OpErr %v (%T)", err, err)
			return
		}
		if opErr.Temporary() || opErr.Timeout() {
			// I doubt that either of these can happen (this is UDP),
			// but go ahead and try again.
			backOff(err)
			continue
		}
		if networkIsDown(err) {
			// Fail.
			c.logf("send: network is down")
			return
		}
		if networkIsUnreachable(err) {
			// This can be caused by a link change.
			// Replace the existing connection with a new one.
			c.mu.Lock()
			// It's possible that multiple senders discovered simultaneously
			// that the network is unreachable. Avoid reconnecting multiple times:
			// Only reconnect if the current connection is the one that we
			// discovered to be problematic.
			if c.conn == conn {
				// NOTE(review): backOff runs while c.mu is held, which stalls
				// other senders, readers and close until it returns —
				// confirm this is intended.
				backOff(err)
				c.reconnectLocked()
			}
			c.mu.Unlock()
			// Try again with our new network connection.
			continue
		}

		// Unrecognized error. Fail.
		c.logf("send: unrecognized error: %v", err)
		return
	}
}
// read waits for a response from c's connection and stores it in out.
// It returns the number of bytes read, which may be 0
// in case of an error or a closed connection.
//
// If there is no current connection, read blocks until one has been
// created or c has been closed. If the connection is replaced while read
// is blocked on it, read transparently retries on the new connection.
func (c *fwdConn) read(out []byte) int {
	for {
		// Gather the current connection.
		// We can't hold the lock while we call Read.
		c.mu.Lock()
		if c.closed {
			c.mu.Unlock()
			return 0
		}
		conn := c.conn
		if conn == nil {
			// There is no current connection.
			// Wait for the connection to change, then try again.
			c.change.Wait()
			c.mu.Unlock()
			continue
		}
		// Register this call with wg while still holding c.mu.
		// closeConnLocked calls wg.Wait with c.mu held, so doing the Add
		// under the lock guarantees it cannot race with that Wait and that
		// conn is not closed before Read has been accounted for.
		c.wg.Add(1)
		c.mu.Unlock()

		n, err := conn.Read(out)
		c.wg.Done()
		if err == nil {
			// Success.
			return n
		}
		if errors.Is(err, os.ErrDeadlineExceeded) {
			// We intentionally closed this connection.
			// It has been replaced by a new connection. Try again.
			continue
		}

		c.logf("read: unrecognized error: %v", err)
		return 0
	}
}
// reconnectLocked discards the current connection (if any) and attempts to
// open a fresh UDP socket in its place. If opening the socket fails, the
// failure is logged and c.conn is left nil.
// c.mu must be locked.
func (c *fwdConn) reconnectLocked() {
	c.closeConnLocked()

	// Open the replacement socket.
	if fresh, err := net.ListenUDP("udp", nil); err == nil {
		c.conn = fresh
	} else {
		c.logf("ListenUDP failed: %v", err)
	}

	// Wake everyone waiting for the connection to change.
	c.change.Broadcast()
}
// closeConnLocked closes the current connection, if there is one,
// and sets c.conn to nil.
// c.mu must be locked.
func (c *fwdConn) closeConnLocked() {
	old := c.conn
	if old == nil {
		return
	}

	// Unblock all in-flight readers and writers,
	// wait for them to finish, and only then close the socket.
	old.SetDeadline(aLongTimeAgo)
	c.wg.Wait()
	old.Close()
	c.conn = nil
}
// close permanently closes c.
// Calling close on an already closed fwdConn does nothing.
func (c *fwdConn) close() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.closed {
		c.closed = true
		c.closeConnLocked()
		// Wake any readers still waiting for a connection
		// so that they can observe c.closed.
		c.change.Broadcast()
	}
}