control/controlclient: use lite map request handler to avoid aborting streams

Previously, any change to endpoints or hostinfo (or hostinfo's
netinfo) would result in the long-running map request HTTP stream
being torn down and restarted, losing all compression context along
with it.

This change makes us instead send a lite map request (OmitPeers: true,
Stream: false) that doesn't subscribe to anything, and then the
coordination server knows to not close other streams for that node
when it recives a lite request.

Fixes tailscale/corp#797

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/1098/head
Brad Fitzpatrick 4 years ago committed by Brad Fitzpatrick
parent e692e3866b
commit 2f04f49376

@ -124,6 +124,7 @@ type Client struct {
synced bool // true if our netmap is up-to-date synced bool // true if our netmap is up-to-date
hostinfo *tailcfg.Hostinfo hostinfo *tailcfg.Hostinfo
inPollNetMap bool // true if currently running a PollNetMap inPollNetMap bool // true if currently running a PollNetMap
inLiteMapUpdate bool // true if a lite (non-streaming) map request is outstanding
inSendStatus int // number of sendStatus calls currently in progress inSendStatus int // number of sendStatus calls currently in progress
state State state State
@ -201,6 +202,50 @@ func (c *Client) Start() {
go c.mapRoutine() go c.mapRoutine()
} }
// sendNewMapRequest either sends a new OmitPeers, non-streaming map request
// (to just send Hostinfo/Netinfo/Endpoints info, while keeping an existing
// streaming response open), or start a new streaming one if necessary.
//
// It should be called whenever there's something new to tell the server.
func (c *Client) sendNewMapRequest() {
c.mu.Lock()
// If we're not already streaming a netmap, or if we're already stuck
// in a lite update, then tear down everything and start a new stream
// (which starts by sending a new map request)
if !c.inPollNetMap || c.inLiteMapUpdate {
c.mu.Unlock()
c.cancelMapSafely()
return
}
// Otherwise, send a lite update that doesn't keep a
// long-running stream response.
defer c.mu.Unlock()
c.inLiteMapUpdate = true
ctx, cancel := context.WithTimeout(c.mapCtx, 10*time.Second)
go func() {
defer cancel()
t0 := time.Now()
err := c.direct.SendLiteMapUpdate(ctx)
d := time.Since(t0).Round(time.Millisecond)
c.mu.Lock()
c.inLiteMapUpdate = false
c.mu.Unlock()
if err == nil {
c.logf("[v1] successful lite map update in %v", d)
return
}
if ctx.Err() == nil {
c.logf("lite map update after %v: %v", d, err)
}
// Fall back to restarting the long-polling map
// request (the old heavy way) if the lite update
// failed for any reason.
c.cancelMapSafely()
}()
}
func (c *Client) cancelAuth() { func (c *Client) cancelAuth() {
c.mu.Lock() c.mu.Lock()
if c.authCancel != nil { if c.authCancel != nil {
@ -545,7 +590,7 @@ func (c *Client) SetHostinfo(hi *tailcfg.Hostinfo) {
c.logf("Hostinfo: %v", hi) c.logf("Hostinfo: %v", hi)
// Send new Hostinfo to server // Send new Hostinfo to server
c.cancelMapSafely() c.sendNewMapRequest()
} }
func (c *Client) SetNetInfo(ni *tailcfg.NetInfo) { func (c *Client) SetNetInfo(ni *tailcfg.NetInfo) {
@ -553,13 +598,12 @@ func (c *Client) SetNetInfo(ni *tailcfg.NetInfo) {
panic("nil NetInfo") panic("nil NetInfo")
} }
if !c.direct.SetNetInfo(ni) { if !c.direct.SetNetInfo(ni) {
c.logf("[unexpected] duplicate NetInfo: %v", ni)
return return
} }
c.logf("NetInfo: %v", ni) c.logf("NetInfo: %v", ni)
// Send new Hostinfo (which includes NetInfo) to server // Send new Hostinfo (which includes NetInfo) to server
c.cancelMapSafely() c.sendNewMapRequest()
} }
func (c *Client) sendStatus(who string, err error, url string, nm *NetworkMap) { func (c *Client) sendStatus(who string, err error, url string, nm *NetworkMap) {
@ -636,7 +680,7 @@ func (c *Client) Logout() {
func (c *Client) UpdateEndpoints(localPort uint16, endpoints []string) { func (c *Client) UpdateEndpoints(localPort uint16, endpoints []string) {
changed := c.direct.SetEndpoints(localPort, endpoints) changed := c.direct.SetEndpoints(localPort, endpoints)
if changed { if changed {
c.cancelMapSafely() c.sendNewMapRequest()
} }
} }

@ -521,6 +521,18 @@ func inTest() bool { return flag.Lookup("test.v") != nil }
// maxPolls is how many network maps to download; common values are 1 // maxPolls is how many network maps to download; common values are 1
// or -1 (to keep a long-poll query open to the server). // or -1 (to keep a long-poll query open to the server).
func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkMap)) error { func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkMap)) error {
return c.sendMapRequest(ctx, maxPolls, cb)
}
// SendLiteMapUpdate makes a /map request to update the server of our latest state,
// but does not fetch anything. It returns an error if the server did not return a
// successful 200 OK response.
func (c *Direct) SendLiteMapUpdate(ctx context.Context) error {
return c.sendMapRequest(ctx, 1, nil)
}
// cb nil means to omit peers.
func (c *Direct) sendMapRequest(ctx context.Context, maxPolls int, cb func(*NetworkMap)) error {
c.mu.Lock() c.mu.Lock()
persist := c.persist persist := c.persist
serverURL := c.serverURL serverURL := c.serverURL
@ -555,6 +567,7 @@ func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkM
Stream: allowStream, Stream: allowStream,
Hostinfo: hostinfo, Hostinfo: hostinfo,
DebugFlags: c.debugFlags, DebugFlags: c.debugFlags,
OmitPeers: cb == nil,
} }
if hostinfo != nil && ipForwardingBroken(hostinfo.RoutableIPs) { if hostinfo != nil && ipForwardingBroken(hostinfo.RoutableIPs) {
old := request.DebugFlags old := request.DebugFlags
@ -607,6 +620,11 @@ func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkM
} }
defer res.Body.Close() defer res.Body.Close()
if cb == nil {
io.Copy(ioutil.Discard, res.Body)
return nil
}
// If we go more than pollTimeout without hearing from the server, // If we go more than pollTimeout without hearing from the server,
// end the long poll. We should be receiving a keep alive ping // end the long poll. We should be receiving a keep alive ping
// every minute. // every minute.
@ -724,6 +742,14 @@ func (c *Direct) PollNetMap(ctx context.Context, maxPolls int, cb func(*NetworkM
lastParsedPacketFilter = c.parsePacketFilter(pf) lastParsedPacketFilter = c.parsePacketFilter(pf)
} }
// Get latest localPort. This might've changed if
// a lite map update occured meanwhile. This only affects
// the end-to-end test.
// TODO(bradfitz): remove the NetworkMap.LocalPort field entirely.
c.mu.Lock()
localPort = c.localPort
c.mu.Unlock()
nm := &NetworkMap{ nm := &NetworkMap{
NodeKey: tailcfg.NodeKey(persist.PrivateNodeKey.Public()), NodeKey: tailcfg.NodeKey(persist.PrivateNodeKey.Public()),
PrivateKey: persist.PrivateNodeKey, PrivateKey: persist.PrivateNodeKey,

Loading…
Cancel
Save