@ -50,7 +50,7 @@ type relayManager struct {
// ===================================================================
// The following chan fields serve event inputs to a single goroutine,
// runLoop().
allocateHandshakeCh chan * endpoin t
startDiscoveryCh chan endpointWithLastBes t
allocateWorkDoneCh chan relayEndpointAllocWorkDoneEvent
handshakeWorkDoneCh chan relayEndpointHandshakeWorkDoneEvent
cancelWorkCh chan * endpoint
@ -77,8 +77,8 @@ type serverDiscoVNI struct {
// relayHandshakeWork serves to track in-progress relay handshake work for a
// [udprelay.ServerEndpoint]. This structure is immutable once initialized.
type relayHandshakeWork struct {
ep * endpoin t
se udprelay . ServerEndpoint
wlb endpointWithLastBes t
se udprelay . ServerEndpoint
// handshakeServerEndpoint() always writes to doneCh (len 1) when it
// returns. It may end up writing the same event afterward to
@ -97,7 +97,7 @@ type relayHandshakeWork struct {
// [disco.CallMeMaybeVia] reception. This structure is immutable once
// initialized.
type newRelayServerEndpointEvent struct {
ep * endpoin t
wlb endpointWithLastBes t
se udprelay . ServerEndpoint
server netip . AddrPort // zero value if learned via [disco.CallMeMaybeVia]
}
@ -142,9 +142,9 @@ func (r *relayManager) runLoop() {
for {
select {
case ep := <- r . allocateHandshake Ch:
if ! r . hasActiveWorkForEndpointRunLoop ( ep) {
r . allocateAllServersRunLoop ( ep )
case startDiscovery := <- r . startDiscovery Ch:
if ! r . hasActiveWorkForEndpointRunLoop ( startDiscovery. ep) {
r . allocateAllServersRunLoop ( startDiscovery )
}
if ! r . hasActiveWorkRunLoop ( ) {
return
@ -153,7 +153,7 @@ func (r *relayManager) runLoop() {
work , ok := r . allocWorkByEndpoint [ done . work . ep ]
if ok && work == done . work {
// Verify the work in the map is the same as the one that we're
// cleaning up. New events on r. allocateHandshake Ch can
// cleaning up. New events on r. startDiscovery Ch can
// overwrite pre-existing keys.
delete ( r . allocWorkByEndpoint , done . work . ep )
}
@ -237,7 +237,7 @@ func (r *relayManager) init() {
r . handshakeWorkByServerDiscoVNI = make ( map [ serverDiscoVNI ] * relayHandshakeWork )
r . handshakeWorkAwaitingPong = make ( map [ * relayHandshakeWork ] addrPortVNI )
r . addrPortVNIToHandshakeWork = make ( map [ addrPortVNI ] * relayHandshakeWork )
r . allocateHandshake Ch = make ( chan * endpoin t)
r . startDiscovery Ch = make ( chan endpoin tWithLastBes t)
r . allocateWorkDoneCh = make ( chan relayEndpointAllocWorkDoneEvent )
r . handshakeWorkDoneCh = make ( chan relayEndpointHandshakeWorkDoneEvent )
r . cancelWorkCh = make ( chan * endpoint )
@ -273,7 +273,7 @@ func (r *relayManager) ensureDiscoInfoFor(work *relayHandshakeWork) {
di . di = & discoInfo {
discoKey : work . se . ServerDisco ,
discoShort : work . se . ServerDisco . ShortString ( ) ,
sharedKey : work . ep. c . discoPrivate . Shared ( work . se . ServerDisco ) ,
sharedKey : work . wlb. ep. c . discoPrivate . Shared ( work . se . ServerDisco ) ,
}
}
}
@ -306,7 +306,7 @@ func (r *relayManager) discoInfo(serverDisco key.DiscoPublic) (_ *discoInfo, ok
return nil , false
}
func ( r * relayManager ) handleCallMeMaybeVia ( ep * endpoint , dm * disco . CallMeMaybeVia ) {
func ( r * relayManager ) handleCallMeMaybeVia ( ep * endpoint , lastBest addrQuality , lastBestIsTrusted bool , dm * disco . CallMeMaybeVia ) {
se := udprelay . ServerEndpoint {
ServerDisco : dm . ServerDisco ,
LamportID : dm . LamportID ,
@ -316,7 +316,11 @@ func (r *relayManager) handleCallMeMaybeVia(ep *endpoint, dm *disco.CallMeMaybeV
se . BindLifetime . Duration = dm . BindLifetime
se . SteadyStateLifetime . Duration = dm . SteadyStateLifetime
relayManagerInputEvent ( r , nil , & r . newServerEndpointCh , newRelayServerEndpointEvent {
ep : ep ,
wlb : endpointWithLastBest {
ep : ep ,
lastBest : lastBest ,
lastBestIsTrusted : lastBestIsTrusted ,
} ,
se : se ,
} )
}
@ -360,11 +364,19 @@ func relayManagerInputEvent[T any](r *relayManager, ctx context.Context, eventCh
}
}
// allocateAndHandshakeAllServers kicks off allocation and handshaking of relay
// endpoints for 'ep' on all known relay servers if there is no outstanding
// work.
func ( r * relayManager ) allocateAndHandshakeAllServers ( ep * endpoint ) {
relayManagerInputEvent ( r , nil , & r . allocateHandshakeCh , ep )
// endpointWithLastBest represents an [*endpoint], its last bestAddr, and if
// the last bestAddr was trusted (see endpoint.trustBestAddrUntil) at the time
// of init. This structure is immutable once initialized.
type endpointWithLastBest struct {
ep * endpoint
lastBest addrQuality
lastBestIsTrusted bool
}
// startUDPRelayPathDiscoveryFor starts UDP relay path discovery for ep on all
// known relay servers if ep has no in-progress work.
func ( r * relayManager ) startUDPRelayPathDiscoveryFor ( ep * endpoint , lastBest addrQuality , lastBestIsTrusted bool ) {
relayManagerInputEvent ( r , nil , & r . startDiscoveryCh , endpointWithLastBest { ep , lastBest , lastBestIsTrusted } )
}
// stopWork stops all outstanding allocation & handshaking work for 'ep'.
@ -432,7 +444,7 @@ func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDisc
r . addrPortVNIToHandshakeWork [ apv ] = work
case * disco . Ping :
// Always TX a pong. We might not have any associated work if ping
// reception raced with our call to [endpoint. r elayEndpointReady()], so
// reception raced with our call to [endpoint. udpR elayEndpointReady()], so
// err on the side of enabling the remote side to use this path.
//
// Conn.handlePingLocked() makes efforts to suppress duplicate pongs
@ -473,7 +485,7 @@ func (r *relayManager) handleRxHandshakeDiscoMsgRunLoop(event relayHandshakeDisc
}
func ( r * relayManager ) handleHandshakeWorkDoneRunLoop ( done relayEndpointHandshakeWorkDoneEvent ) {
byServerDisco , ok := r . handshakeWorkByEndpointByServerDisco [ done . work . ep]
byServerDisco , ok := r . handshakeWorkByEndpointByServerDisco [ done . work . wlb. ep]
if ! ok {
return
}
@ -483,7 +495,7 @@ func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshak
}
delete ( byServerDisco , done . work . se . ServerDisco )
if len ( byServerDisco ) == 0 {
delete ( r . handshakeWorkByEndpointByServerDisco , done . work . ep)
delete ( r . handshakeWorkByEndpointByServerDisco , done . work . wlb. ep)
}
delete ( r . handshakeWorkByServerDiscoVNI , serverDiscoVNI { done . work . se . ServerDisco , done . work . se . VNI } )
apv , ok := r . handshakeWorkAwaitingPong [ work ]
@ -499,10 +511,15 @@ func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshak
vni := virtualNetworkID { }
vni . set ( done . work . se . VNI )
addr := epAddr { ap : done . pongReceivedFrom , vni : vni }
// ep. r elayEndpointReady() must be called in a new goroutine to prevent
// ep. udpR elayEndpointReady() must be called in a new goroutine to prevent
// deadlocks as it acquires [endpoint] & [Conn] mutexes. See [relayManager]
// docs for details.
go done . work . ep . relayEndpointReady ( addr , done . latency )
go done . work . wlb . ep . udpRelayEndpointReady ( addrQuality {
epAddr : addr ,
relayServerDisco : done . work . se . ServerDisco ,
latency : done . latency ,
wireMTU : pingSizeToPktLen ( 0 , addr ) ,
} )
}
func ( r * relayManager ) handleNewServerEndpointRunLoop ( newServerEndpoint newRelayServerEndpointEvent ) {
@ -525,7 +542,7 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay
}
// Check for duplicate work by [*endpoint] + server disco.
byServerDisco , ok := r . handshakeWorkByEndpointByServerDisco [ newServerEndpoint . ep]
byServerDisco , ok := r . handshakeWorkByEndpointByServerDisco [ newServerEndpoint . wlb. ep]
if ok {
existingWork , ok := byServerDisco [ newServerEndpoint . se . ServerDisco ]
if ok {
@ -569,10 +586,40 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay
}
}
if newServerEndpoint . server . IsValid ( ) {
// Send a [disco.CallMeMaybeVia] to the remote peer if we allocated this
// endpoint, regardless of if we start a handshake below.
go r . sendCallMeMaybeVia ( newServerEndpoint . wlb . ep , newServerEndpoint . se )
}
lastBestMatchingServer := newServerEndpoint . se . ServerDisco . Compare ( newServerEndpoint . wlb . lastBest . relayServerDisco ) == 0
if lastBestMatchingServer && newServerEndpoint . wlb . lastBestIsTrusted {
// This relay server endpoint is the same as [endpoint]'s bestAddr at
// the time UDP relay path discovery was started, and it was also a
// trusted path (see endpoint.trustBestAddrUntil), so return early.
//
// If we were to start a new handshake, there is a chance that we
// cause [endpoint] to blackhole some packets on its bestAddr if we end
// up shifting to a new address family or src, e.g. IPv4 to IPv6, due to
// the window of time between the handshake completing, and our call to
// udpRelayEndpointReady(). The relay server can only forward packets
// from us on a single [epAddr].
return
}
// TODO(jwhited): if lastBest is untrusted, consider some strategies
// to reduce the chance we blackhole if it were to transition to
// trusted during/before the new handshake:
// 1. Start by attempting a handshake with only lastBest.epAddr. If
// that fails then try the remaining [epAddr]s.
// 2. Signal bestAddr trust transitions between [endpoint] and
// [relayManager] in order to prevent a handshake from starting
// and/or stop one that is running.
// We're ready to start a new handshake.
ctx , cancel := context . WithCancel ( context . Background ( ) )
work := & relayHandshakeWork {
ep : newServerEndpoint . ep ,
wlb: newServerEndpoint . wlb ,
se : newServerEndpoint . se ,
rxDiscoMsgCh : make ( chan relayHandshakeDiscoMsgEvent ) ,
doneCh : make ( chan relayEndpointHandshakeWorkDoneEvent , 1 ) ,
@ -581,16 +628,11 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay
}
if byServerDisco == nil {
byServerDisco = make ( map [ key . DiscoPublic ] * relayHandshakeWork )
r . handshakeWorkByEndpointByServerDisco [ newServerEndpoint . ep] = byServerDisco
r . handshakeWorkByEndpointByServerDisco [ newServerEndpoint . wlb. ep] = byServerDisco
}
byServerDisco [ newServerEndpoint . se . ServerDisco ] = work
r . handshakeWorkByServerDiscoVNI [ sdv ] = work
if newServerEndpoint . server . IsValid ( ) {
// Send CallMeMaybeVia to the remote peer if we allocated this endpoint.
go r . sendCallMeMaybeVia ( work . ep , work . se )
}
r . handshakeGeneration ++
if r . handshakeGeneration == 0 { // generation must be nonzero
r . handshakeGeneration ++
@ -633,7 +675,8 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat
work . cancel ( )
} ( )
epDisco := work . ep . disco . Load ( )
ep := work . wlb . ep
epDisco := ep . disco . Load ( )
if epDisco == nil {
return
}
@ -653,7 +696,7 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat
for _ , addrPort := range work . se . AddrPorts {
if addrPort . IsValid ( ) {
sentBindAny = true
go work. ep. c . sendDiscoMessage ( epAddr { ap : addrPort , vni : vni } , key . NodePublic { } , work . se . ServerDisco , bind , discoVerboseLog )
go ep. c . sendDiscoMessage ( epAddr { ap : addrPort , vni : vni } , key . NodePublic { } , work . se . ServerDisco , bind , discoVerboseLog )
}
}
if ! sentBindAny {
@ -684,15 +727,15 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat
sentPingAt [ txid ] = time . Now ( )
ping := & disco . Ping {
TxID : txid ,
NodeKey : work. ep. c . publicKeyAtomic . Load ( ) ,
NodeKey : ep. c . publicKeyAtomic . Load ( ) ,
}
go func ( ) {
if withAnswer != nil {
answer := & disco . BindUDPRelayEndpointAnswer { BindUDPRelayEndpointCommon : common }
answer . Challenge = * withAnswer
work. ep. c . sendDiscoMessage ( epAddr { ap : to , vni : vni } , key . NodePublic { } , work . se . ServerDisco , answer , discoVerboseLog )
ep. c . sendDiscoMessage ( epAddr { ap : to , vni : vni } , key . NodePublic { } , work . se . ServerDisco , answer , discoVerboseLog )
}
work. ep. c . sendDiscoMessage ( epAddr { ap : to , vni : vni } , key . NodePublic { } , epDisco . key , ping , discoVerboseLog )
ep. c . sendDiscoMessage ( epAddr { ap : to , vni : vni } , key . NodePublic { } , epDisco . key , ping , discoVerboseLog )
} ( )
}
@ -760,17 +803,17 @@ func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork, generat
}
}
func ( r * relayManager ) allocateAllServersRunLoop ( ep * endpoin t) {
func ( r * relayManager ) allocateAllServersRunLoop ( wlb endpointWithLastBes t) {
if len ( r . serversByAddrPort ) == 0 {
return
}
ctx , cancel := context . WithCancel ( context . Background ( ) )
started := & relayEndpointAllocWork { ep : ep, cancel : cancel , wg : & sync . WaitGroup { } }
started := & relayEndpointAllocWork { ep : wlb. ep, cancel : cancel , wg : & sync . WaitGroup { } }
for k := range r . serversByAddrPort {
started . wg . Add ( 1 )
go r . allocateSingleServer ( ctx , started . wg , k , ep )
go r . allocateSingleServer ( ctx , started . wg , k , wlb )
}
r . allocWorkByEndpoint [ ep] = started
r . allocWorkByEndpoint [ wlb. ep] = started
go func ( ) {
started . wg . Wait ( )
relayManagerInputEvent ( r , ctx , & r . allocateWorkDoneCh , relayEndpointAllocWorkDoneEvent { work : started } )
@ -829,25 +872,25 @@ func doAllocate(ctx context.Context, server netip.AddrPort, discoKeys [2]key.Dis
}
}
func ( r * relayManager ) allocateSingleServer ( ctx context . Context , wg * sync . WaitGroup , server netip . AddrPort , ep * endpoin t) {
func ( r * relayManager ) allocateSingleServer ( ctx context . Context , wg * sync . WaitGroup , server netip . AddrPort , wlb endpointWithLastBes t) {
// TODO(jwhited): introduce client metrics counters for notable failures
defer wg . Done ( )
remoteDisco := ep. disco . Load ( )
remoteDisco := wlb. ep. disco . Load ( )
if remoteDisco == nil {
return
}
firstTry := true
for {
se , err := doAllocate ( ctx , server , [ 2 ] key . DiscoPublic { ep. c . discoPublic , remoteDisco . key } )
se , err := doAllocate ( ctx , server , [ 2 ] key . DiscoPublic { wlb. ep. c . discoPublic , remoteDisco . key } )
if err == nil {
relayManagerInputEvent ( r , ctx , & r . newServerEndpointCh , newRelayServerEndpointEvent {
ep: ep ,
wlb: wlb ,
se : se ,
server : server , // we allocated this endpoint (vs CallMeMaybeVia reception), mark it as such
} )
return
}
ep. c . logf ( "[v1] magicsock: relayManager: error allocating endpoint on %v for %v: %v" , server , ep . discoShort ( ) , err )
wlb. ep. c . logf ( "[v1] magicsock: relayManager: error allocating endpoint on %v for %v: %v" , server , wlb . ep . discoShort ( ) , err )
var notReady errNotReady
if firstTry && errors . As ( err , & notReady ) {
select {